Posted to commits@atlas.apache.org by sa...@apache.org on 2019/05/31 18:24:55 UTC

[atlas] branch master updated: ATLAS-3229 DDL should not capture DML queries

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new be9df1c  ATLAS-3229 DDL should not capture DML queries
be9df1c is described below

commit be9df1c3f8895bdcc9949c97ab032b0051edd612
Author: Le Ma <lm...@cloudera.com>
AuthorDate: Fri May 24 13:23:37 2019 -0700

    ATLAS-3229 DDL should not capture DML queries
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
 .../atlas/hive/hook/events/CreateHiveProcess.java  | 10 ++-
 .../org/apache/atlas/hive/hook/HiveHookIT.java     | 97 ++++++++++++++++++++--
 2 files changed, 98 insertions(+), 9 deletions(-)
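
Before this change, CreateHiveProcess created a DDL entity for every non-null output entity whenever the hook ran outside the metastore, so DML statements such as LOAD, INSERT, EXPORT and IMPORT also produced hive_table_ddl entries. The patch gates DDL capture on the operation type: only CREATETABLE_AS_SELECT, CREATEVIEW and ALTERVIEW_AS qualify. The standalone sketch below mirrors that guard; the enum and the boolean flag are simplified stand-ins for Hive's HiveOperation and the hook context, not the real Atlas classes.

    import java.util.EnumSet;
    import java.util.Set;

    public class DdlGuardSketch {
        // Simplified stand-in for Hive's HiveOperation enum.
        enum HiveOperation { CREATETABLE_AS_SELECT, CREATEVIEW, ALTERVIEW_AS, LOAD, EXPORT, IMPORT, QUERY }

        // The only operations the patch treats as DDL-producing.
        private static final Set<HiveOperation> DDL_OPERATIONS =
                EnumSet.of(HiveOperation.CREATETABLE_AS_SELECT,
                           HiveOperation.CREATEVIEW,
                           HiveOperation.ALTERVIEW_AS);

        // Same three conditions as the patch's isDdlOperation(): a resolved entity,
        // not running as the metastore hook, and a DDL-producing operation.
        static boolean isDdlOperation(Object entity, boolean isMetastoreHook, HiveOperation operation) {
            return entity != null && !isMetastoreHook && DDL_OPERATIONS.contains(operation);
        }

        public static void main(String[] args) {
            Object entity = new Object();
            System.out.println(isDdlOperation(entity, false, HiveOperation.CREATEVIEW)); // true
            System.out.println(isDdlOperation(entity, false, HiveOperation.LOAD));       // false: DML no longer captured
        }
    }

An EnumSet lookup is an equivalent way to express the chained equals() comparisons in the patch and keeps the qualifying operations in one place.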

diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 38b53b6..c4b3ac7 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -109,7 +109,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
                         outputs.add(entity);
                     }
 
-                    if (entity != null && !context.isMetastoreHook()) {
+                    if (isDdlOperation(entity)) {
+
                         AtlasEntity ddlEntity = createHiveDDLEntity(entity);
 
                         if (ddlEntity != null) {
@@ -277,4 +278,11 @@ public class CreateHiveProcess extends BaseHiveEvent {
 
         return ret;
     }
+
+    private boolean isDdlOperation(AtlasEntity entity) {
+        return entity != null && !context.isMetastoreHook()
+            && (context.getHiveOperation().equals(HiveOperation.CREATETABLE_AS_SELECT)
+             || context.getHiveOperation().equals(HiveOperation.CREATEVIEW)
+             || context.getHiveOperation().equals(HiveOperation.ALTERVIEW_AS));
+    }
 }
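
The test changes below all follow one pattern: register a table, run a DML statement against it, re-fetch the table entity, and assert that its ddlQueries relationship still holds only the expected DDL statements (one for a plain CREATE TABLE, two where an ALTER follows). A hypothetical helper such as the following could express that pattern inside HiveHookIT; it reuses the atlasClientV2, AtlasEntity and ATTRIBUTE_DDL_QUERIES identifiers visible in the hunks below but is not part of the patch.

    import java.util.List;
    import org.apache.atlas.model.instance.AtlasEntity;
    import org.testng.Assert;

    // Hypothetical helper, not in the patch: fetch a table entity by GUID and
    // verify how many queries its ddlQueries relationship attribute carries.
    private void assertDdlQueryCount(String tableGuid, int expectedCount) throws Exception {
        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tableGuid).getEntity();
        List ddlQueries       = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);

        Assert.assertNotNull(ddlQueries);
        Assert.assertEquals(ddlQueries.size(), expectedCount);
    }

Each call site would then collapse to, e.g., assertDdlQueryCount(tblId, 1) after runCommand(query).
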
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index d19470f..c830162 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -534,8 +534,16 @@ public class HiveHookIT extends HiveITBase {
         String loadFile  = file("load");
         String query     = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
 
+        String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
         runCommand(query);
 
+        AtlasEntity tblEntity  = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries        = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);
+
         assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)));
     }
 
@@ -545,8 +553,16 @@ public class HiveHookIT extends HiveITBase {
         String loadFile  = file("load");
         String query     = "load data local inpath 'file://" + loadFile + "' into table " + tableName +  " partition(dt = '"+ PART_FILE + "')";
 
+        String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
         runCommand(query);
 
+        AtlasEntity tblEntity  = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries        = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);
+
         assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)));
     }
 
@@ -643,7 +659,12 @@ public class HiveHookIT extends HiveITBase {
             addAll(inputs);
         }};
 
-        assertTableIsRegistered(DEFAULT_DB, insertTableName);
+        String tblId           = assertTableIsRegistered(DEFAULT_DB, insertTableName);
+        AtlasEntity tblEntity  = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries        = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);
 
         AtlasEntity processEntity1 = validateProcess(event, expectedInputs, outputs);
 
@@ -961,7 +982,13 @@ public class HiveHookIT extends HiveITBase {
         Assert.assertEquals(process.getGuid(), hiveProcess.getGuid());
         Assert.assertEquals(numberOfProcessExecutions(hiveProcess), 1);
         assertTableIsRegistered(DEFAULT_DB, tableName);
-        assertTableIsRegistered(DEFAULT_DB, insertTableName);
+
+        String tblId          = assertTableIsRegistered(DEFAULT_DB, insertTableName);
+        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries       = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);
 
         //TODO -Add update test case
     }
@@ -970,13 +997,19 @@ public class HiveHookIT extends HiveITBase {
     public void testExportImportUnPartitionedTable() throws Exception {
         String tableName = createTable(false);
 
-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
 
         String filename = "pfile://" + mkdir("exportUnPartitioned");
         String query    = "export table " + tableName + " to \"" + filename + "\"";
 
         runCommand(query);
 
+        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries       = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);
+
         Set<ReadEntity>  inputs        = getInputs(tableName, Entity.Type.TABLE);
         Set<WriteEntity> outputs       = getOutputs(filename, Entity.Type.DFS_DIR);
 
@@ -993,12 +1026,18 @@ public class HiveHookIT extends HiveITBase {
         //Import
         String importTableName = createTable(false);
 
-        assertTableIsRegistered(DEFAULT_DB, importTableName);
+        String importTblId = assertTableIsRegistered(DEFAULT_DB, importTableName);
 
         query = "import table " + importTableName + " from '" + filename + "'";
 
         runCommand(query);
 
+        AtlasEntity importTblEntity = atlasClientV2.getEntityByGuid(importTblId).getEntity();
+        List importTblddlQueries    = (List) importTblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(importTblddlQueries);
+        Assert.assertEquals(importTblddlQueries.size(), 1);
+
         outputs = getOutputs(importTableName, Entity.Type.TABLE);
 
         HiveEventContext event2         = constructEvent(query, HiveOperation.IMPORT,
@@ -1018,6 +1057,12 @@ public class HiveHookIT extends HiveITBase {
 
         runCommand(query);
 
+        AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries2       = (List) tblEntity2.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries2);
+        Assert.assertEquals(ddlQueries2.size(), 1);
+
         inputs  = getInputs(tableName, Entity.Type.TABLE);
         outputs = getOutputs(filename, Entity.Type.DFS_DIR);
 
@@ -1039,6 +1084,12 @@ public class HiveHookIT extends HiveITBase {
 
         runCommand(query);
 
+        AtlasEntity tblEntity3 = atlasClientV2.getEntityByGuid(importTblId).getEntity();
+        List ddlQueries3       = (List) tblEntity3.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries3);
+        Assert.assertEquals(ddlQueries3.size(), 1);
+
         outputs = getOutputs(importTableName, Entity.Type.TABLE);
 
         HiveEventContext event4 = constructEvent(query, HiveOperation.IMPORT, getInputs(filename,
@@ -1062,7 +1113,7 @@ public class HiveHookIT extends HiveITBase {
         boolean isPartitionedTable = true;
         String  tableName          = createTable(isPartitionedTable);
 
-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
 
         //Add a partition
         String partFile = "pfile://" + mkdir("partition");
@@ -1070,12 +1121,24 @@ public class HiveHookIT extends HiveITBase {
 
         runCommand(query);
 
+        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries       = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);
+
         String filename = "pfile://" + mkdir("export");
 
         query = "export table " + tableName + " to \"" + filename + "\"";
 
         runCommand(query);
 
+        AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries2       = (List) tblEntity2.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries2);
+        Assert.assertEquals(ddlQueries2.size(), 1);
+
         Set<ReadEntity>  expectedExportInputs = getInputs(tableName, Entity.Type.TABLE);
         Set<WriteEntity> outputs              = getOutputs(filename, Entity.Type.DFS_DIR);
         Set<ReadEntity> partitionIps          = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); //Note that export has only partition as input in this case
@@ -1095,12 +1158,18 @@ public class HiveHookIT extends HiveITBase {
         //Import
         String importTableName = createTable(true);
 
-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        String tblId2 = assertTableIsRegistered(DEFAULT_DB, tableName);
 
         query = "import table " + importTableName + " from '" + filename + "'";
 
         runCommand(query);
 
+        AtlasEntity tblEntity3 = atlasClientV2.getEntityByGuid(tblId2).getEntity();
+        List ddlQueries3       = (List) tblEntity3.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries3);
+        Assert.assertEquals(ddlQueries3.size(), 1);
+
         Set<ReadEntity>  expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR);
         Set<WriteEntity> importOutputs        = getOutputs(importTableName, Entity.Type.TABLE);
         Set<WriteEntity> partitionOps         = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
@@ -1715,8 +1784,14 @@ public class HiveHookIT extends HiveITBase {
 
         runCommand(query);
 
-        assertTableIsRegistered(DEFAULT_DB, newName);
         assertTableIsNotRegistered(DEFAULT_DB, viewName);
+
+        String viewId          = assertTableIsRegistered(DEFAULT_DB, newName);
+        AtlasEntity viewEntity = atlasClientV2.getEntityByGuid(viewId).getEntity();
+        List ddlQueries        = (List) viewEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 2);
     }
 
     @Test
@@ -1728,7 +1803,7 @@ public class HiveHookIT extends HiveITBase {
 
         runCommandWithDelay(query, 5000);
 
-        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+        String tblId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
             @Override
             public void assertOnEntity(AtlasEntity tableRef) throws Exception {
                 AtlasObjectId sd = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC));
@@ -1737,6 +1812,12 @@ public class HiveHookIT extends HiveITBase {
             }
         });
 
+        AtlasEntity tblEntity           = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List        ddlQueries          = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 2);
+
         String      processQualifiedName = getTableProcessQualifiedName(DEFAULT_DB, tableName);
         String      processId            = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQualifiedName, null);
         AtlasEntity processEntity        = atlasClientV2.getEntityByGuid(processId).getEntity();