You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/05/18 12:25:39 UTC
[1/2] incubator-atlas git commit: ATLAS-713 Entity lineage based on
entity id (shwethags)
Repository: incubator-atlas
Updated Branches:
refs/heads/master 857561a39 -> b65dd91c3
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
new file mode 100644
index 0000000..3028dde
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.discovery;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.BaseRepositoryTest;
+import org.apache.atlas.RepositoryMetadataModule;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.commons.lang.RandomStringUtils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for {@link DataSetLineageService}: DSL search plus lineage and schema lookups,
+ * both by dataset name and by entity guid.
+ */
+@Guice(modules = RepositoryMetadataModule.class)
+public class DataSetLineageServiceTest extends BaseRepositoryTest {
+
+ @Inject
+ private DiscoveryService discoveryService;
+
+ @Inject
+ private DataSetLineageService lineageService;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ @DataProvider(name = "dslQueriesProvider")
+ private Object[][] createDSLQueries() {
+ return new String[][]{
+ // joins
+ {"hive_table where name=\"sales_fact\", columns"},
+ {"hive_table where name=\"sales_fact\", columns select name, dataType, comment"},
+ {"hive_table where name=\"sales_fact\", columns as c select c.name, c.dataType, c.comment"},
+ // {"hive_db as db where (db.name=\"Reporting\"), hive_table as table select db.name,
+ // table.name"},
+ {"from hive_db"}, {"hive_db"}, {"hive_db where hive_db.name=\"Reporting\""},
+ {"hive_db hive_db.name = \"Reporting\""},
+ {"hive_db where hive_db.name=\"Reporting\" select name, owner"}, {"hive_db has name"},
+ // {"hive_db, hive_table"},
+ // {"hive_db, hive_process has name"},
+ // {"hive_db as db1, hive_table where db1.name = \"Reporting\""},
+ // {"hive_db where hive_db.name=\"Reporting\" and hive_db.createTime < " + System
+ // .currentTimeMillis()},
+ {"from hive_table"}, {"hive_table"}, {"hive_table is Dimension"},
+ {"hive_column where hive_column isa PII"},
+ // {"hive_column where hive_column isa PII select hive_column.name"},
+ {"hive_column select hive_column.name"}, {"hive_column select name"},
+ {"hive_column where hive_column.name=\"customer_id\""}, {"from hive_table select hive_table.name"},
+ {"hive_db where (name = \"Reporting\")"},
+ {"hive_db where (name = \"Reporting\") select name as _col_0, owner as _col_1"},
+ {"hive_db where hive_db has name"},
+ // {"hive_db hive_table"},
+ // {"hive_db as db1 hive_table where (db1.name = \"Reporting\")"},
+ {"hive_db where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 "},
+ // {"hive_db where (name = \"Reporting\") and ((createTime + 1) > 0)"},
+ // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name =
+ // \"Reporting\") select db1.name as dbName, tab.name as tabName"},
+ // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) or (db1.name =
+ // \"Reporting\") select db1.name as dbName, tab.name as tabName"},
+ // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name =
+ // \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"},
+ // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name =
+ // \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"},
+ // trait searches
+ {"Dimension"}, {"Fact"}, {"ETL"}, {"Metric"}, {"PII"},};
+ }
+
+ @Test(dataProvider = "dslQueriesProvider")
+ public void testSearchByDSLQueries(String dslQuery) throws Exception {
+ System.out.println("Executing dslQuery = " + dslQuery);
+ String jsonResults = discoveryService.searchByDSL(dslQuery);
+ assertNotNull(jsonResults);
+
+ JSONObject results = new JSONObject(jsonResults);
+ assertEquals(results.length(), 3);
+ System.out.println("results = " + results);
+
+ Object query = results.get("query");
+ assertNotNull(query);
+
+ JSONObject dataType = results.getJSONObject("dataType");
+ assertNotNull(dataType);
+ String typeName = dataType.getString("typeName");
+ assertNotNull(typeName);
+
+ JSONArray rows = results.getJSONArray("rows");
+ assertNotNull(rows);
+ // rows may legitimately be empty for some queries, so no lower bound is asserted on its length
+ System.out.println("query [" + dslQuery + "] returned [" + rows.length() + "] rows");
+ }
+
+ @Test(dataProvider = "invalidArgumentsProvider")
+ public void testGetInputsGraphInvalidArguments(final String tableName, String expectedException) throws Exception {
+ testInvalidArguments(expectedException, new Invoker() {
+ @Override
+ void run() throws AtlasException {
+ lineageService.getInputsGraph(tableName);
+ }
+ });
+ }
+
+ @Test(dataProvider = "invalidArgumentsProvider")
+ public void testGetInputsGraphForEntityInvalidArguments(final String entityId, String expectedException)
+ throws Exception {
+ testInvalidArguments(expectedException, new Invoker() {
+ @Override
+ void run() throws AtlasException {
+ lineageService.getInputsGraphForEntity(entityId);
+ }
+ });
+ }
+
+ @Test
+ public void testGetInputsGraph() throws Exception {
+ JSONObject results = new JSONObject(lineageService.getInputsGraph("sales_fact_monthly_mv"));
+ assertNotNull(results);
+ System.out.println("inputs graph = " + results);
+
+ JSONObject values = results.getJSONObject("values");
+ assertNotNull(values);
+
+ final JSONObject vertices = values.getJSONObject("vertices");
+ assertEquals(vertices.length(), 4);
+
+ final JSONObject edges = values.getJSONObject("edges");
+ assertEquals(edges.length(), 4);
+ }
+
+ @Test
+ public void testGetInputsGraphForEntity() throws Exception {
+ ITypedReferenceableInstance entity =
+ repository.getEntityDefinition(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv");
+
+ JSONObject results = new JSONObject(lineageService.getInputsGraphForEntity(entity.getId()._getId()));
+ assertNotNull(results);
+ System.out.println("inputs graph = " + results);
+
+ JSONObject values = results.getJSONObject("values");
+ assertNotNull(values);
+
+ final JSONObject vertices = values.getJSONObject("vertices");
+ assertEquals(vertices.length(), 4);
+
+ final JSONObject edges = values.getJSONObject("edges");
+ assertEquals(edges.length(), 4);
+ }
+
+ @Test(dataProvider = "invalidArgumentsProvider")
+ public void testGetOutputsGraphInvalidArguments(final String tableName, String expectedException) throws Exception {
+ testInvalidArguments(expectedException, new Invoker() {
+ @Override
+ void run() throws AtlasException {
+ lineageService.getOutputsGraph(tableName);
+ }
+ });
+ }
+
+ @Test(dataProvider = "invalidArgumentsProvider")
+ public void testGetOutputsGraphForEntityInvalidArguments(final String entityId, String expectedException)
+ throws Exception {
+ testInvalidArguments(expectedException, new Invoker() {
+ @Override
+ void run() throws AtlasException {
+ lineageService.getOutputsGraphForEntity(entityId);
+ }
+ });
+ }
+
+ @Test
+ public void testGetOutputsGraph() throws Exception {
+ JSONObject results = new JSONObject(lineageService.getOutputsGraph("sales_fact"));
+ assertNotNull(results);
+ System.out.println("outputs graph = " + results);
+
+ JSONObject values = results.getJSONObject("values");
+ assertNotNull(values);
+
+ final JSONObject vertices = values.getJSONObject("vertices");
+ assertEquals(vertices.length(), 3);
+
+ final JSONObject edges = values.getJSONObject("edges");
+ assertEquals(edges.length(), 4);
+ }
+
+ @Test
+ public void testGetOutputsGraphForEntity() throws Exception {
+ ITypedReferenceableInstance entity =
+ repository.getEntityDefinition(HIVE_TABLE_TYPE, "name", "sales_fact");
+
+ JSONObject results = new JSONObject(lineageService.getOutputsGraphForEntity(entity.getId()._getId()));
+ assertNotNull(results);
+ System.out.println("outputs graph = " + results);
+
+ JSONObject values = results.getJSONObject("values");
+ assertNotNull(values);
+
+ final JSONObject vertices = values.getJSONObject("vertices");
+ assertEquals(vertices.length(), 3);
+
+ final JSONObject edges = values.getJSONObject("edges");
+ assertEquals(edges.length(), 4);
+ }
+
+ /** Table names paired with the number of columns each is expected to have (as a string). */
+ @DataProvider(name = "tableNamesProvider")
+ private Object[][] tableNames() {
+ return new String[][]{{"sales_fact", "4"}, {"time_dim", "3"}, {"sales_fact_daily_mv", "4"},
+ {"sales_fact_monthly_mv", "4"}};
+ }
+
+ @Test(dataProvider = "tableNamesProvider")
+ public void testGetSchema(String tableName, String expected) throws Exception {
+ JSONObject results = new JSONObject(lineageService.getSchema(tableName));
+ assertNotNull(results);
+ System.out.println("columns = " + results);
+
+ JSONArray rows = results.getJSONArray("rows");
+ assertEquals(rows.length(), Integer.parseInt(expected));
+
+ for (int index = 0; index < rows.length(); index++) {
+ final JSONObject row = rows.getJSONObject(index);
+ assertNotNull(row.getString("name"));
+ assertNotNull(row.getString("comment"));
+ assertNotNull(row.getString("dataType"));
+ assertEquals(row.getString("$typeName$"), "hive_column");
+ }
+ }
+
+ @Test(dataProvider = "tableNamesProvider")
+ public void testGetSchemaForEntity(String tableName, String expected) throws Exception {
+ ITypedReferenceableInstance entity =
+ repository.getEntityDefinition(HIVE_TABLE_TYPE, "name", tableName);
+
+ JSONObject results = new JSONObject(lineageService.getSchemaForEntity(entity.getId()._getId()));
+ assertNotNull(results);
+ System.out.println("columns = " + results);
+
+ JSONArray rows = results.getJSONArray("rows");
+ assertEquals(rows.length(), Integer.parseInt(expected));
+
+ for (int index = 0; index < rows.length(); index++) {
+ final JSONObject row = rows.getJSONObject(index);
+ assertNotNull(row.getString("name"));
+ assertNotNull(row.getString("comment"));
+ assertNotNull(row.getString("dataType"));
+ assertEquals(row.getString("$typeName$"), "hive_column");
+ }
+ }
+
+ /** Invalid arguments paired with the fully qualified class name of the exception each must raise. */
+ @DataProvider(name = "invalidArgumentsProvider")
+ private Object[][] arguments() {
+ return new String[][]{{null, IllegalArgumentException.class.getName()},
+ {"", IllegalArgumentException.class.getName()},
+ {"blah", EntityNotFoundException.class.getName()}};
+ }
+
+ /** A single lineage-service call that a test expects to throw. */
+ private abstract static class Invoker {
+ abstract void run() throws AtlasException;
+ }
+
+ /**
+ * Runs the invoker and asserts that it throws an exception whose class name equals expectedException.
+ */
+ private void testInvalidArguments(String expectedException, Invoker invoker) throws Exception {
+ try {
+ invoker.run();
+ fail("Expected " + expectedException);
+ } catch (Exception e) {
+ assertEquals(e.getClass().getName(), expectedException, "Unexpected exception: " + e);
+ }
+ }
+
+ @Test(dataProvider = "invalidArgumentsProvider")
+ public void testGetSchemaInvalidArguments(final String tableName, String expectedException) throws Exception {
+ testInvalidArguments(expectedException, new Invoker() {
+ @Override
+ void run() throws AtlasException {
+ lineageService.getSchema(tableName);
+ }
+ });
+ }
+
+ @Test(dataProvider = "invalidArgumentsProvider")
+ public void testGetSchemaForEntityInvalidArguments(final String entityId, String expectedException) throws Exception {
+ testInvalidArguments(expectedException, new Invoker() {
+ @Override
+ void run() throws AtlasException {
+ lineageService.getSchemaForEntity(entityId);
+ }
+ });
+ }
+
+ /**
+ * Deleting a table must not affect guid-based schema/lineage lookups, must make name-based lookups
+ * throw EntityNotFoundException, and re-creating the name must expose the new schema with no lineage.
+ */
+ @Test
+ public void testLineageWithDelete() throws Exception {
+ String tableName = "table" + random();
+ createTable(tableName, 3, true);
+
+ JSONObject results = new JSONObject(lineageService.getSchema(tableName));
+ assertEquals(results.getJSONArray("rows").length(), 3);
+
+ results = new JSONObject(lineageService.getInputsGraph(tableName));
+ assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
+
+ results = new JSONObject(lineageService.getOutputsGraph(tableName));
+ assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
+
+ String tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
+
+ results = new JSONObject(lineageService.getSchemaForEntity(tableId));
+ assertEquals(results.getJSONArray("rows").length(), 3);
+
+ results = new JSONObject(lineageService.getInputsGraphForEntity(tableId));
+ assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
+
+ results = new JSONObject(lineageService.getOutputsGraphForEntity(tableId));
+ assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
+
+ //Delete the entity. Lineage for entity returns the same results as before.
+ //Lineage for table name throws EntityNotFoundException
+ repository.deleteEntities(Arrays.asList(tableId));
+
+ results = new JSONObject(lineageService.getSchemaForEntity(tableId));
+ assertEquals(results.getJSONArray("rows").length(), 3);
+
+ results = new JSONObject(lineageService.getInputsGraphForEntity(tableId));
+ assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
+
+ results = new JSONObject(lineageService.getOutputsGraphForEntity(tableId));
+ assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
+
+ try {
+ lineageService.getSchema(tableName);
+ fail("Expected EntityNotFoundException");
+ } catch (EntityNotFoundException e) {
+ //expected
+ }
+
+ try {
+ lineageService.getInputsGraph(tableName);
+ fail("Expected EntityNotFoundException");
+ } catch (EntityNotFoundException e) {
+ //expected
+ }
+
+ try {
+ lineageService.getOutputsGraph(tableName);
+ fail("Expected EntityNotFoundException");
+ } catch (EntityNotFoundException e) {
+ //expected
+ }
+
+ //Re-creating a table with the same name should show the new schema and no lineage
+ createTable(tableName, 2, false);
+ results = new JSONObject(lineageService.getSchema(tableName));
+ assertEquals(results.getJSONArray("rows").length(), 2);
+
+ results = new JSONObject(lineageService.getOutputsGraph(tableName));
+ assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
+
+ results = new JSONObject(lineageService.getInputsGraph(tableName));
+ assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
+
+ tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
+
+ results = new JSONObject(lineageService.getSchemaForEntity(tableId));
+ assertEquals(results.getJSONArray("rows").length(), 2);
+
+ results = new JSONObject(lineageService.getInputsGraphForEntity(tableId));
+ assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
+
+ results = new JSONObject(lineageService.getOutputsGraphForEntity(tableId));
+ assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
+ }
+
+ /**
+ * Creates table tableName in the "Sales" database with numCols randomly named int columns. When
+ * createLineage is true, it also creates an input table, an output table and two processes, so the
+ * new table has one upstream process (inTable -> table) and one downstream process (table -> outTable).
+ */
+ private void createTable(String tableName, int numCols, boolean createLineage) throws Exception {
+ String dbId = getEntityId(DATABASE_TYPE, "name", "Sales");
+ Id salesDB = new Id(dbId, 0, DATABASE_TYPE);
+
+ //Random column names make every call produce a distinct schema
+ List<Referenceable> columns = new ArrayList<>(numCols);
+ for (int i = 0; i < numCols; i++) {
+ columns.add(column("col" + random(), "int", "column descr"));
+ }
+
+ Referenceable sd =
+ storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true,
+ ImmutableList.of(column("time_id", "int", "time id")));
+
+ Id table = table(tableName, "test table", salesDB, sd, "fetl", "External", columns);
+ if (createLineage) {
+ Id inTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns);
+ Id outTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns);
+ loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(inTable),
+ ImmutableList.of(table), "create table as select ", "plan", "id", "graph", "ETL");
+ loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(table),
+ ImmutableList.of(outTable), "create table as select ", "plan", "id", "graph", "ETL");
+ }
+ }
+
+ /** Returns a random 5-character alphanumeric string, used to build distinct test entity names. */
+ private String random() {
+ return RandomStringUtils.randomAlphanumeric(5);
+ }
+
+ /** Returns the guid of the entity of typeName whose attributeName equals attributeValue. */
+ private String getEntityId(String typeName, String attributeName, String attributeValue) throws Exception {
+ return repository.getEntityDefinition(typeName, attributeName, attributeValue).getId()._getId();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
index 3b50dfb..5e7de88 100755
--- a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
@@ -20,7 +20,7 @@ package org.apache.atlas.discovery;
import com.google.common.collect.ImmutableSet;
-import org.apache.atlas.BaseHiveRepositoryTest;
+import org.apache.atlas.BaseRepositoryTest;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestUtils;
@@ -60,7 +60,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@Guice(modules = RepositoryMetadataModule.class)
-public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest {
+public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
@Inject
private MetadataRepository repositoryService;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java
deleted file mode 100644
index 6d5a15a..0000000
--- a/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.discovery;
-
-import org.apache.atlas.BaseHiveRepositoryTest;
-import org.apache.atlas.RepositoryMetadataModule;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONObject;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import javax.inject.Inject;
-
-/**
- * Unit tests for Hive LineageService.
- */
-@Guice(modules = RepositoryMetadataModule.class)
-public class HiveLineageServiceTest extends BaseHiveRepositoryTest {
-
- @Inject
- private DiscoveryService discoveryService;
-
- @Inject
- private HiveLineageService hiveLineageService;
-
- @BeforeClass
- public void setUp() throws Exception {
- super.setUp();
- }
-
- @AfterClass
- public void tearDown() throws Exception {
- super.tearDown();
- }
-
- @DataProvider(name = "dslQueriesProvider")
- private Object[][] createDSLQueries() {
- return new String[][]{
- // joins
- {"hive_table where name=\"sales_fact\", columns"},
- {"hive_table where name=\"sales_fact\", columns select name, dataType, comment"},
- {"hive_table where name=\"sales_fact\", columns as c select c.name, c.dataType, c.comment"},
- // {"hive_db as db where (db.name=\"Reporting\"), hive_table as table select db.name,
- // table.name"},
- {"from hive_db"}, {"hive_db"}, {"hive_db where hive_db.name=\"Reporting\""},
- {"hive_db hive_db.name = \"Reporting\""},
- {"hive_db where hive_db.name=\"Reporting\" select name, owner"}, {"hive_db has name"},
- // {"hive_db, hive_table"},
- // {"hive_db, hive_process has name"},
- // {"hive_db as db1, hive_table where db1.name = \"Reporting\""},
- // {"hive_db where hive_db.name=\"Reporting\" and hive_db.createTime < " + System
- // .currentTimeMillis()},
- {"from hive_table"}, {"hive_table"}, {"hive_table is Dimension"},
- {"hive_column where hive_column isa PII"},
- // {"hive_column where hive_column isa PII select hive_column.name"},
- {"hive_column select hive_column.name"}, {"hive_column select name"},
- {"hive_column where hive_column.name=\"customer_id\""}, {"from hive_table select hive_table.name"},
- {"hive_db where (name = \"Reporting\")"},
- {"hive_db where (name = \"Reporting\") select name as _col_0, owner as _col_1"},
- {"hive_db where hive_db has name"},
- // {"hive_db hive_table"},
- {"hive_db where hive_db has name"},
- // {"hive_db as db1 hive_table where (db1.name = \"Reporting\")"},
- {"hive_db where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 "},
- // {"hive_db where (name = \"Reporting\") and ((createTime + 1) > 0)"},
- // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name =
- // \"Reporting\") select db1.name as dbName, tab.name as tabName"},
- // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) or (db1.name =
- // \"Reporting\") select db1.name as dbName, tab.name as tabName"},
- // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name =
- // \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"},
- // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name =
- // \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"},
- // trait searches
- {"Dimension"}, {"Fact"}, {"ETL"}, {"Metric"}, {"PII"},};
- }
-
- @Test(dataProvider = "dslQueriesProvider")
- public void testSearchByDSLQueries(String dslQuery) throws Exception {
- System.out.println("Executing dslQuery = " + dslQuery);
- String jsonResults = discoveryService.searchByDSL(dslQuery);
- Assert.assertNotNull(jsonResults);
-
- JSONObject results = new JSONObject(jsonResults);
- Assert.assertEquals(results.length(), 3);
- System.out.println("results = " + results);
-
- Object query = results.get("query");
- Assert.assertNotNull(query);
-
- JSONObject dataType = results.getJSONObject("dataType");
- Assert.assertNotNull(dataType);
- String typeName = dataType.getString("typeName");
- Assert.assertNotNull(typeName);
-
- JSONArray rows = results.getJSONArray("rows");
- Assert.assertNotNull(rows);
- Assert.assertTrue(rows.length() >= 0); // some queries may not have any results
- System.out.println("query [" + dslQuery + "] returned [" + rows.length() + "] rows");
- }
-
- @Test
- public void testGetInputs() throws Exception {
- JSONObject results = new JSONObject(hiveLineageService.getInputs("sales_fact_monthly_mv"));
- Assert.assertNotNull(results);
- System.out.println("inputs = " + results);
-
- JSONArray rows = results.getJSONArray("rows");
- Assert.assertTrue(rows.length() > 0);
-
- final JSONObject row = rows.getJSONObject(0);
- JSONArray paths = row.getJSONArray("path");
- Assert.assertTrue(paths.length() > 0);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testGetInputsTableNameNull() throws Exception {
- hiveLineageService.getInputs(null);
- Assert.fail();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testGetInputsTableNameEmpty() throws Exception {
- hiveLineageService.getInputs("");
- Assert.fail();
- }
-
- @Test(expectedExceptions = EntityNotFoundException.class)
- public void testGetInputsBadTableName() throws Exception {
- hiveLineageService.getInputs("blah");
- Assert.fail();
- }
-
- @Test
- public void testGetInputsGraph() throws Exception {
- JSONObject results = new JSONObject(hiveLineageService.getInputsGraph("sales_fact_monthly_mv"));
- Assert.assertNotNull(results);
- System.out.println("inputs graph = " + results);
-
- JSONObject values = results.getJSONObject("values");
- Assert.assertNotNull(values);
-
- final JSONObject vertices = values.getJSONObject("vertices");
- Assert.assertEquals(vertices.length(), 4);
-
- final JSONObject edges = values.getJSONObject("edges");
- Assert.assertEquals(edges.length(), 4);
- }
-
- @Test
- public void testGetOutputs() throws Exception {
- JSONObject results = new JSONObject(hiveLineageService.getOutputs("sales_fact"));
- Assert.assertNotNull(results);
- System.out.println("outputs = " + results);
-
- JSONArray rows = results.getJSONArray("rows");
- Assert.assertTrue(rows.length() > 0);
-
- final JSONObject row = rows.getJSONObject(0);
- JSONArray paths = row.getJSONArray("path");
- Assert.assertTrue(paths.length() > 0);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testGetOututsTableNameNull() throws Exception {
- hiveLineageService.getOutputs(null);
- Assert.fail();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testGetOutputsTableNameEmpty() throws Exception {
- hiveLineageService.getOutputs("");
- Assert.fail();
- }
-
- @Test(expectedExceptions = EntityNotFoundException.class)
- public void testGetOutputsBadTableName() throws Exception {
- hiveLineageService.getOutputs("blah");
- Assert.fail();
- }
-
- @Test
- public void testGetOutputsGraph() throws Exception {
- JSONObject results = new JSONObject(hiveLineageService.getOutputsGraph("sales_fact"));
- Assert.assertNotNull(results);
- System.out.println("outputs graph = " + results);
-
- JSONObject values = results.getJSONObject("values");
- Assert.assertNotNull(values);
-
- final JSONObject vertices = values.getJSONObject("vertices");
- Assert.assertEquals(vertices.length(), 3);
-
- final JSONObject edges = values.getJSONObject("edges");
- Assert.assertEquals(edges.length(), 4);
- }
-
- @DataProvider(name = "tableNamesProvider")
- private Object[][] tableNames() {
- return new String[][]{{"sales_fact", "4"}, {"time_dim", "3"}, {"sales_fact_daily_mv", "4"},
- {"sales_fact_monthly_mv", "4"}};
- }
-
- @Test(dataProvider = "tableNamesProvider")
- public void testGetSchema(String tableName, String expected) throws Exception {
- JSONObject results = new JSONObject(hiveLineageService.getSchema(tableName));
- Assert.assertNotNull(results);
- System.out.println("columns = " + results);
-
- JSONArray rows = results.getJSONArray("rows");
- Assert.assertEquals(rows.length(), Integer.parseInt(expected));
-
- for (int index = 0; index < rows.length(); index++) {
- final JSONObject row = rows.getJSONObject(index);
- Assert.assertNotNull(row.getString("name"));
- Assert.assertNotNull(row.getString("comment"));
- Assert.assertNotNull(row.getString("dataType"));
- Assert.assertEquals(row.getString("$typeName$"), "hive_column");
- }
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testGetSchemaTableNameNull() throws Exception {
- hiveLineageService.getSchema(null);
- Assert.fail();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testGetSchemaTableNameEmpty() throws Exception {
- hiveLineageService.getSchema("");
- Assert.fail();
- }
-
- @Test(expectedExceptions = EntityNotFoundException.class)
- public void testGetSchemaBadTableName() throws Exception {
- hiveLineageService.getSchema("blah");
- Assert.fail();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
----------------------------------------------------------------------
diff --git a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
index 2fd8bb9..f65cedb 100755
--- a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
+++ b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
@@ -107,7 +107,7 @@ class GremlinTest2 extends BaseGremlinTest {
}
@Test def testHighLevelLineage {
- val r = HiveLineageQuery("Table", "sales_fact_monthly_mv",
+ val r = InputLineageClosureQuery("Table", "name", "sales_fact_monthly_mv",
"LoadProcess",
"inputTables",
"outputTable",
@@ -116,7 +116,7 @@ class GremlinTest2 extends BaseGremlinTest {
}
@Test def testHighLevelLineageReturnGraph {
- val r = HiveLineageQuery("Table", "sales_fact_monthly_mv",
+ val r = InputLineageClosureQuery("Table", "name", "sales_fact_monthly_mv",
"LoadProcess",
"inputTables",
"outputTable",
@@ -127,7 +127,7 @@ class GremlinTest2 extends BaseGremlinTest {
}
@Test def testHighLevelWhereUsed {
- val r = HiveWhereUsedQuery("Table", "sales_fact",
+ val r = OutputLineageClosureQuery("Table", "name", "sales_fact",
"LoadProcess",
"inputTables",
"outputTable",
@@ -136,7 +136,7 @@ class GremlinTest2 extends BaseGremlinTest {
}
@Test def testHighLevelWhereUsedReturnGraph {
- val r = HiveWhereUsedQuery("Table", "sales_fact",
+ val r = OutputLineageClosureQuery("Table", "name", "sales_fact",
"LoadProcess",
"inputTables",
"outputTable",
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/server-api/src/main/java/org/apache/atlas/discovery/LineageService.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/discovery/LineageService.java b/server-api/src/main/java/org/apache/atlas/discovery/LineageService.java
index 8dc36cd..5aab355 100644
--- a/server-api/src/main/java/org/apache/atlas/discovery/LineageService.java
+++ b/server-api/src/main/java/org/apache/atlas/discovery/LineageService.java
@@ -26,42 +26,50 @@ import org.apache.atlas.AtlasException;
public interface LineageService {
/**
- * Return the lineage outputs for the given tableName.
+ * Return the lineage outputs graph for the given datasetName.
*
- * @param tableName tableName
- * @return Outputs as JSON
+ * @param datasetName name of the dataset
+ * @return Outputs Graph as JSON
*/
- String getOutputs(String tableName) throws AtlasException;
+ String getOutputsGraph(String datasetName) throws AtlasException;
/**
- * Return the lineage outputs graph for the given tableName.
+ * Return the lineage inputs graph for the given datasetName.
*
- * @param tableName tableName
- * @return Outputs Graph as JSON
+ * @param datasetName name of the dataset
+ * @return Inputs Graph as JSON
*/
- String getOutputsGraph(String tableName) throws AtlasException;
+ String getInputsGraph(String datasetName) throws AtlasException;
/**
- * Return the lineage inputs for the given tableName.
+ * Return the lineage inputs graph for the given entity id.
*
- * @param tableName tableName
- * @return Inputs as JSON
+ * @param guid guid of the dataset entity
+ * @return Inputs Graph as JSON
*/
- String getInputs(String tableName) throws AtlasException;
+ String getInputsGraphForEntity(String guid) throws AtlasException;
/**
- * Return the lineage inputs graph for the given tableName.
+ * Return the lineage outputs graph for the given entity id.
*
- * @param tableName tableName
+ * @param guid guid of the dataset entity
- * @return Inputs Graph as JSON
+ * @return Outputs Graph as JSON
*/
- String getInputsGraph(String tableName) throws AtlasException;
+ String getOutputsGraphForEntity(String guid) throws AtlasException;
+
+ /**
+ * Return the schema for the given datasetName.
+ *
+ * @param datasetName name of the dataset
+ * @return Schema as JSON
+ */
+ String getSchema(String datasetName) throws AtlasException;
/**
- * Return the schema for the given tableName.
+ * Return the schema for the given entity id.
*
- * @param tableName tableName
+ * @param guid guid of the dataset entity
* @return Schema as JSON
*/
- String getSchema(String tableName) throws AtlasException;
+ String getSchemaForEntity(String guid) throws AtlasException;
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/typesystem/src/main/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties
index aafad0f..a8e77bb 100644
--- a/typesystem/src/main/resources/atlas-application.properties
+++ b/typesystem/src/main/resources/atlas-application.properties
@@ -49,14 +49,8 @@ atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
######### Hive Lineage Configs #########
-# This models reflects the base super types for Data and Process
-#atlas.lineage.hive.table.type.name=DataSet
-#atlas.lineage.hive.process.type.name=Process
-#atlas.lineage.hive.process.inputs.name=inputs
-#atlas.lineage.hive.process.outputs.name=outputs
-
## Schema
-atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
+atlas.lineage.schema.query.hive_table=hive_table where __guid='%s'\, columns
######### Notification Configs #########
atlas.notification.embedded=true
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java
new file mode 100644
index 0000000..bb7fe46
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.web.resources;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.discovery.DiscoveryException;
+import org.apache.atlas.discovery.LineageService;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.web.util.Servlets;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+
+/**
+ * Jersey resource for Hive table lineage (inputs/outputs graphs and schema), addressed by table name.
+ */
+@Path("lineage/hive")
+@Singleton
+public class DataSetLineageResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class);
+
+ private final LineageService lineageService;
+
+ /**
+ * Created by the Guice ServletModule and injected with the
+ * configured LineageService.
+ *
+ * @param lineageService lineage service handle
+ */
+ @Inject
+ public DataSetLineageResource(LineageService lineageService) {
+ this.lineageService = lineageService;
+ }
+
+ /**
+ * Returns the lineage inputs graph for the table with the given name.
+ *
+ * @param tableName table name; EntityNotFoundException from the service is mapped to 404
+ */
+ @GET
+ @Path("table/{tableName}/inputs/graph")
+ @Consumes(Servlets.JSON_MEDIA_TYPE)
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public Response inputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
+ LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
+
+ try {
+ final String jsonResult = lineageService.getInputsGraph(tableName);
+
+ JSONObject response = new JSONObject();
+ response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
+ response.put("tableName", tableName);
+ response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+
+ return Response.ok(response).build();
+ } catch (EntityNotFoundException e) {
+ LOG.error("table entity not found for {}", tableName, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
+ } catch (DiscoveryException | IllegalArgumentException e) {
+ LOG.error("Unable to get lineage inputs graph for table {}", tableName, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
+ } catch (Throwable e) {
+ LOG.error("Unable to get lineage inputs graph for table {}", tableName, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
+ }
+ }
+
+ /**
+ * Returns the lineage outputs graph for the table with the given name.
+ *
+ * @param tableName table name; EntityNotFoundException from the service is mapped to 404
+ */
+ @GET
+ @Path("table/{tableName}/outputs/graph")
+ @Consumes(Servlets.JSON_MEDIA_TYPE)
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public Response outputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
+ LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
+
+ try {
+ final String jsonResult = lineageService.getOutputsGraph(tableName);
+
+ JSONObject response = new JSONObject();
+ response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
+ response.put("tableName", tableName);
+ response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+
+ return Response.ok(response).build();
+ } catch (EntityNotFoundException e) {
+ LOG.error("table entity not found for {}", tableName, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
+ } catch (DiscoveryException | IllegalArgumentException e) {
+ LOG.error("Unable to get lineage outputs graph for table {}", tableName, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
+ } catch (Throwable e) {
+ LOG.error("Unable to get lineage outputs graph for table {}", tableName, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
+ }
+ }
+
+ /**
+ * Returns the schema (as resolved by LineageService.getSchema) for the given tableName.
+ *
+ * @param tableName table name; EntityNotFoundException from the service is mapped to 404
+ */
+ @GET
+ @Path("table/{tableName}/schema")
+ @Consumes(Servlets.JSON_MEDIA_TYPE)
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public Response schema(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
+ LOG.info("Fetching schema for tableName={}", tableName);
+
+ try {
+ final String jsonResult = lineageService.getSchema(tableName);
+
+ JSONObject response = new JSONObject();
+ response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
+ response.put("tableName", tableName);
+ response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+
+ return Response.ok(response).build();
+ } catch (EntityNotFoundException e) {
+ LOG.error("table entity not found for {}", tableName, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
+ } catch (DiscoveryException | IllegalArgumentException e) {
+ LOG.error("Unable to get schema for table {}", tableName, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
+ } catch (Throwable e) {
+ LOG.error("Unable to get schema for table {}", tableName, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/webapp/src/main/java/org/apache/atlas/web/resources/HiveLineageResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/HiveLineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/HiveLineageResource.java
deleted file mode 100644
index 9b3fbc9..0000000
--- a/webapp/src/main/java/org/apache/atlas/web/resources/HiveLineageResource.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.web.resources;
-
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.utils.ParamChecker;
-import org.apache.atlas.discovery.DiscoveryException;
-import org.apache.atlas.discovery.LineageService;
-import org.apache.atlas.web.util.Servlets;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.Response;
-
-/**
- * Jersey Resource for Hive Table Lineage.
- */
-@Path("lineage/hive")
-@Singleton
-public class HiveLineageResource {
-
- private static final Logger LOG = LoggerFactory.getLogger(HiveLineageResource.class);
-
- private final LineageService lineageService;
-
- /**
- * Created by the Guice ServletModule and injected with the
- * configured LineageService.
- *
- * @param lineageService lineage service handle
- */
- @Inject
- public HiveLineageResource(LineageService lineageService) {
- this.lineageService = lineageService;
- }
-
- /**
- * Returns the inputs graph for a given entity.
- *
- * @param tableName table name
- */
- @GET
- @Path("table/{tableName}/inputs/graph")
- @Consumes(Servlets.JSON_MEDIA_TYPE)
- @Produces(Servlets.JSON_MEDIA_TYPE)
- public Response inputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
- LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
-
- try {
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- final String jsonResult = lineageService.getInputsGraph(tableName);
-
- JSONObject response = new JSONObject();
- response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put("tableName", tableName);
- response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
-
- return Response.ok(response).build();
- } catch (EntityNotFoundException e) {
- LOG.error("table entity not found for {}", tableName, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (DiscoveryException | IllegalArgumentException e) {
- LOG.error("Unable to get lineage inputs graph for table {}", tableName, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
- } catch (Throwable e) {
- LOG.error("Unable to get lineage inputs graph for table {}", tableName, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
- }
- }
-
- /**
- * Returns the outputs graph for a given entity.
- *
- * @param tableName table name
- */
- @GET
- @Path("table/{tableName}/outputs/graph")
- @Consumes(Servlets.JSON_MEDIA_TYPE)
- @Produces(Servlets.JSON_MEDIA_TYPE)
- public Response outputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
- LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
-
- try {
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- final String jsonResult = lineageService.getOutputsGraph(tableName);
-
- JSONObject response = new JSONObject();
- response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put("tableName", tableName);
- response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
-
- return Response.ok(response).build();
- } catch (EntityNotFoundException e) {
- LOG.error("table entity not found for {}", tableName, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (DiscoveryException | IllegalArgumentException e) {
- LOG.error("Unable to get lineage outputs graph for table {}", tableName, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
- } catch (Throwable e) {
- LOG.error("Unable to get lineage outputs graph for table {}", tableName, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
- }
- }
-
- /**
- * Return the schema for the given tableName.
- *
- * @param tableName table name
- */
- @GET
- @Path("table/{tableName}/schema")
- @Consumes(Servlets.JSON_MEDIA_TYPE)
- @Produces(Servlets.JSON_MEDIA_TYPE)
- public Response schema(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
- LOG.info("Fetching schema for tableName={}", tableName);
-
- try {
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- final String jsonResult = lineageService.getSchema(tableName);
-
- JSONObject response = new JSONObject();
- response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put("tableName", tableName);
- response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
-
- return Response.ok(response).build();
- } catch (EntityNotFoundException e) {
- LOG.error("table entity not found for {}", tableName, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (DiscoveryException | IllegalArgumentException e) {
- LOG.error("Unable to get schema for table {}", tableName, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
- } catch (Throwable e) {
- LOG.error("Unable to get schema for table {}", tableName, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
new file mode 100644
index 0000000..7c92c33
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.web.resources;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.discovery.DiscoveryException;
+import org.apache.atlas.discovery.LineageService;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.web.util.Servlets;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
+/**
+ * Jersey resource serving lineage graphs and schema for an entity addressed by its guid.
+ * Errors map to 404 (entity not found), 400 (discovery/argument errors) and 500 (anything else).
+ */
+@Path("lineage")
+@Singleton
+public class LineageResource {
+ private static final Logger LOG = LoggerFactory.getLogger(LineageResource.class);
+
+ private final LineageService lineageService;
+
+ /**
+ * Created by the Guice ServletModule and injected with the
+ * configured LineageService.
+ *
+ * @param lineageService lineage service handle
+ */
+ @Inject
+ public LineageResource(LineageService lineageService) {
+ this.lineageService = lineageService;
+ }
+
+ /**
+ * Returns input lineage graph for the given entity id.
+ * @param guid dataset entity id
+ * @return response containing the request id and the inputs graph JSON as results
+ */
+ @GET
+ @Path("{guid}/inputs/graph")
+ @Consumes(Servlets.JSON_MEDIA_TYPE)
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public Response inputsGraph(@PathParam("guid") String guid) {
+ LOG.info("Fetching lineage inputs graph for guid={}", guid);
+
+ try {
+ final String jsonResult = lineageService.getInputsGraphForEntity(guid);
+
+ JSONObject response = new JSONObject();
+ response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
+ response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+
+ return Response.ok(response).build();
+ } catch (EntityNotFoundException e) {
+ LOG.error("entity not found for guid={}", guid, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
+ } catch (DiscoveryException | IllegalArgumentException e) {
+ LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
+ } catch (Throwable e) {
+ LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
+ }
+ }
+
+ /**
+ * Returns the outputs graph for a given entity id.
+ *
+ * @param guid dataset entity id
+ * @return response containing the request id and the outputs graph JSON as results
+ */
+ @GET
+ @Path("{guid}/outputs/graph")
+ @Consumes(Servlets.JSON_MEDIA_TYPE)
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public Response outputsGraph(@PathParam("guid") String guid) {
+ LOG.info("Fetching lineage outputs graph for entity guid={}", guid);
+
+ try {
+ final String jsonResult = lineageService.getOutputsGraphForEntity(guid);
+
+ JSONObject response = new JSONObject();
+ response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
+ response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+
+ return Response.ok(response).build();
+ } catch (EntityNotFoundException e) {
+ LOG.error("entity not found for guid={}", guid, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
+ } catch (DiscoveryException | IllegalArgumentException e) {
+ LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
+ } catch (Throwable e) {
+ LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
+ }
+ }
+
+ /**
+ * Returns the schema for the given dataset id.
+ *
+ * @param guid dataset entity id
+ * @return response containing the request id and the schema JSON as results
+ */
+ @GET
+ @Path("{guid}/schema")
+ @Consumes(Servlets.JSON_MEDIA_TYPE)
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public Response schema(@PathParam("guid") String guid) {
+ LOG.info("Fetching schema for entity guid={}", guid);
+
+ try {
+ final String jsonResult = lineageService.getSchemaForEntity(guid);
+
+ JSONObject response = new JSONObject();
+ response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
+ response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+
+ return Response.ok(response).build();
+ } catch (EntityNotFoundException e) {
+ LOG.error("entity not found for guid={}", guid, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
+ } catch (DiscoveryException | IllegalArgumentException e) {
+ LOG.error("Unable to get schema for entity guid={}", guid, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
+ } catch (Throwable e) {
+ LOG.error("Unable to get schema for entity guid={}", guid, e);
+ throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/webapp/src/test/java/org/apache/atlas/web/resources/DataSetLineageJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/DataSetLineageJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/DataSetLineageJerseyResourceIT.java
new file mode 100644
index 0000000..41f7c31
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/DataSetLineageJerseyResourceIT.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.web.resources;
+
+import com.google.common.collect.ImmutableList;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.web.util.Servlets;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.Response;
+import java.util.List;
+
+/**
+ * Hive Lineage Integration Tests.
+ */
+public class DataSetLineageJerseyResourceIT extends BaseResourceIT {
+
+ private static final String BASE_URI = "api/atlas/lineage/hive/table/";
+ private String salesFactTable;
+ private String salesMonthlyTable;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ super.setUp();
+
+ createTypeDefinitions();
+ setupInstances();
+ }
+
+ @Test
+ public void testInputsGraph() throws Exception {
+ WebResource resource = service.path(BASE_URI).path(salesMonthlyTable).path("inputs").path("graph");
+
+ ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
+ .method(HttpMethod.GET, ClientResponse.class);
+ Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
+
+ String responseAsString = clientResponse.getEntity(String.class);
+ Assert.assertNotNull(responseAsString);
+ System.out.println("inputs graph = " + responseAsString);
+
+ JSONObject response = new JSONObject(responseAsString);
+ Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
+
+ JSONObject results = response.getJSONObject(AtlasClient.RESULTS);
+ Assert.assertNotNull(results);
+
+ JSONObject values = results.getJSONObject("values");
+ Assert.assertNotNull(values);
+
+ final JSONObject vertices = values.getJSONObject("vertices");
+ Assert.assertEquals(vertices.length(), 4);
+
+ final JSONObject edges = values.getJSONObject("edges");
+ Assert.assertEquals(edges.length(), 4);
+ }
+
+ @Test
+ public void testInputsGraphForEntity() throws Exception {
+ String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, "name", salesMonthlyTable).getId()._getId();
+ JSONObject results = serviceClient.getInputGraphForEntity(tableId);
+ Assert.assertNotNull(results);
+
+ JSONObject values = results.getJSONObject("values");
+ Assert.assertNotNull(values);
+
+ final JSONObject vertices = values.getJSONObject("vertices");
+ Assert.assertEquals(vertices.length(), 4);
+
+ final JSONObject edges = values.getJSONObject("edges");
+ Assert.assertEquals(edges.length(), 4);
+ }
+
+ @Test
+ public void testOutputsGraph() throws Exception {
+ WebResource resource = service.path(BASE_URI).path(salesFactTable).path("outputs").path("graph");
+
+ ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
+ .method(HttpMethod.GET, ClientResponse.class);
+ Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
+
+ String responseAsString = clientResponse.getEntity(String.class);
+ Assert.assertNotNull(responseAsString);
+ System.out.println("outputs graph= " + responseAsString);
+
+ JSONObject response = new JSONObject(responseAsString);
+ Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
+
+ JSONObject results = response.getJSONObject(AtlasClient.RESULTS);
+ Assert.assertNotNull(results);
+
+ JSONObject values = results.getJSONObject("values");
+ Assert.assertNotNull(values);
+
+ final JSONObject vertices = values.getJSONObject("vertices");
+ Assert.assertEquals(vertices.length(), 3);
+
+ final JSONObject edges = values.getJSONObject("edges");
+ Assert.assertEquals(edges.length(), 4);
+ }
+
+ @Test
+ public void testOutputsGraphForEntity() throws Exception {
+ String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, "name", salesFactTable).getId()._getId();
+ JSONObject results = serviceClient.getOutputGraphForEntity(tableId);
+ Assert.assertNotNull(results);
+
+ JSONObject values = results.getJSONObject("values");
+ Assert.assertNotNull(values);
+
+ final JSONObject vertices = values.getJSONObject("vertices");
+ Assert.assertEquals(vertices.length(), 3);
+
+ final JSONObject edges = values.getJSONObject("edges");
+ Assert.assertEquals(edges.length(), 4);
+ }
+
+ @Test
+ public void testSchema() throws Exception {
+ WebResource resource = service.path(BASE_URI).path(salesFactTable).path("schema");
+
+ ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
+ .method(HttpMethod.GET, ClientResponse.class);
+ Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
+
+ String responseAsString = clientResponse.getEntity(String.class);
+ Assert.assertNotNull(responseAsString);
+ System.out.println("schema = " + responseAsString);
+
+ JSONObject response = new JSONObject(responseAsString);
+ Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
+
+ JSONObject results = response.getJSONObject(AtlasClient.RESULTS);
+ Assert.assertNotNull(results);
+
+ JSONArray rows = results.getJSONArray("rows");
+ Assert.assertEquals(rows.length(), 4);
+
+ for (int index = 0; index < rows.length(); index++) {
+ final JSONObject row = rows.getJSONObject(index);
+ Assert.assertNotNull(row.getString("name"));
+ Assert.assertNotNull(row.getString("comment"));
+ Assert.assertNotNull(row.getString("dataType"));
+ Assert.assertEquals(row.getString("$typeName$"), "hive_column");
+ }
+ }
+
+ @Test
+ public void testSchemaForEntity() throws Exception {
+ String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, "name", salesFactTable).getId()._getId();
+ JSONObject results = serviceClient.getSchemaForEntity(tableId);
+ Assert.assertNotNull(results);
+
+ JSONArray rows = results.getJSONArray("rows");
+ Assert.assertEquals(rows.length(), 4);
+
+ for (int index = 0; index < rows.length(); index++) {
+ final JSONObject row = rows.getJSONObject(index);
+ Assert.assertNotNull(row.getString("name"));
+ Assert.assertNotNull(row.getString("comment"));
+ Assert.assertNotNull(row.getString("dataType"));
+ Assert.assertEquals(row.getString("$typeName$"), "hive_column");
+ }
+ }
+
+ @Test
+ public void testSchemaForEmptyTable() throws Exception {
+ WebResource resource = service.path(BASE_URI).path("").path("schema");
+
+ ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
+ .method(HttpMethod.GET, ClientResponse.class);
+ Assert.assertEquals(clientResponse.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
+ }
+
+ @Test
+ public void testSchemaForInvalidTable() throws Exception {
+ WebResource resource = service.path(BASE_URI).path("blah").path("schema");
+
+ ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
+ .method(HttpMethod.GET, ClientResponse.class);
+ Assert.assertEquals(clientResponse.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
+ }
+
+ private void setupInstances() throws Exception {
+ Id salesDB = database("Sales" + randomString(), "Sales Database", "John ETL",
+ "hdfs://host:8000/apps/warehouse/sales");
+
+ List<Referenceable> salesFactColumns = ImmutableList
+ .of(column("time_id", "int", "time id"), column("product_id", "int", "product id"),
+ column("customer_id", "int", "customer id", "pii"),
+ column("sales", "double", "product id", "Metric"));
+
+ salesFactTable = "sales_fact" + randomString();
+ Id salesFact = table(salesFactTable, "sales fact table", salesDB, "Joe", "MANAGED", salesFactColumns, "Fact");
+
+ List<Referenceable> timeDimColumns = ImmutableList
+ .of(column("time_id", "int", "time id"), column("dayOfYear", "int", "day Of Year"),
+ column("weekDay", "int", "week Day"));
+
+ Id timeDim =
+ table("time_dim" + randomString(), "time dimension table", salesDB, "John Doe", "EXTERNAL",
+ timeDimColumns, "Dimension");
+
+ Id reportingDB =
+ database("Reporting" + randomString(), "reporting database", "Jane BI",
+ "hdfs://host:8000/apps/warehouse/reporting");
+
+ Id salesFactDaily =
+ table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB,
+ "Joe BI", "MANAGED", salesFactColumns, "Metric");
+
+ loadProcess("loadSalesDaily" + randomString(), "John ETL", ImmutableList.of(salesFact, timeDim),
+ ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
+
+ salesMonthlyTable = "sales_fact_monthly_mv" + randomString();
+ Id salesFactMonthly =
+ table(salesMonthlyTable, "sales fact monthly materialized view", reportingDB, "Jane BI",
+ "MANAGED", salesFactColumns, "Metric");
+
+ loadProcess("loadSalesMonthly" + randomString(), "John ETL", ImmutableList.of(salesFactDaily),
+ ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
+ }
+
+ Id database(String name, String description, String owner, String locationUri, String... traitNames)
+ throws Exception {
+ Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("description", description);
+ referenceable.set("owner", owner);
+ referenceable.set("locationUri", locationUri);
+ referenceable.set("createTime", System.currentTimeMillis());
+
+ return createInstance(referenceable);
+ }
+
+ Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
+ Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("dataType", dataType);
+ referenceable.set("comment", comment);
+
+ return referenceable;
+ }
+
+ Id table(String name, String description, Id dbId, String owner, String tableType, List<Referenceable> columns,
+ String... traitNames) throws Exception {
+ Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("description", description);
+ referenceable.set("owner", owner);
+ referenceable.set("tableType", tableType);
+ referenceable.set("createTime", System.currentTimeMillis());
+ referenceable.set("lastAccessTime", System.currentTimeMillis());
+ referenceable.set("retention", System.currentTimeMillis());
+
+ referenceable.set("db", dbId);
+ referenceable.set("columns", columns);
+
+ return createInstance(referenceable);
+ }
+
+ Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText,
+ String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
+ Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("qualifiedName", name);
+ referenceable.set("user", user);
+ referenceable.set("startTime", System.currentTimeMillis());
+ referenceable.set("endTime", System.currentTimeMillis() + 10000);
+
+ referenceable.set("inputs", inputTables);
+ referenceable.set("outputs", outputTables);
+
+ referenceable.set("queryText", queryText);
+ referenceable.set("queryPlan", queryPlan);
+ referenceable.set("queryId", queryId);
+ referenceable.set("queryGraph", queryGraph);
+
+ return createInstance(referenceable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
deleted file mode 100644
index 0fb5ea2..0000000
--- a/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.web.resources;
-
-import com.google.common.collect.ImmutableList;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.atlas.web.util.Servlets;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONObject;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.Response;
-import java.util.List;
-
-/**
- * Hive Lineage Integration Tests.
- */
-public class HiveLineageJerseyResourceIT extends BaseResourceIT {
-
- private static final String BASE_URI = "api/atlas/lineage/hive/table/";
- private String salesFactTable;
- private String salesMonthlyTable;
-
- @BeforeClass
- public void setUp() throws Exception {
- super.setUp();
-
- createTypeDefinitions();
- setupInstances();
- }
-
- @Test
- public void testInputsGraph() throws Exception {
- WebResource resource = service.path(BASE_URI).path(salesMonthlyTable).path("inputs").path("graph");
-
- ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
- .method(HttpMethod.GET, ClientResponse.class);
- Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
-
- String responseAsString = clientResponse.getEntity(String.class);
- Assert.assertNotNull(responseAsString);
- System.out.println("inputs graph = " + responseAsString);
-
- JSONObject response = new JSONObject(responseAsString);
- Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
-
- JSONObject results = response.getJSONObject(AtlasClient.RESULTS);
- Assert.assertNotNull(results);
-
- JSONObject values = results.getJSONObject("values");
- Assert.assertNotNull(values);
-
- final JSONObject vertices = values.getJSONObject("vertices");
- Assert.assertEquals(vertices.length(), 4);
-
- final JSONObject edges = values.getJSONObject("edges");
- Assert.assertEquals(edges.length(), 4);
- }
-
- @Test
- public void testOutputsGraph() throws Exception {
- WebResource resource = service.path(BASE_URI).path(salesFactTable).path("outputs").path("graph");
-
- ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
- .method(HttpMethod.GET, ClientResponse.class);
- Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
-
- String responseAsString = clientResponse.getEntity(String.class);
- Assert.assertNotNull(responseAsString);
- System.out.println("outputs graph= " + responseAsString);
-
- JSONObject response = new JSONObject(responseAsString);
- Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
-
- JSONObject results = response.getJSONObject(AtlasClient.RESULTS);
- Assert.assertNotNull(results);
-
- JSONObject values = results.getJSONObject("values");
- Assert.assertNotNull(values);
-
- final JSONObject vertices = values.getJSONObject("vertices");
- Assert.assertEquals(vertices.length(), 3);
-
- final JSONObject edges = values.getJSONObject("edges");
- Assert.assertEquals(edges.length(), 4);
- }
-
- @Test
- public void testSchema() throws Exception {
- WebResource resource = service.path(BASE_URI).path(salesFactTable).path("schema");
-
- ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
- .method(HttpMethod.GET, ClientResponse.class);
- Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
-
- String responseAsString = clientResponse.getEntity(String.class);
- Assert.assertNotNull(responseAsString);
- System.out.println("schema = " + responseAsString);
-
- JSONObject response = new JSONObject(responseAsString);
- Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
-
- JSONObject results = response.getJSONObject(AtlasClient.RESULTS);
- Assert.assertNotNull(results);
-
- JSONArray rows = results.getJSONArray("rows");
- Assert.assertEquals(rows.length(), 4);
-
- for (int index = 0; index < rows.length(); index++) {
- final JSONObject row = rows.getJSONObject(index);
- Assert.assertNotNull(row.getString("name"));
- Assert.assertNotNull(row.getString("comment"));
- Assert.assertNotNull(row.getString("dataType"));
- Assert.assertEquals(row.getString("$typeName$"), "hive_column");
- }
- }
-
- @Test
- public void testSchemaForEmptyTable() throws Exception {
- WebResource resource = service.path(BASE_URI).path("").path("schema");
-
- ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
- .method(HttpMethod.GET, ClientResponse.class);
- Assert.assertEquals(clientResponse.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
- }
-
- @Test
- public void testSchemaForInvalidTable() throws Exception {
- WebResource resource = service.path(BASE_URI).path("blah").path("schema");
-
- ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
- .method(HttpMethod.GET, ClientResponse.class);
- Assert.assertEquals(clientResponse.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
- }
-
- private void setupInstances() throws Exception {
- Id salesDB = database("Sales" + randomString(), "Sales Database", "John ETL",
- "hdfs://host:8000/apps/warehouse/sales");
-
- List<Referenceable> salesFactColumns = ImmutableList
- .of(column("time_id", "int", "time id"), column("product_id", "int", "product id"),
- column("customer_id", "int", "customer id", "pii"),
- column("sales", "double", "product id", "Metric"));
-
- salesFactTable = "sales_fact" + randomString();
- Id salesFact = table(salesFactTable, "sales fact table", salesDB, "Joe", "MANAGED", salesFactColumns, "Fact");
-
- List<Referenceable> timeDimColumns = ImmutableList
- .of(column("time_id", "int", "time id"), column("dayOfYear", "int", "day Of Year"),
- column("weekDay", "int", "week Day"));
-
- Id timeDim =
- table("time_dim" + randomString(), "time dimension table", salesDB, "John Doe", "EXTERNAL",
- timeDimColumns, "Dimension");
-
- Id reportingDB =
- database("Reporting" + randomString(), "reporting database", "Jane BI",
- "hdfs://host:8000/apps/warehouse/reporting");
-
- Id salesFactDaily =
- table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB,
- "Joe BI", "MANAGED", salesFactColumns, "Metric");
-
- String procName = "loadSalesDaily" + randomString();
- loadProcess(procName, "John ETL", ImmutableList.of(salesFact, timeDim),
- ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
-
- salesMonthlyTable = "sales_fact_monthly_mv" + randomString();
- Id salesFactMonthly =
- table(salesMonthlyTable, "sales fact monthly materialized view", reportingDB, "Jane BI",
- "MANAGED", salesFactColumns, "Metric");
-
- loadProcess("loadSalesMonthly" + randomString(), "John ETL", ImmutableList.of(salesFactDaily),
- ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
- }
-
- Id database(String name, String description, String owner, String locationUri, String... traitNames)
- throws Exception {
- Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("description", description);
- referenceable.set("owner", owner);
- referenceable.set("locationUri", locationUri);
- referenceable.set("createTime", System.currentTimeMillis());
-
- return createInstance(referenceable);
- }
-
- Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("dataType", dataType);
- referenceable.set("comment", comment);
-
- return referenceable;
- }
-
- Id table(String name, String description, Id dbId, String owner, String tableType, List<Referenceable> columns,
- String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("description", description);
- referenceable.set("owner", owner);
- referenceable.set("tableType", tableType);
- referenceable.set("createTime", System.currentTimeMillis());
- referenceable.set("lastAccessTime", System.currentTimeMillis());
- referenceable.set("retention", System.currentTimeMillis());
-
- referenceable.set("db", dbId);
- referenceable.set("columns", columns);
-
- return createInstance(referenceable);
- }
-
- Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText,
- String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
- referenceable.set(AtlasClient.NAME, name);
- referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
- referenceable.set("user", user);
- referenceable.set("startTime", System.currentTimeMillis());
- referenceable.set("endTime", System.currentTimeMillis() + 10000);
-
- referenceable.set("inputs", inputTables);
- referenceable.set("outputs", outputTables);
-
- referenceable.set("queryText", queryText);
- referenceable.set("queryPlan", queryPlan);
- referenceable.set("queryId", queryId);
- referenceable.set("queryGraph", queryGraph);
-
- return createInstance(referenceable);
- }
-}
[2/2] incubator-atlas git commit: ATLAS-713 Entity lineage based on
entity id (shwethags)
Posted by sh...@apache.org.
ATLAS-713 Entity lineage based on entity id (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b65dd91c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b65dd91c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b65dd91c
Branch: refs/heads/master
Commit: b65dd91c3587d35abafc4ec136e162f9a5c92ac1
Parents: 857561a
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Wed May 18 17:55:24 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Wed May 18 17:55:24 2016 +0530
----------------------------------------------------------------------
.../main/java/org/apache/atlas/AtlasClient.java | 41 +-
dashboardv2/public/js/models/VLineage.js | 4 +-
dashboardv2/public/js/models/VSchema.js | 4 +-
.../views/detail_page/DetailPageLayoutView.js | 8 +-
.../public/js/views/graph/LineageLayoutView.js | 4 +-
.../public/js/views/schema/SchemaLayoutView.js | 2 +-
distro/src/conf/atlas-application.properties | 10 +-
release-log.txt | 1 +
.../apache/atlas/RepositoryMetadataModule.java | 4 +-
.../atlas/discovery/DataSetLineageService.java | 215 +++++++++
.../atlas/discovery/HiveLineageService.java | 222 ---------
.../org/apache/atlas/query/ClosureQuery.scala | 44 +-
.../apache/atlas/BaseHiveRepositoryTest.java | 377 ----------------
.../org/apache/atlas/BaseRepositoryTest.java | 377 ++++++++++++++++
.../discovery/DataSetLineageServiceTest.java | 447 +++++++++++++++++++
.../GraphBackedDiscoveryServiceTest.java | 4 +-
.../atlas/discovery/HiveLineageServiceTest.java | 260 -----------
.../org/apache/atlas/query/GremlinTest2.scala | 8 +-
.../apache/atlas/discovery/LineageService.java | 44 +-
.../main/resources/atlas-application.properties | 8 +-
.../web/resources/DataSetLineageResource.java | 162 +++++++
.../web/resources/HiveLineageResource.java | 166 -------
.../atlas/web/resources/LineageResource.java | 153 +++++++
.../DataSetLineageJerseyResourceIT.java | 306 +++++++++++++
.../resources/HiveLineageJerseyResourceIT.java | 257 -----------
25 files changed, 1768 insertions(+), 1360 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index b3ec95c..7e32cc2 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -90,7 +90,8 @@ public class AtlasClient {
public static final String URI_ENTITY = "entities";
public static final String URI_ENTITY_AUDIT = "audit";
public static final String URI_SEARCH = "discovery/search";
- public static final String URI_LINEAGE = "lineage/hive/table";
+ public static final String URI_NAME_LINEAGE = "lineage/hive/table";
+ public static final String URI_LINEAGE = "lineage/";
public static final String URI_TRAITS = "traits";
public static final String QUERY = "query";
@@ -416,7 +417,12 @@ public class AtlasClient {
SEARCH_GREMLIN(BASE_URI + URI_SEARCH + "/gremlin", HttpMethod.GET, Response.Status.OK),
SEARCH_FULL_TEXT(BASE_URI + URI_SEARCH + "/fulltext", HttpMethod.GET, Response.Status.OK),
- //Lineage operations
+ //Lineage operations based on dataset name
+ NAME_LINEAGE_INPUTS_GRAPH(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
+ NAME_LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
+ NAME_LINEAGE_SCHEMA(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
+
+ //Lineage operations based on entity id of the dataset
LINEAGE_INPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK),
LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK),
LINEAGE_SCHEMA(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK);
@@ -988,7 +994,7 @@ public class AtlasClient {
}
public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
- JSONObject response = callAPI(API.LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph");
+ JSONObject response = callAPI(API.NAME_LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph");
try {
return response.getJSONObject(AtlasClient.RESULTS);
} catch (JSONException e) {
@@ -997,7 +1003,34 @@ public class AtlasClient {
}
+ /**
+ * Returns the outputs lineage graph of a dataset looked up by name, using the
+ * name-based lineage API (NAME_LINEAGE_OUTPUTS_GRAPH).
+ *
+ * @param datasetName name of the dataset
+ * @return the RESULTS object of the response
+ * @throws AtlasServiceException if callAPI fails or the response carries no RESULTS object
+ */
public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException {
- JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph");
+ JSONObject response = callAPI(API.NAME_LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph");
+ try {
+ return response.getJSONObject(AtlasClient.RESULTS);
+ } catch (JSONException e) {
+ throw new AtlasServiceException(e);
+ }
+ }
+
+ /**
+ * Returns the inputs lineage graph of the dataset entity with the given id.
+ *
+ * @param entityId entity id (guid) of the dataset
+ * @return the RESULTS object of the response
+ * @throws AtlasServiceException if callAPI fails or the response carries no RESULTS object
+ */
+ public JSONObject getInputGraphForEntity(String entityId) throws AtlasServiceException {
+ final JSONObject lineageResponse = callAPI(API.LINEAGE_INPUTS_GRAPH, null, entityId, "/inputs/graph");
+ try {
+ return lineageResponse.getJSONObject(AtlasClient.RESULTS);
+ } catch (JSONException jsonError) {
+ throw new AtlasServiceException(jsonError);
+ }
+ }
+
+ /**
+ * Returns the outputs lineage graph of the dataset entity with the given id.
+ *
+ * @param datasetId entity id (guid) of the dataset
+ * @return the RESULTS object of the response
+ * @throws AtlasServiceException if callAPI fails or the response carries no RESULTS object
+ */
+ public JSONObject getOutputGraphForEntity(String datasetId) throws AtlasServiceException {
+ final JSONObject lineageResponse = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetId, "/outputs/graph");
+ try {
+ return lineageResponse.getJSONObject(AtlasClient.RESULTS);
+ } catch (JSONException jsonError) {
+ throw new AtlasServiceException(jsonError);
+ }
+ }
+
+ /**
+ * Returns the schema of the dataset entity with the given id.
+ *
+ * @param datasetId entity id (guid) of the dataset
+ * @return the RESULTS object of the response
+ * @throws AtlasServiceException if the schema request fails
+ */
+ public JSONObject getSchemaForEntity(String datasetId) throws AtlasServiceException {
+ // Use the dedicated schema entry. LINEAGE_OUTPUTS_GRAPH only happens to share the same
+ // URI/method/status today, so calling it here was a copy-paste slip.
+ JSONObject response = callAPI(API.LINEAGE_SCHEMA, null, datasetId, "/schema");
try {
return response.getJSONObject(AtlasClient.RESULTS);
} catch (JSONException e) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/models/VLineage.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/models/VLineage.js b/dashboardv2/public/js/models/VLineage.js
index e33488a..fa1be05 100644
--- a/dashboardv2/public/js/models/VLineage.js
+++ b/dashboardv2/public/js/models/VLineage.js
@@ -23,7 +23,7 @@ define(['require',
'use strict';
var VLineage = VBaseModel.extend({
- urlRoot: Globals.baseURL + 'api/atlas/lineage/hive/table/assetName/outputs/graph',
+ urlRoot: Globals.baseURL + 'api/atlas/lineage/assetName/outputs/graph',
defaults: {},
@@ -36,7 +36,7 @@ define(['require',
this.bindErrorEvents();
},
toString: function() {
- return this.get('name');
+ return this.get('id');
},
}, {});
return VLineage;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/models/VSchema.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/models/VSchema.js b/dashboardv2/public/js/models/VSchema.js
index 1f8e0bb..24462e6 100644
--- a/dashboardv2/public/js/models/VSchema.js
+++ b/dashboardv2/public/js/models/VSchema.js
@@ -22,7 +22,7 @@ define(['require',
], function(require, Globals, VBaseModel) {
'use strict';
var VSchema = VBaseModel.extend({
- urlRoot: Globals.baseURL + '/api/atlas/lineage/hive/table/log_fact_daily_mv/schema',
+ urlRoot: Globals.baseURL + '/api/atlas/lineage/log_fact_daily_mv/schema',
defaults: {},
@@ -35,7 +35,7 @@ define(['require',
this.bindErrorEvents();
},
toString: function() {
- return this.get('name');
+ return this.get('id');
},
}, {});
return VSchema;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js b/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
index 87adec0..0932208 100644
--- a/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
+++ b/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
@@ -92,7 +92,7 @@ define(['require',
this.renderEntityDetailTableLayoutView();
this.renderTagTableLayoutView(tagGuid);
this.renderLineageLayoutView(tagGuid);
- this.renderSchemaLayoutView();
+ this.renderSchemaLayoutView(tagGuid);
}, this);
},
onRender: function() {},
@@ -120,17 +120,17 @@ define(['require',
require(['views/graph/LineageLayoutView'], function(LineageLayoutView) {
that.RLineageLayoutView.show(new LineageLayoutView({
globalVent: that.globalVent,
- assetName: that.name,
+ assetName: tagGuid,
guid: tagGuid
}));
});
},
- renderSchemaLayoutView: function() {
+ renderSchemaLayoutView: function(tagGuid) {
var that = this;
require(['views/schema/SchemaLayoutView'], function(SchemaLayoutView) {
that.RSchemaTableLayoutView.show(new SchemaLayoutView({
globalVent: that.globalVent,
- name: that.name,
+ name: tagGuid,
vent: that.vent
}));
});
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/graph/LineageLayoutView.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/views/graph/LineageLayoutView.js b/dashboardv2/public/js/views/graph/LineageLayoutView.js
index 973d091..31433c1 100644
--- a/dashboardv2/public/js/views/graph/LineageLayoutView.js
+++ b/dashboardv2/public/js/views/graph/LineageLayoutView.js
@@ -56,8 +56,8 @@ define(['require',
this.inputCollection = new VLineageList();
this.outputCollection = new VLineageList();
this.entityModel = new VEntity();
- this.inputCollection.url = "/api/atlas/lineage/hive/table/" + this.assetName + "/inputs/graph";
- this.outputCollection.url = "/api/atlas/lineage/hive/table/" + this.assetName + "/outputs/graph";
+ this.inputCollection.url = "/api/atlas/lineage/" + this.assetName + "/inputs/graph";
+ this.outputCollection.url = "/api/atlas/lineage/" + this.assetName + "/outputs/graph";
this.bindEvents();
this.fetchGraphData();
this.data = {};
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/schema/SchemaLayoutView.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/views/schema/SchemaLayoutView.js b/dashboardv2/public/js/views/schema/SchemaLayoutView.js
index de558a7..301b993 100644
--- a/dashboardv2/public/js/views/schema/SchemaLayoutView.js
+++ b/dashboardv2/public/js/views/schema/SchemaLayoutView.js
@@ -73,7 +73,7 @@ define(['require',
initialize: function(options) {
_.extend(this, _.pick(options, 'globalVent', 'name', 'vent'));
this.schemaCollection = new VSchemaList([], {});
- this.schemaCollection.url = "/api/atlas/lineage/hive/table/" + this.name + "/schema";
+ this.schemaCollection.url = "/api/atlas/lineage/" + this.name + "/schema";
this.commonTableOptions = {
collection: this.schemaCollection,
includeFilter: false,
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 68a0021..d4722fb 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -63,15 +63,9 @@ atlas.kafka.auto.commit.enable=false
######### Hive Lineage Configs #########
-# This models reflects the base super types for Data and Process
-#atlas.lineage.hive.table.type.name=DataSet
-#atlas.lineage.hive.process.type.name=Process
-#atlas.lineage.hive.process.inputs.name=inputs
-#atlas.lineage.hive.process.outputs.name=outputs
-
## Schema
-atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
-atlas.lineage.hive.table.schema.query.Table=Table where name='%s'\, columns
+atlas.lineage.schema.query.hive_table=hive_table where __guid='%s'\, columns
+atlas.lineage.schema.query.Table=Table where __guid='%s'\, columns
## Server port configuration
#atlas.server.http.port=21000
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b600fff..a68010a 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -21,6 +21,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
+ATLAS-713 Entity lineage based on entity id (shwethags)
ATLAS-736 UI - BUG :: displaying timestamp values for hive_db description (kevalbhatt18 via yhemanth)
ATLAS-784 Configure config.store.uri for Falcon hook IT (yhemanth)
ATLAS-645 FieldMapping.output() results in stack overflow when instances reference each other (dkantor via shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
index 8dae968..68b707f 100755
--- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
+++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
@@ -26,7 +26,7 @@ import com.google.inject.throwingproviders.ThrowingProviderBinder;
import com.thinkaurelius.titan.core.TitanGraph;
import org.aopalliance.intercept.MethodInterceptor;
import org.apache.atlas.discovery.DiscoveryService;
-import org.apache.atlas.discovery.HiveLineageService;
+import org.apache.atlas.discovery.DataSetLineageService;
import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.listener.EntityChangeListener;
@@ -83,7 +83,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the DiscoveryService interface to an implementation
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton();
- bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton();
+ bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton();
bindAuditRepository(binder());
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
new file mode 100644
index 0000000..39dde2a
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.discovery;
+
+import com.thinkaurelius.titan.core.TitanGraph;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.GraphTransaction;
+import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
+import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
+import org.apache.atlas.query.GremlinQueryResult;
+import org.apache.atlas.query.InputLineageClosureQuery;
+import org.apache.atlas.query.OutputLineageClosureQuery;
+import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
+import org.apache.atlas.utils.ParamChecker;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
+import scala.collection.immutable.List;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+/**
+ * Lineage service for entities of the DataSet super type ({@link AtlasClient#DATA_SET_SUPER_TYPE}).
+ * A dataset is resolved either by name (ACTIVE instances only) or by guid. Input/output lineage graphs
+ * are then built by closure queries over "Process" entities linked through their "inputs"/"outputs"
+ * attributes. Schema lookups run a per-type DSL query template read from the application
+ * configuration under {@code atlas.lineage.schema.query.<typeName>}.
+ */
+@Singleton
+public class DataSetLineageService implements LineageService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageService.class);
+
+    // Attributes selected for every instance on the lineage path (only "name").
+    private static final Option<List<String>> SELECT_ATTRIBUTES =
+            Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));
+    public static final String SELECT_INSTANCE_GUID = "__guid";
+
+    public static final String DATASET_SCHEMA_QUERY_PREFIX = "atlas.lineage.schema.query.";
+
+    private static final String HIVE_PROCESS_TYPE_NAME = "Process";
+    private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs";
+    private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs";
+
+    // Both queries embed the caller-supplied value inside a single-quoted DSL literal without escaping;
+    // see ensureSafeDslLiteral.
+    private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'";
+    private static final String DATASET_NAME_EXISTS_QUERY =
+            AtlasClient.DATA_SET_SUPER_TYPE + " where name = '%s' and __state = 'ACTIVE'";
+
+    private static final Configuration propertiesConf;
+
+    static {
+        try {
+            propertiesConf = ApplicationProperties.get();
+        } catch (AtlasException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    private final TitanGraph titanGraph;
+    private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
+    private final GraphBackedDiscoveryService discoveryService;
+
+    @Inject
+    DataSetLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository,
+                          GraphBackedDiscoveryService discoveryService) throws DiscoveryException {
+        this.titanGraph = graphProvider.get();
+        this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
+        this.discoveryService = discoveryService;
+    }
+
+    /**
+     * Return the lineage outputs graph for the ACTIVE dataset with the given name.
+     *
+     * @param datasetName dataset name
+     * @return Outputs Graph as JSON
+     * @throws EntityNotFoundException if no ACTIVE dataset has this name
+     */
+    @Override
+    @GraphTransaction
+    public String getOutputsGraph(String datasetName) throws AtlasException {
+        LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName);
+        ParamChecker.notEmpty(datasetName, "dataset name");
+        ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
+        return getOutputsGraphForId(datasetInstance.getId()._getId());
+    }
+
+    /**
+     * Return the lineage inputs graph for the ACTIVE dataset with the given name.
+     *
+     * @param tableName dataset name
+     * @return Inputs Graph as JSON
+     * @throws EntityNotFoundException if no ACTIVE dataset has this name
+     */
+    @Override
+    @GraphTransaction
+    public String getInputsGraph(String tableName) throws AtlasException {
+        LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
+        ParamChecker.notEmpty(tableName, "table name");
+        ReferenceableInstance datasetInstance = validateDatasetNameExists(tableName);
+        return getInputsGraphForId(datasetInstance.getId()._getId());
+    }
+
+    /**
+     * Return the lineage inputs graph for the dataset with the given guid.
+     *
+     * @param guid dataset entity id
+     * @return Inputs Graph as JSON
+     * @throws EntityNotFoundException if no dataset has this guid
+     */
+    @Override
+    @GraphTransaction
+    public String getInputsGraphForEntity(String guid) throws AtlasException {
+        LOG.info("Fetching lineage inputs graph for entity={}", guid);
+        ParamChecker.notEmpty(guid, "Entity id");
+        validateDatasetExists(guid);
+        return getInputsGraphForId(guid);
+    }
+
+    // Builds the input (upstream) closure graph starting from the dataset selected by __guid.
+    private String getInputsGraphForId(String guid) {
+        InputLineageClosureQuery
+                inputsQuery = new InputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID,
+                guid, HIVE_PROCESS_TYPE_NAME,
+                HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
+                SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
+        return inputsQuery.graph().toInstanceJson();
+    }
+
+    /**
+     * Return the lineage outputs graph for the dataset with the given guid.
+     *
+     * @param guid dataset entity id
+     * @return Outputs Graph as JSON
+     * @throws EntityNotFoundException if no dataset has this guid
+     */
+    @Override
+    @GraphTransaction
+    public String getOutputsGraphForEntity(String guid) throws AtlasException {
+        LOG.info("Fetching lineage outputs graph for entity guid={}", guid);
+        ParamChecker.notEmpty(guid, "Entity id");
+        validateDatasetExists(guid);
+        return getOutputsGraphForId(guid);
+    }
+
+    // Builds the output (downstream) closure graph starting from the dataset selected by __guid.
+    private String getOutputsGraphForId(String guid) {
+        OutputLineageClosureQuery outputsQuery =
+                new OutputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, guid, HIVE_PROCESS_TYPE_NAME,
+                        HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
+                        SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
+        return outputsQuery.graph().toInstanceJson();
+    }
+
+    /**
+     * Return the schema for the ACTIVE dataset with the given name.
+     *
+     * @param datasetName dataset name
+     * @return Schema as JSON
+     * @throws EntityNotFoundException if no ACTIVE dataset has this name
+     * @throws DiscoveryException if no schema query is configured for the dataset's type
+     */
+    @Override
+    @GraphTransaction
+    public String getSchema(String datasetName) throws AtlasException {
+        ParamChecker.notEmpty(datasetName, "table name");
+        LOG.info("Fetching schema for tableName={}", datasetName);
+        ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
+
+        return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId());
+    }
+
+    /**
+     * Runs the schema DSL query configured for {@code typeName}, with {@code guid} substituted into it.
+     *
+     * @throws DiscoveryException if {@code atlas.lineage.schema.query.<typeName>} is not configured
+     */
+    private String getSchemaForId(String typeName, String guid) throws DiscoveryException {
+        final String configName = DATASET_SCHEMA_QUERY_PREFIX + typeName;
+        final String schemaQueryTemplate = propertiesConf.getString(configName);
+        if (schemaQueryTemplate == null) {
+            // Previously a missing key reached String.format(null, ...) and surfaced as a NullPointerException.
+            throw new DiscoveryException("Schema is not configured for type " + typeName + ". Configure " + configName);
+        }
+        final String schemaQuery = String.format(schemaQueryTemplate, guid);
+        return discoveryService.searchByDSL(schemaQuery);
+    }
+
+    /**
+     * Return the schema for the dataset with the given guid.
+     *
+     * @param guid dataset entity id
+     * @return Schema as JSON
+     * @throws EntityNotFoundException if no dataset has this guid
+     * @throws DiscoveryException if no schema query is configured for the dataset's type
+     */
+    @Override
+    @GraphTransaction
+    public String getSchemaForEntity(String guid) throws AtlasException {
+        ParamChecker.notEmpty(guid, "Entity id");
+        LOG.info("Fetching schema for entity guid={}", guid);
+        String typeName = validateDatasetExists(guid);
+        return getSchemaForId(typeName, guid);
+    }
+
+    /**
+     * Validate that an ACTIVE dataset with this name exists and return it.
+     *
+     * @param datasetName dataset name
+     * @return the first matching dataset instance
+     * @throws EntityNotFoundException if no ACTIVE dataset has this name
+     */
+    private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException {
+        ensureSafeDslLiteral(datasetName, "dataset name");
+        final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName);
+        GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery);
+        if (!(queryResult.rows().length() > 0)) {
+            throw new EntityNotFoundException(datasetName + " does not exist");
+        }
+
+        return (ReferenceableInstance)queryResult.rows().apply(0);
+    }
+
+    /**
+     * Validate that a dataset with this guid exists and return its type name.
+     *
+     * @param guid entity id
+     * @return type name of the matching dataset
+     * @throws EntityNotFoundException if no dataset has this guid
+     */
+    private String validateDatasetExists(String guid) throws AtlasException {
+        ensureSafeDslLiteral(guid, "Entity id");
+        final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid);
+        GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery);
+        if (!(queryResult.rows().length() > 0)) {
+            throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
+        }
+
+        ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0);
+        return referenceable.getTypeName();
+    }
+
+    /**
+     * Rejects a caller-supplied value that would end the single-quoted literal it is formatted into.
+     * The lookup queries above splice the value between '...' with String.format and no escaping,
+     * so a quote in the value would break the query or change its meaning.
+     *
+     * @throws IllegalArgumentException if {@code value} contains a single quote
+     */
+    private static void ensureSafeDslLiteral(String value, String name) {
+        if (value.indexOf('\'') >= 0) {
+            throw new IllegalArgumentException(name + " cannot contain a single quote: " + value);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java
deleted file mode 100644
index 00905d7..0000000
--- a/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.discovery;
-
-import com.thinkaurelius.titan.core.TitanGraph;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.GraphTransaction;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.utils.ParamChecker;
-import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
-import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
-import org.apache.atlas.query.Expressions;
-import org.apache.atlas.query.GremlinQueryResult;
-import org.apache.atlas.query.HiveLineageQuery;
-import org.apache.atlas.query.HiveWhereUsedQuery;
-import org.apache.atlas.repository.MetadataRepository;
-import org.apache.atlas.repository.graph.GraphProvider;
-import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
-import org.apache.commons.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Some;
-import scala.collection.immutable.List;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-/**
- * Hive implementation of Lineage service interface.
- */
-@Singleton
-public class HiveLineageService implements LineageService {
-
- private static final Logger LOG = LoggerFactory.getLogger(HiveLineageService.class);
-
- private static final Option<List<String>> SELECT_ATTRIBUTES =
- Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));
-
- public static final String HIVE_TABLE_SCHEMA_QUERY_PREFIX = "atlas.lineage.hive.table.schema.query.";
-
- private static final String HIVE_TABLE_TYPE_NAME;
- private static final String HIVE_PROCESS_TYPE_NAME;
- private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME;
- private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME;
-
- private static final String HIVE_TABLE_EXISTS_QUERY;
-
- private static final Configuration propertiesConf;
-
- static {
- // todo - externalize this using type system - dog food
- try {
- propertiesConf = ApplicationProperties.get();
- HIVE_TABLE_TYPE_NAME = propertiesConf.getString("atlas.lineage.hive.table.type.name", "DataSet");
- HIVE_PROCESS_TYPE_NAME = propertiesConf.getString("atlas.lineage.hive.process.type.name", "Process");
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = propertiesConf.getString("atlas.lineage.hive.process.inputs.name", "inputs");
- HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = propertiesConf.getString("atlas.lineage.hive.process.outputs.name", "outputs");
-
- HIVE_TABLE_EXISTS_QUERY = propertiesConf.getString("atlas.lineage.hive.table.exists.query",
- "from " + HIVE_TABLE_TYPE_NAME + " where name=\"%s\"");
- } catch (AtlasException e) {
- throw new RuntimeException(e);
- }
- }
-
-
- private final TitanGraph titanGraph;
- private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
- private final GraphBackedDiscoveryService discoveryService;
-
- @Inject
- HiveLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository,
- GraphBackedDiscoveryService discoveryService) throws DiscoveryException {
- this.titanGraph = graphProvider.get();
- this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
- this.discoveryService = discoveryService;
- }
-
- /**
- * Return the lineage outputs for the given tableName.
- *
- * @param tableName tableName
- * @return Lineage Outputs as JSON
- */
- @Override
- @GraphTransaction
- public String getOutputs(String tableName) throws AtlasException {
- LOG.info("Fetching lineage outputs for tableName={}", tableName);
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- validateTableExists(tableName);
-
- HiveWhereUsedQuery outputsQuery =
- new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
- SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
-
- Expressions.Expression expression = outputsQuery.expr();
- LOG.debug("Expression is [" + expression.toString() + "]");
- try {
- return discoveryService.evaluate(expression).toJson();
- } catch (Exception e) { // unable to catch ExpressionException
- throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
- }
- }
-
- /**
- * Return the lineage outputs graph for the given tableName.
- *
- * @param tableName tableName
- * @return Outputs Graph as JSON
- */
- @Override
- @GraphTransaction
- public String getOutputsGraph(String tableName) throws AtlasException {
- LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- validateTableExists(tableName);
-
- HiveWhereUsedQuery outputsQuery =
- new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
- SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
- return outputsQuery.graph().toInstanceJson();
- }
-
- /**
- * Return the lineage inputs for the given tableName.
- *
- * @param tableName tableName
- * @return Lineage Inputs as JSON
- */
- @Override
- @GraphTransaction
- public String getInputs(String tableName) throws AtlasException {
- LOG.info("Fetching lineage inputs for tableName={}", tableName);
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- validateTableExists(tableName);
-
- HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
- SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
-
- Expressions.Expression expression = inputsQuery.expr();
- LOG.debug("Expression is [" + expression.toString() + "]");
- try {
- return discoveryService.evaluate(expression).toJson();
- } catch (Exception e) { // unable to catch ExpressionException
- throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
- }
- }
-
- /**
- * Return the lineage inputs graph for the given tableName.
- *
- * @param tableName tableName
- * @return Inputs Graph as JSON
- */
- @Override
- @GraphTransaction
- public String getInputsGraph(String tableName) throws AtlasException {
- LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- validateTableExists(tableName);
-
- HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
- SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
- return inputsQuery.graph().toInstanceJson();
- }
-
- /**
- * Return the schema for the given tableName.
- *
- * @param tableName tableName
- * @return Schema as JSON
- */
- @Override
- @GraphTransaction
- public String getSchema(String tableName) throws AtlasException {
- LOG.info("Fetching schema for tableName={}", tableName);
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- String typeName = validateTableExists(tableName);
-
- final String schemaQuery =
- String.format(propertiesConf.getString(HIVE_TABLE_SCHEMA_QUERY_PREFIX + typeName), tableName);
- return discoveryService.searchByDSL(schemaQuery);
- }
-
- /**
- * Validate if indeed this is a table type and exists.
- *
- * @param tableName table name
- */
- private String validateTableExists(String tableName) throws AtlasException {
- final String tableExistsQuery = String.format(HIVE_TABLE_EXISTS_QUERY, tableName);
- GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery);
- if (!(queryResult.rows().length() > 0)) {
- throw new EntityNotFoundException(tableName + " does not exist");
- }
-
- ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0);
- return referenceable.getTypeName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
index 05dc6a4..c4621cd 100755
--- a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
@@ -256,21 +256,21 @@ trait SingleInstanceClosureQuery[T] extends ClosureQuery {
* @param persistenceStrategy as needed to evaluate the Closure Query.
* @param g as needed to evaluate the Closure Query.
*/
-case class HiveLineageQuery(tableTypeName : String,
- tableName : String,
- ctasTypeName : String,
- ctasInputTableAttribute : String,
- ctasOutputTableAttribute : String,
- depth : Option[Int],
- selectAttributes : Option[List[String]],
- withPath : Boolean,
- persistenceStrategy: GraphPersistenceStrategies,
- g: TitanGraph
+case class InputLineageClosureQuery(tableTypeName : String,
+ attributeToSelectInstance : String,
+ tableName : String,
+ ctasTypeName : String,
+ ctasInputTableAttribute : String,
+ ctasOutputTableAttribute : String,
+ depth : Option[Int],
+ selectAttributes : Option[List[String]],
+ withPath : Boolean,
+ persistenceStrategy: GraphPersistenceStrategies,
+ g: TitanGraph
) extends SingleInstanceClosureQuery[String] {
val closureType : String = tableTypeName
- val attributeToSelectInstance = "name"
val attributeTyp = DataTypes.STRING_TYPE
val instanceValue = tableName
@@ -296,21 +296,21 @@ case class HiveLineageQuery(tableTypeName : String,
* @param persistenceStrategy as needed to evaluate the Closure Query.
* @param g as needed to evaluate the Closure Query.
*/
-case class HiveWhereUsedQuery(tableTypeName : String,
- tableName : String,
- ctasTypeName : String,
- ctasInputTableAttribute : String,
- ctasOutputTableAttribute : String,
- depth : Option[Int],
- selectAttributes : Option[List[String]],
- withPath : Boolean,
- persistenceStrategy: GraphPersistenceStrategies,
- g: TitanGraph
+case class OutputLineageClosureQuery(tableTypeName : String,
+ attributeToSelectInstance : String,
+ tableName : String,
+ ctasTypeName : String,
+ ctasInputTableAttribute : String,
+ ctasOutputTableAttribute : String,
+ depth : Option[Int],
+ selectAttributes : Option[List[String]],
+ withPath : Boolean,
+ persistenceStrategy: GraphPersistenceStrategies,
+ g: TitanGraph
) extends SingleInstanceClosureQuery[String] {
val closureType : String = tableTypeName
- val attributeToSelectInstance = "name"
val attributeTyp = DataTypes.STRING_TYPE
val instanceValue = tableName
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
deleted file mode 100644
index 40f0d91..0000000
--- a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.thinkaurelius.titan.core.TitanGraph;
-import com.thinkaurelius.titan.core.util.TitanCleanup;
-
-import org.apache.atlas.repository.MetadataRepository;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
-import org.apache.atlas.repository.graph.GraphProvider;
-import org.apache.atlas.services.MetadataService;
-import org.apache.atlas.typesystem.ITypedReferenceableInstance;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.TypesDef;
-import org.apache.atlas.typesystem.json.TypesSerialization;
-import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.atlas.typesystem.types.AttributeDefinition;
-import org.apache.atlas.typesystem.types.ClassType;
-import org.apache.atlas.typesystem.types.DataTypes;
-import org.apache.atlas.typesystem.types.EnumTypeDefinition;
-import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
-import org.apache.atlas.typesystem.types.IDataType;
-import org.apache.atlas.typesystem.types.Multiplicity;
-import org.apache.atlas.typesystem.types.StructTypeDefinition;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.TypeSystem;
-import org.apache.atlas.typesystem.types.utils.TypesUtil;
-import org.testng.annotations.Guice;
-
-import javax.inject.Inject;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-/**
- * Base Class to set up hive types and instances for tests
- */
-@Guice(modules = RepositoryMetadataModule.class)
-public class BaseHiveRepositoryTest {
-
- @Inject
- protected MetadataService metadataService;
-
- @Inject
- protected MetadataRepository repository;
-
- @Inject
- protected GraphProvider<TitanGraph> graphProvider;
-
- protected void setUp() throws Exception {
- setUpTypes();
- new GraphBackedSearchIndexer(graphProvider);
- RequestContext.createContext();
- setupInstances();
- TestUtils.dumpGraph(graphProvider.get());
- }
-
- protected void tearDown() throws Exception {
- TypeSystem.getInstance().reset();
- try {
- graphProvider.get().shutdown();
- } catch (Exception e) {
- e.printStackTrace();
- }
- try {
- TitanCleanup.clear(graphProvider.get());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void setUpTypes() throws Exception {
- TypesDef typesDef = createTypeDefinitions();
- String typesAsJSON = TypesSerialization.toJson(typesDef);
- metadataService.createType(typesAsJSON);
- }
-
- private static final String DATABASE_TYPE = "hive_db";
- private static final String HIVE_TABLE_TYPE = "hive_table";
- private static final String COLUMN_TYPE = "hive_column";
- private static final String HIVE_PROCESS_TYPE = "hive_process";
- private static final String STORAGE_DESC_TYPE = "StorageDesc";
- private static final String VIEW_TYPE = "View";
- private static final String PARTITION_TYPE = "hive_partition";
-
- TypesDef createTypeDefinitions() {
- HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
- .createClassTypeDef(DATABASE_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
- attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
- attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
-
- HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
- .createClassTypeDef(COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
- attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE));
-
- HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil
- .createClassTypeDef(STORAGE_DESC_TYPE, null,
- attrDef("location", DataTypes.STRING_TYPE),
- attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE),
- attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null));
-
-
- HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
- .createClassTypeDef(HIVE_TABLE_TYPE, ImmutableSet.of("DataSet"),
- attrDef("owner", DataTypes.STRING_TYPE),
- attrDef("createTime", DataTypes.DATE_TYPE),
- attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE),
- attrDef("temporary", DataTypes.BOOLEAN_TYPE),
- new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
- // todo - uncomment this, something is broken
- new AttributeDefinition("sd", STORAGE_DESC_TYPE,
- Multiplicity.REQUIRED, true, null),
- new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
- Multiplicity.COLLECTION, true, null));
-
- HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
- .createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableSet.of("Process"),
- attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE),
- attrDef("endTime", DataTypes.LONG_TYPE),
- attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
- attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
- attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
- attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
-
- HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
- .createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
- new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
- new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
- Multiplicity.COLLECTION, false, null));
-
- AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
- new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
- Multiplicity.OPTIONAL, false, null),
- new AttributeDefinition("table", HIVE_TABLE_TYPE, Multiplicity.REQUIRED, false, null),
- };
- HierarchicalTypeDefinition<ClassType> partClsDef =
- new HierarchicalTypeDefinition<>(ClassType.class, PARTITION_TYPE, null, null,
- attributeDefinitions);
-
- HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
-
- HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
-
- HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
-
- HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
-
- HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null);
-
- HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null);
-
- HierarchicalTypeDefinition<TraitType> logTraitDef = TypesUtil.createTraitTypeDef("Log Data", null);
-
- return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
- ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef, logTraitDef),
- ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, tblClsDef, loadProcessClsDef, viewClsDef, partClsDef));
- }
-
- AttributeDefinition attrDef(String name, IDataType dT) {
- return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
- }
-
- AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
- return attrDef(name, dT, m, false, null);
- }
-
- AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite,
- String reverseAttributeName) {
- Preconditions.checkNotNull(name);
- Preconditions.checkNotNull(dT);
- return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
- }
-
- private void setupInstances() throws Exception {
- Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
-
- Referenceable sd =
- storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(
- column("time_id", "int", "time id")));
-
- List<Referenceable> salesFactColumns = ImmutableList
- .of(column("time_id", "int", "time id"),
- column("product_id", "int", "product id"),
- column("customer_id", "int", "customer id", "PII"),
- column("sales", "double", "product id", "Metric"));
-
- Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
-
- List<Referenceable> logFactColumns = ImmutableList
- .of(column("time_id", "int", "time id"), column("app_id", "int", "app id"),
- column("machine_id", "int", "machine id"), column("log", "string", "log data", "Log Data"));
-
- List<Referenceable> timeDimColumns = ImmutableList
- .of(column("time_id", "int", "time id"),
- column("dayOfYear", "int", "day Of Year"),
- column("weekDay", "int", "week Day"));
-
- Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
- "Dimension");
-
- Id reportingDB =
- database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
-
- Id salesFactDaily =
- table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
- salesFactColumns, "Metric");
-
- loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
- ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
-
- Id logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
-
- Id loggingFactDaily =
- table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed",
- logFactColumns, "Log Data");
-
- List<Referenceable> productDimColumns = ImmutableList
- .of(column("product_id", "int", "product id"),
- column("product_name", "string", "product name"),
- column("brand_name", "int", "brand name"));
-
- Id productDim =
- table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
- "Dimension");
-
- view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
-
- List<Referenceable> customerDimColumns = ImmutableList.of(
- column("customer_id", "int", "customer id", "PII"),
- column("name", "string", "customer name", "PII"),
- column("address", "string", "customer address", "PII"));
-
- Id customerDim =
- table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
- "Dimension");
-
- view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
-
- Id salesFactMonthly =
- table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI",
- "Managed", salesFactColumns, "Metric");
-
- loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily),
- ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
-
- Id loggingFactMonthly =
- table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL",
- "Managed", logFactColumns, "Log Data");
-
- loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily),
- ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
-
- partition(new ArrayList() {{ add("2015-01-01"); }}, salesFactDaily);
- }
-
- /**
-  * Builds a DATABASE_TYPE referenceable (name, description, owner, locationUri, createTime = now
-  * in epoch millis), persists it via createInstance and returns the Id of the stored entity.
-  */
- Id database(String name, String description, String owner, String locationUri, String... traitNames)
- throws Exception {
- Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("description", description);
- referenceable.set("owner", owner);
- referenceable.set("locationUri", locationUri);
- referenceable.set("createTime", System.currentTimeMillis());
-
- ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATABASE_TYPE);
- return createInstance(referenceable, clsType);
- }
-
- /**
-  * Builds (but does not persist) a STORAGE_DESC_TYPE referenceable; it is meant to be embedded
-  * in a table via the table's "sd" attribute.
-  * NOTE(review): "cols" is set here; confirm the storage-descriptor type declares it.
-  */
- Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns)
- throws Exception {
- Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
- referenceable.set("location", location);
- referenceable.set("inputFormat", inputFormat);
- referenceable.set("outputFormat", outputFormat);
- referenceable.set("compressed", compressed);
- referenceable.set("cols", columns);
-
- return referenceable;
- }
-
- /** Builds (but does not persist) a COLUMN_TYPE referenceable with the given traits. */
- Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("dataType", dataType);
- referenceable.set("comment", comment);
-
- return referenceable;
- }
-
- /**
-  * Builds a HIVE_TABLE_TYPE referenceable that references the database dbId and embeds the
-  * given storage descriptor and columns, persists it and returns its Id.
-  * NOTE(review): "retention" is set here; confirm the table type declares that attribute.
-  */
- Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
- List<Referenceable> columns, String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("description", description);
- referenceable.set("owner", owner);
- referenceable.set("tableType", tableType);
- referenceable.set("temporary", false);
- referenceable.set("createTime", new Date(System.currentTimeMillis()));
- referenceable.set("lastAccessTime", System.currentTimeMillis());
- referenceable.set("retention", System.currentTimeMillis());
-
- referenceable.set("db", dbId);
- // todo - uncomment this, something is broken
- // NOTE(review): the todo above looks stale - the "sd" assignment below is active.
- referenceable.set("sd", sd);
- referenceable.set("columns", columns);
-
- ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_TABLE_TYPE);
- return createInstance(referenceable, clsType);
- }
-
- /**
-  * Builds a HIVE_PROCESS_TYPE referenceable linking inputTables to outputTables and persists it.
-  * Both AtlasClient.NAME and AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME are set to name;
-  * endTime is startTime-ish + 10 seconds.
-  * NOTE(review): the user is written to "user"; confirm the process type declares that name
-  * (the replacement BaseRepositoryTest declares "userName").
-  */
- Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
- String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
- throws Exception {
- Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
- referenceable.set(AtlasClient.NAME, name);
- referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
- referenceable.set("description", description);
- referenceable.set("user", user);
- referenceable.set("startTime", System.currentTimeMillis());
- referenceable.set("endTime", System.currentTimeMillis() + 10000);
-
- referenceable.set("inputs", inputTables);
- referenceable.set("outputs", outputTables);
-
- referenceable.set("queryText", queryText);
- referenceable.set("queryPlan", queryPlan);
- referenceable.set("queryId", queryId);
- referenceable.set("queryGraph", queryGraph);
-
- ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_PROCESS_TYPE);
- return createInstance(referenceable, clsType);
- }
-
- /** Builds and persists a VIEW_TYPE instance in database dbId over inputTables; returns its Id. */
- Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("db", dbId);
-
- referenceable.set("inputTables", inputTables);
- ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, VIEW_TYPE);
- return createInstance(referenceable, clsType);
- }
-
- /** Builds and persists a PARTITION_TYPE instance with the given values for table; returns its Id. */
- Id partition(List<String> values, Id table, String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(PARTITION_TYPE, traitNames);
- referenceable.set("values", values);
- referenceable.set("table", table);
- ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, PARTITION_TYPE);
- return createInstance(referenceable, clsType);
- }
- /**
-  * Converts referenceable to a typed instance of clsType, stores it through the repository and
-  * returns an Id (version 0) built from the last guid returned by createEntities.
-  * NOTE(review): assumes the top-level entity's guid is last in the list - confirm ordering.
-  */
- private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
- ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
- List<String> guids = repository.createEntities(typedInstance);
-
- // return the reference to created instance with guid
- return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
new file mode 100644
index 0000000..d1f9430
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.thinkaurelius.titan.core.TitanGraph;
+import com.thinkaurelius.titan.core.util.TitanCleanup;
+
+import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.services.MetadataService;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.TypesSerialization;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.typesystem.types.AttributeDefinition;
+import org.apache.atlas.typesystem.types.ClassType;
+import org.apache.atlas.typesystem.types.DataTypes;
+import org.apache.atlas.typesystem.types.EnumTypeDefinition;
+import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
+import org.apache.atlas.typesystem.types.IDataType;
+import org.apache.atlas.typesystem.types.Multiplicity;
+import org.apache.atlas.typesystem.types.StructTypeDefinition;
+import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.utils.TypesUtil;
+import org.testng.annotations.Guice;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Base class for repository tests: registers a small hive-like type system (databases, tables,
+ * columns, storage descriptors, processes, views, partitions and a set of traits) and loads a
+ * fixed sample graph of instances into the injected {@link MetadataRepository}.
+ */
+@Guice(modules = RepositoryMetadataModule.class)
+public class BaseRepositoryTest {
+
+    protected static final String DATABASE_TYPE = "hive_db";
+    protected static final String HIVE_TABLE_TYPE = "hive_table";
+    private static final String COLUMN_TYPE = "hive_column";
+    private static final String HIVE_PROCESS_TYPE = "hive_process";
+    private static final String STORAGE_DESC_TYPE = "StorageDesc";
+    private static final String VIEW_TYPE = "View";
+    private static final String PARTITION_TYPE = "hive_partition";
+
+    @Inject
+    protected MetadataService metadataService;
+
+    @Inject
+    protected MetadataRepository repository;
+
+    @Inject
+    protected GraphProvider<TitanGraph> graphProvider;
+
+    /**
+     * Registers the test types, creates the search indexer, opens a request context and loads
+     * the sample instances, then dumps the graph. This method carries no TestNG annotation, so
+     * subclasses have to invoke it from their own setup method.
+     */
+    protected void setUp() throws Exception {
+        setUpTypes();
+        new GraphBackedSearchIndexer(graphProvider);
+        RequestContext.createContext();
+        setupInstances();
+        TestUtils.dumpGraph(graphProvider.get());
+    }
+
+    /**
+     * Resets the type system, shuts the graph down and clears its storage.
+     */
+    protected void tearDown() throws Exception {
+        TypeSystem.getInstance().reset();
+        // Cleanup is best-effort: a failure in one step must not prevent the next one,
+        // so errors are reported but deliberately not rethrown.
+        try {
+            graphProvider.get().shutdown();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        try {
+            TitanCleanup.clear(graphProvider.get());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /** Serializes the test type definitions to JSON and registers them with the metadata service. */
+    private void setUpTypes() throws Exception {
+        TypesDef typesDef = createTypeDefinitions();
+        String typesAsJSON = TypesSerialization.toJson(typesDef);
+        metadataService.createType(typesAsJSON);
+    }
+
+    /**
+     * Builds the class and trait definitions used by the sample data.
+     *
+     * @return the types definition (no enums, no structs)
+     */
+    TypesDef createTypeDefinitions() {
+        HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
+                .createClassTypeDef(DATABASE_TYPE, null,
+                        TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
+                        attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
+                        attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
+
+        HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
+                .createClassTypeDef(COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
+                        attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE));
+
+        // NOTE(review): "compressed" is declared as a string while storageDescriptor() passes a
+        // boolean; it is presumably stored via toString() - consider BOOLEAN_TYPE after checking
+        // that no query depends on the string form.
+        HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil
+                .createClassTypeDef(STORAGE_DESC_TYPE, null,
+                        attrDef("location", DataTypes.STRING_TYPE),
+                        attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE),
+                        attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null));
+
+        HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
+                .createClassTypeDef(HIVE_TABLE_TYPE, ImmutableSet.of("DataSet"),
+                        attrDef("owner", DataTypes.STRING_TYPE),
+                        attrDef("createTime", DataTypes.DATE_TYPE),
+                        attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE),
+                        attrDef("temporary", DataTypes.BOOLEAN_TYPE),
+                        new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
+                        // sd and columns are composite: they are owned by (embedded in) the table
+                        new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
+                        new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
+                                Multiplicity.COLLECTION, true, null));
+
+        HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
+                .createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableSet.of("Process"),
+                        attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE),
+                        attrDef("endTime", DataTypes.LONG_TYPE),
+                        attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
+                        attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
+                        attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
+                        attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
+
+        HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
+                .createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
+                        new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
+                        new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
+                                Multiplicity.COLLECTION, false, null));
+
+        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
+                new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
+                        Multiplicity.OPTIONAL, false, null),
+                new AttributeDefinition("table", HIVE_TABLE_TYPE, Multiplicity.REQUIRED, false, null),
+        };
+        HierarchicalTypeDefinition<ClassType> partClsDef =
+                new HierarchicalTypeDefinition<>(ClassType.class, PARTITION_TYPE, null, null,
+                        attributeDefinitions);
+
+        HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
+
+        HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
+
+        HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
+
+        HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
+
+        HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null);
+
+        HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null);
+
+        HierarchicalTypeDefinition<TraitType> logTraitDef = TypesUtil.createTraitTypeDef("Log Data", null);
+
+        return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
+                ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef, logTraitDef),
+                ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, tblClsDef, loadProcessClsDef, viewClsDef, partClsDef));
+    }
+
+    /** Optional, non-composite attribute of the given data type with no reverse attribute. */
+    AttributeDefinition attrDef(String name, IDataType<?> dT) {
+        return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
+    }
+
+    /** Non-composite attribute of the given data type and multiplicity with no reverse attribute. */
+    AttributeDefinition attrDef(String name, IDataType<?> dT, Multiplicity m) {
+        return attrDef(name, dT, m, false, null);
+    }
+
+    /**
+     * Fully specified attribute definition.
+     *
+     * @throws NullPointerException if name or dT is null
+     */
+    AttributeDefinition attrDef(String name, IDataType<?> dT, Multiplicity m, boolean isComposite,
+            String reverseAttributeName) {
+        Preconditions.checkNotNull(name);
+        Preconditions.checkNotNull(dT);
+        return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
+    }
+
+    /**
+     * Creates the sample data: Sales/Reporting/Logging databases, fact and dimension tables,
+     * materialized views, the ETL processes connecting them (the lineage under test), two views
+     * and one partition of sales_fact_daily_mv.
+     */
+    private void setupInstances() throws Exception {
+        Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
+
+        Referenceable sd =
+                storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(
+                        column("time_id", "int", "time id")));
+
+        List<Referenceable> salesFactColumns = ImmutableList
+                .of(column("time_id", "int", "time id"),
+                        column("product_id", "int", "product id"),
+                        column("customer_id", "int", "customer id", "PII"),
+                        column("sales", "double", "product id", "Metric"));
+
+        Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
+
+        List<Referenceable> logFactColumns = ImmutableList
+                .of(column("time_id", "int", "time id"), column("app_id", "int", "app id"),
+                        column("machine_id", "int", "machine id"), column("log", "string", "log data", "Log Data"));
+
+        List<Referenceable> timeDimColumns = ImmutableList
+                .of(column("time_id", "int", "time id"),
+                        column("dayOfYear", "int", "day Of Year"),
+                        column("weekDay", "int", "week Day"));
+
+        Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
+                "Dimension");
+
+        Id reportingDB =
+                database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
+
+        Id salesFactDaily =
+                table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
+                        salesFactColumns, "Metric");
+
+        loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
+                ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
+
+        Id logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
+
+        Id loggingFactDaily =
+                table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed",
+                        logFactColumns, "Log Data");
+
+        List<Referenceable> productDimColumns = ImmutableList
+                .of(column("product_id", "int", "product id"),
+                        column("product_name", "string", "product name"),
+                        column("brand_name", "int", "brand name"));
+
+        Id productDim =
+                table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
+                        "Dimension");
+
+        view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
+
+        List<Referenceable> customerDimColumns = ImmutableList.of(
+                column("customer_id", "int", "customer id", "PII"),
+                column("name", "string", "customer name", "PII"),
+                column("address", "string", "customer address", "PII"));
+
+        Id customerDim =
+                table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
+                        "Dimension");
+
+        view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
+
+        Id salesFactMonthly =
+                table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI",
+                        "Managed", salesFactColumns, "Metric");
+
+        loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily),
+                ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
+
+        Id loggingFactMonthly =
+                table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL",
+                        "Managed", logFactColumns, "Log Data");
+
+        loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily),
+                ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
+
+        // Typed immutable list instead of a raw double-brace ArrayList (which created an anonymous
+        // inner class holding a reference to this test instance).
+        partition(ImmutableList.of("2015-01-01"), salesFactDaily);
+    }
+
+    /**
+     * Creates and persists a database; createTime is set to the current epoch millis.
+     *
+     * @return the Id of the stored entity
+     */
+    Id database(String name, String description, String owner, String locationUri, String... traitNames)
+            throws Exception {
+        Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("description", description);
+        referenceable.set("owner", owner);
+        referenceable.set("locationUri", locationUri);
+        referenceable.set("createTime", System.currentTimeMillis());
+
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATABASE_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    /**
+     * Builds (without persisting) a storage descriptor, to be embedded in a table's "sd" attribute.
+     * NOTE(review): "cols" is not declared by STORAGE_DESC_TYPE in createTypeDefinitions(), so it
+     * is presumably dropped on conversion - confirm before relying on it.
+     */
+    protected Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns)
+            throws Exception {
+        Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
+        referenceable.set("location", location);
+        referenceable.set("inputFormat", inputFormat);
+        referenceable.set("outputFormat", outputFormat);
+        referenceable.set("compressed", compressed);
+        referenceable.set("cols", columns);
+
+        return referenceable;
+    }
+
+    /** Builds (without persisting) a column with the given traits, for embedding in a table. */
+    protected Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("dataType", dataType);
+        referenceable.set("comment", comment);
+
+        return referenceable;
+    }
+
+    /**
+     * Creates and persists a table in database dbId that embeds sd and columns.
+     * NOTE(review): "retention" is not declared by HIVE_TABLE_TYPE in createTypeDefinitions(),
+     * so it is presumably dropped on conversion.
+     *
+     * @return the Id of the stored table
+     */
+    protected Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
+            List<Referenceable> columns, String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("description", description);
+        referenceable.set("owner", owner);
+        referenceable.set("tableType", tableType);
+        referenceable.set("temporary", false);
+        referenceable.set("createTime", new Date(System.currentTimeMillis()));
+        referenceable.set("lastAccessTime", System.currentTimeMillis());
+        referenceable.set("retention", System.currentTimeMillis());
+
+        referenceable.set("db", dbId);
+        referenceable.set("sd", sd);
+        referenceable.set("columns", columns);
+
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_TABLE_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    /**
+     * Creates and persists a process that reads inputTables and writes outputTables; name is also
+     * used as the qualifiedName. The user is stored in "userName", the attribute HIVE_PROCESS_TYPE
+     * declares (it was previously written to an undeclared "user" attribute and lost).
+     *
+     * @return the Id of the stored process
+     */
+    protected Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
+            String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
+            throws Exception {
+        Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("qualifiedName", name);
+        referenceable.set("description", description);
+        referenceable.set("userName", user);
+        referenceable.set("startTime", System.currentTimeMillis());
+        referenceable.set("endTime", System.currentTimeMillis() + 10000);
+
+        referenceable.set("inputs", inputTables);
+        referenceable.set("outputs", outputTables);
+
+        referenceable.set("queryText", queryText);
+        referenceable.set("queryPlan", queryPlan);
+        referenceable.set("queryId", queryId);
+        referenceable.set("queryGraph", queryGraph);
+
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_PROCESS_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    /** Creates and persists a view in database dbId over inputTables; returns its Id. */
+    Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("db", dbId);
+
+        referenceable.set("inputTables", inputTables);
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, VIEW_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    /** Creates and persists a partition of table with the given values; returns its Id. */
+    Id partition(List<String> values, Id table, String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(PARTITION_TYPE, traitNames);
+        referenceable.set("values", values);
+        referenceable.set("table", table);
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, PARTITION_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    /**
+     * Converts referenceable to a typed instance of clsType and stores it in the repository.
+     *
+     * @return an Id (version 0) built from the last guid returned by createEntities
+     *         (NOTE(review): assumes the top-level entity is reported last - confirm)
+     * @throws IllegalStateException if the repository returned no guids
+     */
+    private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
+        ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
+        List<String> guids = repository.createEntities(typedInstance);
+        Preconditions.checkState(guids != null && !guids.isEmpty(),
+                "No guids returned when creating entity of type %s", referenceable.getTypeName());
+
+        // return the reference to created instance with guid
+        return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
+    }
+}