You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/09/22 02:10:34 UTC
[6/7] atlas git commit: ATLAS-2157: HiveHook fix to handle getTable()
error for temporary tables
ATLAS-2157: HiveHook fix to handle getTable() error for temporary tables
(cherry picked from commit baccd1d8d768cc9e4650c3b39383c304e936e503)
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/43ab507b
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/43ab507b
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/43ab507b
Branch: refs/heads/branch-0.8
Commit: 43ab507b35a1d4c4ee49a7c40ddf1e08a44130ab
Parents: d0343f1
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Wed Sep 20 16:12:23 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Sep 21 19:10:04 2017 -0700
----------------------------------------------------------------------
.../org/apache/atlas/hive/hook/HiveHook.java | 93 +++++++++++++-------
1 file changed, 63 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/43ab507b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 7dc2e2f..aca5645 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -526,12 +526,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
Database db = null;
Table table = null;
Partition partition = null;
- LinkedHashMap<Type, Referenceable> result = new LinkedHashMap<>();
- List<Referenceable> entities = new ArrayList<>();
switch (entity.getType()) {
case DATABASE:
db = entity.getDatabase();
+
+ if (db != null) {
+ db = dgiBridge.hiveClient.getDatabase(db.getName());
+ }
break;
case TABLE:
@@ -549,40 +551,47 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
LOG.info("{}: entity-type not handled by Atlas hook. Ignored", entity.getType());
}
- if (db != null) {
- db = dgiBridge.hiveClient.getDatabase(db.getName());
- }
+ Referenceable dbEntity = null;
+ Referenceable tableEntity = null;
if (db != null) {
- Referenceable dbEntity = dgiBridge.createDBInstance(db);
-
- entities.add(dbEntity);
- result.put(Type.DATABASE, dbEntity);
+ dbEntity = dgiBridge.createDBInstance(db);
+ }
- Referenceable tableEntity = null;
+ if (db != null && table != null) {
+ if (existTable != null) {
+ table = existTable;
+ } else {
+ table = refreshTable(dgiBridge, table.getDbName(), table.getTableName());
+ }
if (table != null) {
- if (existTable != null) {
- table = existTable;
- } else {
- table = dgiBridge.hiveClient.getTable(table.getDbName(), table.getTableName());
- }
- //If its an external table, even though the temp table skip flag is on,
- // we create the table since we need the HDFS path to temp table lineage.
- if (skipTempTables &&
- table.isTemporary() &&
- !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
- LOG.debug("Skipping temporary table registration {} since it is not an external table {} ", table.getTableName(), table.getTableType().name());
-
+ // If its an external table, even though the temp table skip flag is on, we create the table since we need the HDFS path to temp table lineage.
+ if (skipTempTables && table.isTemporary() && !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+ LOG.warn("Skipping temporary table registration {} since it is not an external table {} ", table.getTableName(), table.getTableType().name());
} else {
tableEntity = dgiBridge.createTableInstance(dbEntity, table);
- entities.add(tableEntity);
- result.put(Type.TABLE, tableEntity);
}
}
+ }
+
+ LinkedHashMap<Type, Referenceable> result = new LinkedHashMap<>();
+ List<Referenceable> entities = new ArrayList<>();
+
+ if (dbEntity != null) {
+ result.put(Type.DATABASE, dbEntity);
+ entities.add(dbEntity);
+ }
+ if (tableEntity != null) {
+ result.put(Type.TABLE, tableEntity);
+ entities.add(tableEntity);
+ }
+
+ if (!entities.isEmpty()) {
event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
}
+
return result;
}
catch(Exception e) {
@@ -709,7 +718,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
final String tblQFName = HiveMetaStoreBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
if (!dataSetsProcessed.contains(tblQFName)) {
LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event, entity, false);
- dataSets.put(entity, result.get(Type.TABLE));
+
+ if (result.get(Type.TABLE) != null) {
+ dataSets.put(entity, result.get(Type.TABLE));
+ }
+
dataSetsProcessed.add(tblQFName);
entities.addAll(result.values());
}
@@ -760,7 +773,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
//Refresh to get the correct location
if(hiveTable != null) {
- hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
+ hiveTable = refreshTable(dgiBridge, hiveTable.getDbName(), hiveTable.getTableName());
}
if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) {
@@ -951,12 +964,17 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName());
} else if (refs.containsKey(input)) {
if ( input.getType() == Type.PARTITION || input.getType() == Type.TABLE) {
- final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(input.getTable().getDbName(), input.getTable().getTableName()));
- addDataset(buffer, refs.get(input), createTime.getTime());
+ Table inputTable = refreshTable(hiveBridge, input.getTable().getDbName(), input.getTable().getTableName());
+
+ if (inputTable != null) {
+ final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(inputTable);
+ addDataset(buffer, refs.get(input), createTime.getTime());
+ }
} else {
addDataset(buffer, refs.get(input));
}
}
+
dataSetsProcessed.add(input.getName().toLowerCase());
}
}
@@ -995,12 +1013,17 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName());
} else if (refs.containsKey(output)) {
if ( output.getType() == Type.PARTITION || output.getType() == Type.TABLE) {
- final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(output.getTable().getDbName(), output.getTable().getTableName()));
- addDataset(buffer, refs.get(output), createTime.getTime());
+ Table outputTable = refreshTable(hiveBridge, output.getTable().getDbName(), output.getTable().getTableName());
+
+ if (outputTable != null) {
+ final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(outputTable);
+ addDataset(buffer, refs.get(output), createTime.getTime());
+ }
} else {
addDataset(buffer, refs.get(output));
}
}
+
dataSetsProcessed.add(output.getName().toLowerCase());
}
}
@@ -1008,6 +1031,16 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
}
+ private static Table refreshTable(HiveMetaStoreBridge dgiBridge, String dbName, String tableName) {
+ try {
+ return dgiBridge.hiveClient.getTable(dbName, tableName);
+ } catch (HiveException excp) { // this might be the case for temp tables
+ LOG.warn("failed to get details for table {}.{}. Ignoring. {}: {}", dbName, tableName, excp.getClass().getCanonicalName(), excp.getMessage());
+ }
+
+ return null;
+ }
+
private static boolean addQueryType(HiveOperation op, WriteEntity entity) {
if (entity.getWriteType() != null && HiveOperation.QUERY.equals(op)) {
switch (entity.getWriteType()) {