Posted to commits@atlas.apache.org by sa...@apache.org on 2019/05/31 18:24:55 UTC
[atlas] branch master updated: ATLAS-3229 DDL should not capture DML queries
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new be9df1c ATLAS-3229 DDL should not capture DML queries
be9df1c is described below
commit be9df1c3f8895bdcc9949c97ab032b0051edd612
Author: Le Ma <lm...@cloudera.com>
AuthorDate: Fri May 24 13:23:37 2019 -0700
ATLAS-3229 DDL should not capture DML queries
Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
.../atlas/hive/hook/events/CreateHiveProcess.java | 10 ++-
.../org/apache/atlas/hive/hook/HiveHookIT.java | 97 ++++++++++++++++++++--
2 files changed, 98 insertions(+), 9 deletions(-)
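In short: the CreateHiveProcess hook previously attached a hive DDL entity to its output table or view for any non-metastore event, so DML such as LOAD or INSERT also showed up under the table's ddlQueries. After this change a DDL entity is created only for CREATETABLE_AS_SELECT, CREATEVIEW and ALTERVIEW_AS. A minimal, self-contained sketch of the new classification (the class name DdlCheckSketch is hypothetical; HiveOperation is Hive's org.apache.hadoop.hive.ql.plan.HiveOperation enum, and the real guard in the diff below additionally requires a non-null entity and a non-metastore hook):

import org.apache.hadoop.hive.ql.plan.HiveOperation;

import java.util.EnumSet;

public class DdlCheckSketch {
    // The three operations the new isDdlOperation() accepts (see diff below).
    private static final EnumSet<HiveOperation> DDL_OPERATIONS = EnumSet.of(
            HiveOperation.CREATETABLE_AS_SELECT,
            HiveOperation.CREATEVIEW,
            HiveOperation.ALTERVIEW_AS);

    static boolean isDdl(HiveOperation operation) {
        return DDL_OPERATIONS.contains(operation);
    }

    public static void main(String[] args) {
        System.out.println(isDdl(HiveOperation.CREATEVIEW)); // true:  DDL entity is created
        System.out.println(isDdl(HiveOperation.LOAD));       // false: no DDL entity for DML
        System.out.println(isDdl(HiveOperation.QUERY));      // false: e.g. INSERT ... SELECT
    }
}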
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 38b53b6..c4b3ac7 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -109,7 +109,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
outputs.add(entity);
}
- if (entity != null && !context.isMetastoreHook()) {
+ if (isDdlOperation(entity)) {
+
AtlasEntity ddlEntity = createHiveDDLEntity(entity);
if (ddlEntity != null) {
@@ -277,4 +278,11 @@ public class CreateHiveProcess extends BaseHiveEvent {
return ret;
}
+
+ private boolean isDdlOperation(AtlasEntity entity) {
+ return entity != null && !context.isMetastoreHook()
+ && (context.getHiveOperation().equals(HiveOperation.CREATETABLE_AS_SELECT)
+ || context.getHiveOperation().equals(HiveOperation.CREATEVIEW)
+ || context.getHiveOperation().equals(HiveOperation.ALTERVIEW_AS));
+ }
}
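The integration-test changes below all follow one pattern: record the table's guid at registration time, run a DML statement (LOAD, INSERT, EXPORT, IMPORT), re-fetch the entity, and assert that the ddlQueries relationship attribute still holds only the entries produced by genuine DDL. A condensed sketch of that repeated assertion, assuming HiveHookIT's existing atlasClientV2 field, imports, and the ATTRIBUTE_DDL_QUERIES constant used in the diff (the helper name assertDdlQueryCount is hypothetical):

private void assertDdlQueryCount(String tableGuid, int expectedCount) throws Exception {
    // Re-fetch the table entity after the statement under test has run.
    AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tableGuid).getEntity();

    // ddlQueries is a relationship attribute linking the table to its DDL entities.
    List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);

    // DML must not have added entries; only CREATE/ALTER-style DDL counts.
    Assert.assertNotNull(ddlQueries);
    Assert.assertEquals(ddlQueries.size(), expectedCount);
}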
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index d19470f..c830162 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -534,8 +534,16 @@ public class HiveHookIT extends HiveITBase {
String loadFile = file("load");
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
+ String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
runCommand(query);
+ AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+ List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries);
+ Assert.assertEquals(ddlQueries.size(), 1);
+
assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)));
}
@@ -545,8 +553,16 @@ public class HiveHookIT extends HiveITBase {
String loadFile = file("load");
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";
+ String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
runCommand(query);
+ AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+ List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries);
+ Assert.assertEquals(ddlQueries.size(), 1);
+
assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)));
}
@@ -643,7 +659,12 @@ public class HiveHookIT extends HiveITBase {
addAll(inputs);
}};
- assertTableIsRegistered(DEFAULT_DB, insertTableName);
+ String tblId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
+ AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+ List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries);
+ Assert.assertEquals(ddlQueries.size(), 1);
AtlasEntity processEntity1 = validateProcess(event, expectedInputs, outputs);
@@ -961,7 +982,13 @@ public class HiveHookIT extends HiveITBase {
Assert.assertEquals(process.getGuid(), hiveProcess.getGuid());
Assert.assertEquals(numberOfProcessExecutions(hiveProcess), 1);
assertTableIsRegistered(DEFAULT_DB, tableName);
- assertTableIsRegistered(DEFAULT_DB, insertTableName);
+
+ String tblId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
+ AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+ List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries);
+ Assert.assertEquals(ddlQueries.size(), 1);
//TODO -Add update test case
}
@@ -970,13 +997,19 @@ public class HiveHookIT extends HiveITBase {
public void testExportImportUnPartitionedTable() throws Exception {
String tableName = createTable(false);
- assertTableIsRegistered(DEFAULT_DB, tableName);
+ String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
String filename = "pfile://" + mkdir("exportUnPartitioned");
String query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
+ AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+ List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries);
+ Assert.assertEquals(ddlQueries.size(), 1);
+
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
@@ -993,12 +1026,18 @@ public class HiveHookIT extends HiveITBase {
//Import
String importTableName = createTable(false);
- assertTableIsRegistered(DEFAULT_DB, importTableName);
+ String importTblId = assertTableIsRegistered(DEFAULT_DB, importTableName);
query = "import table " + importTableName + " from '" + filename + "'";
runCommand(query);
+ AtlasEntity importTblEntity = atlasClientV2.getEntityByGuid(importTblId).getEntity();
+ List importTblddlQueries = (List) importTblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(importTblddlQueries);
+ Assert.assertEquals(importTblddlQueries.size(), 1);
+
outputs = getOutputs(importTableName, Entity.Type.TABLE);
HiveEventContext event2 = constructEvent(query, HiveOperation.IMPORT,
@@ -1018,6 +1057,12 @@ public class HiveHookIT extends HiveITBase {
runCommand(query);
+ AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tblId).getEntity();
+ List ddlQueries2 = (List) tblEntity2.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries2);
+ Assert.assertEquals(ddlQueries2.size(), 1);
+
inputs = getInputs(tableName, Entity.Type.TABLE);
outputs = getOutputs(filename, Entity.Type.DFS_DIR);
@@ -1039,6 +1084,12 @@ public class HiveHookIT extends HiveITBase {
runCommand(query);
+ AtlasEntity tblEntity3 = atlasClientV2.getEntityByGuid(importTblId).getEntity();
+ List ddlQueries3 = (List) tblEntity3.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries3);
+ Assert.assertEquals(ddlQueries3.size(), 1);
+
outputs = getOutputs(importTableName, Entity.Type.TABLE);
HiveEventContext event4 = constructEvent(query, HiveOperation.IMPORT, getInputs(filename,
@@ -1062,7 +1113,7 @@ public class HiveHookIT extends HiveITBase {
boolean isPartitionedTable = true;
String tableName = createTable(isPartitionedTable);
- assertTableIsRegistered(DEFAULT_DB, tableName);
+ String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
//Add a partition
String partFile = "pfile://" + mkdir("partition");
@@ -1070,12 +1121,24 @@ public class HiveHookIT extends HiveITBase {
runCommand(query);
+ AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+ List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries);
+ Assert.assertEquals(ddlQueries.size(), 1);
+
String filename = "pfile://" + mkdir("export");
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
+ AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tblId).getEntity();
+ List ddlQueries2 = (List) tblEntity2.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries2);
+ Assert.assertEquals(ddlQueries2.size(), 1);
+
Set<ReadEntity> expectedExportInputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
Set<ReadEntity> partitionIps = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); //Note that export has only partition as input in this case
@@ -1095,12 +1158,18 @@ public class HiveHookIT extends HiveITBase {
//Import
String importTableName = createTable(true);
- assertTableIsRegistered(DEFAULT_DB, tableName);
+ String tblId2 = assertTableIsRegistered(DEFAULT_DB, tableName);
query = "import table " + importTableName + " from '" + filename + "'";
runCommand(query);
+ AtlasEntity tblEntity3 = atlasClientV2.getEntityByGuid(tblId2).getEntity();
+ List ddlQueries3 = (List) tblEntity3.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries3);
+ Assert.assertEquals(ddlQueries3.size(), 1);
+
Set<ReadEntity> expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR);
Set<WriteEntity> importOutputs = getOutputs(importTableName, Entity.Type.TABLE);
Set<WriteEntity> partitionOps = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
@@ -1715,8 +1784,14 @@ public class HiveHookIT extends HiveITBase {
runCommand(query);
- assertTableIsRegistered(DEFAULT_DB, newName);
assertTableIsNotRegistered(DEFAULT_DB, viewName);
+
+ String viewId = assertTableIsRegistered(DEFAULT_DB, newName);
+ AtlasEntity viewEntity = atlasClientV2.getEntityByGuid(viewId).getEntity();
+ List ddlQueries = (List) viewEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries);
+ Assert.assertEquals(ddlQueries.size(), 2);
}
@Test
@@ -1728,7 +1803,7 @@ public class HiveHookIT extends HiveITBase {
runCommandWithDelay(query, 5000);
- assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+ String tblId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity tableRef) throws Exception {
AtlasObjectId sd = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC));
@@ -1737,6 +1812,12 @@ public class HiveHookIT extends HiveITBase {
}
});
+ AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+ List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ Assert.assertNotNull(ddlQueries);
+ Assert.assertEquals(ddlQueries.size(), 2);
+
String processQualifiedName = getTableProcessQualifiedName(DEFAULT_DB, tableName);
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQualifiedName, null);
AtlasEntity processEntity = atlasClientV2.getEntityByGuid(processId).getEntity();
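Taken together, the updated assertions pin the count progression this fix guarantees. An illustrative sequence (HiveQL abbreviated), matching the expected sizes asserted above:

CREATE TABLE t ...                        -> ddlQueries.size() == 1
LOAD DATA ... INTO TABLE t                -> still 1 (DML, no longer captured)
EXPORT TABLE t ... / IMPORT TABLE ...     -> still 1
ALTER TABLE t ... / ALTER VIEW ... AS ... -> 2 (the size-2 assertions near the end of the diff)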