You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/09/20 01:08:01 UTC
[atlas] branch master updated: ATLAS-3413: HMS should create a
dummy process linking HDFS path to Hive table for external table lineage
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 15e5bed ATLAS-3413: HMS should create a dummy process linking HDFS path to Hive table for external table lineage
15e5bed is described below
commit 15e5bedbe470677ab8a8de4ecfa76c55f852c4e7
Author: skoritala <sk...@cloudera.com>
AuthorDate: Thu Sep 19 15:34:41 2019 -0700
ATLAS-3413: HMS should create a dummy process linking HDFS path to Hive table for external table lineage
Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
.../atlas/hive/hook/events/BaseHiveEvent.java | 31 ++++----
.../apache/atlas/hive/hook/events/CreateTable.java | 82 +++++++++++++++-------
2 files changed, 74 insertions(+), 39 deletions(-)
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 42f78a0..851582c 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -52,15 +52,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import static org.apache.atlas.hive.bridge.HiveMetaStoreBridge.getDatabaseName;
import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_METADATA_NAMESPACE;
@@ -640,18 +632,29 @@ public abstract class BaseHiveEvent {
}
protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
- AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS);
- String queryStr = getQueryString();
+ AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS);
+ String queryStr = getQueryString();
if (queryStr != null) {
queryStr = queryStr.toLowerCase().trim();
}
+ ret.setAttribute(ATTRIBUTE_NAME, queryStr);
- ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs));
+ ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
+ String qualifiedName = getQualifiedName(inputs, outputs);
+ if (context.isMetastoreHook()) {
+ HiveOperation operation = context.getHiveOperation();
+ if (operation == HiveOperation.CREATETABLE || operation == HiveOperation.CREATETABLE_AS_SELECT) {
+ AtlasEntity table = outputs.get(0);
+ long createTime = Long.valueOf((Long)table.getAttribute(ATTRIBUTE_CREATE_TIME));
+ qualifiedName = (String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
+ ret.setAttribute(ATTRIBUTE_NAME, "dummyProcess:" + UUID.randomUUID());
+ ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, operation.getOperationName());
+ }
+ }
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
- ret.setAttribute(ATTRIBUTE_NAME, queryStr);
- ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
// We are setting an empty value to these attributes, since now we have a new entity type called hive process
// execution which captures these values. We have to set empty values here because these attributes are
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
index aedb155..466167a 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
@@ -38,6 +40,7 @@ import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
import static org.apache.hadoop.hive.ql.plan.HiveOperation.*;
public class CreateTable extends BaseHiveEvent {
+ private static final Logger LOG = LoggerFactory.getLogger(CreateTable.class);
private final boolean skipTempTables;
public CreateTable(AtlasHiveHookContext context, boolean skipTempTables) {
@@ -48,7 +51,7 @@ public class CreateTable extends BaseHiveEvent {
@Override
public List<HookNotification> getNotificationMessages() throws Exception {
- List<HookNotification> ret = null;
+ List<HookNotification> ret = null;
AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities();
if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
@@ -117,41 +120,70 @@ public class CreateTable extends BaseHiveEvent {
if (table != null) {
AtlasEntity tblEntity = toTableEntity(table, ret);
- if (tblEntity != null && !context.isMetastoreHook()) {
+ if (tblEntity != null) {
if (isHBaseStore(table)) {
- // This create lineage to HBase table in case of Hive on HBase
- AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, ret);
-
- if (hbaseTableEntity != null) {
- final AtlasEntity processEntity;
+ if (context.isMetastoreHook()) {
+ //do nothing
+ } else {
+ // This creates lineage to the HBase table in case of Hive on HBase
+ AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, ret);
+
+ //not a hive metastore hook
+ //it is running in the context of HBase.
+ if (hbaseTableEntity != null) {
+ final AtlasEntity processEntity;
+
+ if (EXTERNAL_TABLE.equals(table.getTableType())) {
+ processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity));
+ } else {
+ processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
+ }
+ ret.addEntity(processEntity);
- if (EXTERNAL_TABLE.equals(table.getTableType())) {
- processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity));
- } else {
- processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
+ AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
+ ret.addEntity(processExecution);
}
- ret.addEntity(processEntity);
-
- AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
- ret.addEntity(processExecution);
}
+
} else {
- if (EXTERNAL_TABLE.equals(table.getTableType())) {
- AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
- AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
+ if (context.isMetastoreHook()) {
+ //it is running in the context of HiveMetastore
+ //this is a hive metastore hook
+ if (EXTERNAL_TABLE.equals(table.getTableType())) {
+ AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Creating a dummy process with lineage from hdfs path to table");
+ }
+ AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity),
+ Collections.singletonList(tblEntity));
+
+ ret.addEntity(processEntity);
+ ret.addReferredEntity(hdfsPathEntity);
+ //hive process entity will be created by hiveserver hook.
+ }
+ } else {
+ //not a hive metastore hook
+ //it is running in the context of HiveServer2
+ if (EXTERNAL_TABLE.equals(table.getTableType())) {
+ AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
+ AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
- ret.addEntity(processEntity);
- ret.addReferredEntity(hdfsPathEntity);
+ ret.addEntity(processEntity);
+ ret.addReferredEntity(hdfsPathEntity);
- AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
- ret.addEntity(processExecution);
+ AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
+ ret.addEntity(processExecution);
+ }
}
+
}
- AtlasEntity tableDDLEntity = createHiveDDLEntity(tblEntity);
+ if (!context.isMetastoreHook()) {
+ AtlasEntity tableDDLEntity = createHiveDDLEntity(tblEntity);
- if (tableDDLEntity != null) {
- ret.addEntity(tableDDLEntity);
+ if (tableDDLEntity != null) {
+ ret.addEntity(tableDDLEntity);
+ }
}
}
}