You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/09/20 01:08:01 UTC

[atlas] branch master updated: ATLAS-3413: HMS should create a dummy process linking HDFS path to Hive table for external table lineage

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 15e5bed  ATLAS-3413: HMS should create a dummy process linking HDFS path to Hive table for external table lineage
15e5bed is described below

commit 15e5bedbe470677ab8a8de4ecfa76c55f852c4e7
Author: skoritala <sk...@cloudera.com>
AuthorDate: Thu Sep 19 15:34:41 2019 -0700

    ATLAS-3413: HMS should create a dummy process linking HDFS path to Hive table for external table lineage
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
 .../atlas/hive/hook/events/BaseHiveEvent.java      | 31 ++++----
 .../apache/atlas/hive/hook/events/CreateTable.java | 82 +++++++++++++++-------
 2 files changed, 74 insertions(+), 39 deletions(-)

diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 42f78a0..851582c 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -52,15 +52,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import static org.apache.atlas.hive.bridge.HiveMetaStoreBridge.getDatabaseName;
 import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_METADATA_NAMESPACE;
@@ -640,18 +632,29 @@ public abstract class BaseHiveEvent {
     }
 
     protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
-        AtlasEntity ret         = new AtlasEntity(HIVE_TYPE_PROCESS);
-        String      queryStr    = getQueryString();
+        AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS);
+        String queryStr = getQueryString();
 
         if (queryStr != null) {
             queryStr = queryStr.toLowerCase().trim();
         }
+        ret.setAttribute(ATTRIBUTE_NAME, queryStr);
 
-        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs));
+        ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
+        String qualifiedName = getQualifiedName(inputs, outputs);
+        if (context.isMetastoreHook()) {
+            HiveOperation operation = context.getHiveOperation();
+            if (operation == HiveOperation.CREATETABLE || operation == HiveOperation.CREATETABLE_AS_SELECT) {
+                AtlasEntity table = outputs.get(0);
+                long createTime = Long.valueOf((Long)table.getAttribute(ATTRIBUTE_CREATE_TIME));
+                qualifiedName =  (String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
+                ret.setAttribute(ATTRIBUTE_NAME, "dummyProcess:" + UUID.randomUUID());
+                ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, operation.getOperationName());
+            }
+        }
+        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
         ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
         ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
-        ret.setAttribute(ATTRIBUTE_NAME, queryStr);
-        ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
 
         // We are setting an empty value to these attributes, since now we have a new entity type called hive process
         // execution which captures these values. We have to set empty values here because these attributes are
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
index aedb155..466167a 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
@@ -38,6 +40,7 @@ import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
 import static org.apache.hadoop.hive.ql.plan.HiveOperation.*;
 
 public class CreateTable extends BaseHiveEvent {
+    private static final Logger LOG = LoggerFactory.getLogger(CreateTable.class);
     private final boolean skipTempTables;
 
     public CreateTable(AtlasHiveHookContext context, boolean skipTempTables) {
@@ -48,7 +51,7 @@ public class CreateTable extends BaseHiveEvent {
 
     @Override
     public List<HookNotification> getNotificationMessages() throws Exception {
-        List<HookNotification>   ret      = null;
+        List<HookNotification> ret = null;
         AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities();
 
         if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
@@ -117,41 +120,70 @@ public class CreateTable extends BaseHiveEvent {
         if (table != null) {
             AtlasEntity tblEntity = toTableEntity(table, ret);
 
-            if (tblEntity != null && !context.isMetastoreHook()) {
+            if (tblEntity != null) {
                 if (isHBaseStore(table)) {
-                    // This create lineage to HBase table in case of Hive on HBase
-                    AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, ret);
-
-                    if (hbaseTableEntity != null) {
-                        final AtlasEntity processEntity;
+                    if (context.isMetastoreHook()) {
+                        //do nothing
+                    } else {
+                        // This create lineage to HBase table in case of Hive on HBase
+                        AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, ret);
+
+                        //not a hive metastore hook
+                        //it is running in the context of HBase.
+                        if (hbaseTableEntity != null) {
+                            final AtlasEntity processEntity;
+
+                            if (EXTERNAL_TABLE.equals(table.getTableType())) {
+                                processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity));
+                            } else {
+                                processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
+                            }
+                            ret.addEntity(processEntity);
 
-                        if (EXTERNAL_TABLE.equals(table.getTableType())) {
-                            processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity));
-                        } else {
-                            processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
+                            AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
+                            ret.addEntity(processExecution);
                         }
-                        ret.addEntity(processEntity);
-
-                        AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
-                        ret.addEntity(processExecution);
                     }
+
                 } else {
-                    if (EXTERNAL_TABLE.equals(table.getTableType())) {
-                        AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
-                        AtlasEntity processEntity  = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
+                    if (context.isMetastoreHook()) {
+                    if (context.isMetastoreHook()) {
+                        //this is a hive metastore hook
+                        //it is running in the context of HiveMetastore
+                        if (EXTERNAL_TABLE.equals(table.getTableType())) {
+                            AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("Creating a dummy process with lineage from hdfs path to table");
+                            }
+                            AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity),
+                                    Collections.singletonList(tblEntity));
+
+                            ret.addEntity(processEntity);
+                            ret.addReferredEntity(hdfsPathEntity);
+                            //hive process entity will be created by hiveserver hook.
+                        }
+                    } else {
+                        //not a hive metastore hook
+                        //it is running in the context of HiveServer2
+                        if (EXTERNAL_TABLE.equals(table.getTableType())) {
+                            AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
+                            AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
 
-                        ret.addEntity(processEntity);
-                        ret.addReferredEntity(hdfsPathEntity);
+                            ret.addEntity(processEntity);
+                            ret.addReferredEntity(hdfsPathEntity);
 
-                        AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
-                        ret.addEntity(processExecution);
+                            AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
+                            ret.addEntity(processExecution);
+                        }
                     }
+
                 }
 
-                AtlasEntity tableDDLEntity = createHiveDDLEntity(tblEntity);
+                if (!context.isMetastoreHook()) {
+                    AtlasEntity tableDDLEntity = createHiveDDLEntity(tblEntity);
 
-                if (tableDDLEntity != null) {
-                    ret.addEntity(tableDDLEntity);
+                    if (tableDDLEntity != null) {
+                        ret.addEntity(tableDDLEntity);
+                    }
                 }
             }
         }