You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/09/21 20:38:32 UTC

atlas git commit: ATLAS-2157: HiveHook fix to handle getTable() error for temporary tables

Repository: atlas
Updated Branches:
  refs/heads/master ae576650c -> baccd1d8d


ATLAS-2157: HiveHook fix to handle getTable() error for temporary tables


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/baccd1d8
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/baccd1d8
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/baccd1d8

Branch: refs/heads/master
Commit: baccd1d8d768cc9e4650c3b39383c304e936e503
Parents: ae57665
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Wed Sep 20 16:12:23 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Sep 21 13:38:22 2017 -0700

----------------------------------------------------------------------
 .../org/apache/atlas/hive/hook/HiveHook.java    | 93 +++++++++++++-------
 1 file changed, 63 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/baccd1d8/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 7dc2e2f..aca5645 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -526,12 +526,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             Database db = null;
             Table table = null;
             Partition partition = null;
-            LinkedHashMap<Type, Referenceable> result = new LinkedHashMap<>();
-            List<Referenceable> entities = new ArrayList<>();
 
             switch (entity.getType()) {
                 case DATABASE:
                     db = entity.getDatabase();
+
+                    if (db != null) {
+                        db = dgiBridge.hiveClient.getDatabase(db.getName());
+                    }
                     break;
 
                 case TABLE:
@@ -549,40 +551,47 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                     LOG.info("{}: entity-type not handled by Atlas hook. Ignored", entity.getType());
             }
 
-            if (db != null) {
-                db = dgiBridge.hiveClient.getDatabase(db.getName());
-            }
+            Referenceable dbEntity    = null;
+            Referenceable tableEntity = null;
 
             if (db != null) {
-                Referenceable dbEntity = dgiBridge.createDBInstance(db);
-
-                entities.add(dbEntity);
-                result.put(Type.DATABASE, dbEntity);
+                dbEntity = dgiBridge.createDBInstance(db);
+            }
 
-                Referenceable tableEntity = null;
+            if (db != null && table != null) {
+                if (existTable != null) {
+                    table = existTable;
+                } else {
+                    table = refreshTable(dgiBridge, table.getDbName(), table.getTableName());
+                }
 
                 if (table != null) {
-                    if (existTable != null) {
-                        table = existTable;
-                    } else {
-                        table = dgiBridge.hiveClient.getTable(table.getDbName(), table.getTableName());
-                    }
-                    //If its an external table, even though the temp table skip flag is on,
-                    // we create the table since we need the HDFS path to temp table lineage.
-                    if (skipTempTables &&
-                            table.isTemporary() &&
-                            !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-                        LOG.debug("Skipping temporary table registration {} since it is not an external table {} ", table.getTableName(), table.getTableType().name());
-
+                    // If its an external table, even though the temp table skip flag is on, we create the table since we need the HDFS path to temp table lineage.
+                    if (skipTempTables && table.isTemporary() && !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+                        LOG.warn("Skipping temporary table registration {} since it is not an external table {} ", table.getTableName(), table.getTableType().name());
                     } else {
                         tableEntity = dgiBridge.createTableInstance(dbEntity, table);
-                        entities.add(tableEntity);
-                        result.put(Type.TABLE, tableEntity);
                     }
                 }
+            }
+
+            LinkedHashMap<Type, Referenceable> result   = new LinkedHashMap<>();
+            List<Referenceable>                entities = new ArrayList<>();
+
+            if (dbEntity != null) {
+                result.put(Type.DATABASE, dbEntity);
+                entities.add(dbEntity);
+            }
 
+            if (tableEntity != null) {
+                result.put(Type.TABLE, tableEntity);
+                entities.add(tableEntity);
+            }
+
+            if (!entities.isEmpty()) {
                 event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
             }
+
             return result;
         }
         catch(Exception e) {
@@ -709,7 +718,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                 final String tblQFName = HiveMetaStoreBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
                 if (!dataSetsProcessed.contains(tblQFName)) {
                     LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event, entity, false);
-                    dataSets.put(entity, result.get(Type.TABLE));
+
+                    if (result.get(Type.TABLE) != null) {
+                        dataSets.put(entity, result.get(Type.TABLE));
+                    }
+
                     dataSetsProcessed.add(tblQFName);
                     entities.addAll(result.values());
                 }
@@ -760,7 +773,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
         //Refresh to get the correct location
         if(hiveTable != null) {
-            hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
+            hiveTable = refreshTable(dgiBridge, hiveTable.getDbName(), hiveTable.getTableName());
         }
 
         if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) {
@@ -951,12 +964,17 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                             LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName());
                         } else if (refs.containsKey(input)) {
                             if ( input.getType() == Type.PARTITION || input.getType() == Type.TABLE) {
-                                final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(input.getTable().getDbName(), input.getTable().getTableName()));
-                                addDataset(buffer, refs.get(input), createTime.getTime());
+                                Table inputTable = refreshTable(hiveBridge, input.getTable().getDbName(), input.getTable().getTableName());
+
+                                if (inputTable != null) {
+                                    final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(inputTable);
+                                    addDataset(buffer, refs.get(input), createTime.getTime());
+                                }
                             } else {
                                 addDataset(buffer, refs.get(input));
                             }
                         }
+
                         dataSetsProcessed.add(input.getName().toLowerCase());
                     }
                 }
@@ -995,12 +1013,17 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                             LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName());
                         } else if (refs.containsKey(output)) {
                             if ( output.getType() == Type.PARTITION || output.getType() == Type.TABLE) {
-                                final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(output.getTable().getDbName(), output.getTable().getTableName()));
-                                addDataset(buffer, refs.get(output), createTime.getTime());
+                                Table outputTable = refreshTable(hiveBridge, output.getTable().getDbName(), output.getTable().getTableName());
+
+                                if (outputTable != null) {
+                                    final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(outputTable);
+                                    addDataset(buffer, refs.get(output), createTime.getTime());
+                                }
                             } else {
                                 addDataset(buffer, refs.get(output));
                             }
                         }
+
                         dataSetsProcessed.add(output.getName().toLowerCase());
                     }
                 }
@@ -1008,6 +1031,16 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }
 
+    private static Table refreshTable(HiveMetaStoreBridge dgiBridge, String dbName, String tableName) {
+        try {
+            return dgiBridge.hiveClient.getTable(dbName, tableName);
+        } catch (HiveException excp) { // this might be the case for temp tables
+            LOG.warn("failed to get details for table {}.{}. Ignoring. {}: {}", dbName, tableName, excp.getClass().getCanonicalName(), excp.getMessage());
+        }
+
+        return null;
+    }
+
     private static boolean addQueryType(HiveOperation op, WriteEntity entity) {
         if (entity.getWriteType() != null && HiveOperation.QUERY.equals(op)) {
             switch (entity.getWriteType()) {