Posted to commits@atlas.apache.org by ma...@apache.org on 2020/03/09 07:08:23 UTC

[atlas] 02/02: ATLAS-3659: updated Hive hook to create aws_s3_v2 entities

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit f01f49800384aff9d64cdc743c962afb92651fd9
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Sun Mar 8 18:09:19 2020 -0700

    ATLAS-3659: updated Hive hook to create aws_s3_v2 entities
    
    (cherry picked from commit 30a275d4704ec82a3a860fd239274972ecec41af)
---
 .../atlas/hive/hook/AtlasHiveHookContext.java      |   4 +
 .../java/org/apache/atlas/hive/hook/HiveHook.java  |   6 +
 .../atlas/hive/hook/events/BaseHiveEvent.java      | 142 +++++++++++++++++----
 3 files changed, 125 insertions(+), 27 deletions(-)

diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index 78290c0..d0b9393 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -168,6 +168,10 @@ public class AtlasHiveHookContext {
         return hook.isConvertHdfsPathToLowerCase();
     }
 
+    public boolean isAwsS3AtlasModelVersionV2() {
+        return hook.isAwsS3AtlasModelVersionV2();
+    }
+
     public boolean getSkipHiveColumnLineageHive20633() {
         return hook.getSkipHiveColumnLineageHive20633();
     }
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index dd425cc..3aa5c3b 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -60,6 +60,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     public static final String HOOK_NAME_CACHE_DATABASE_COUNT      = CONF_PREFIX + "name.cache.database.count";
     public static final String HOOK_NAME_CACHE_TABLE_COUNT         = CONF_PREFIX + "name.cache.table.count";
     public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds";
+    public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION     = CONF_PREFIX + "aws_s3.atlas.model.version";
+    public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2  = "v2";
     public static final String HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES          = CONF_PREFIX + "hive_process.populate.deprecated.attributes";
     public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633                  = CONF_PREFIX + "skip.hive_column_lineage.hive-20633";
     public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + "skip.hive_column_lineage.hive-20633.inputs.threshold";
@@ -75,6 +77,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final int     nameCacheDatabaseMaxCount;
     private static final int     nameCacheTableMaxCount;
     private static final int     nameCacheRebuildIntervalSeconds;
+    private static final boolean isAwsS3AtlasModelVersionV2;
 
     private static final boolean                       skipHiveColumnLineageHive20633;
     private static final int                           skipHiveColumnLineageHive20633InputsThreshold;
@@ -98,6 +101,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         nameCacheDatabaseMaxCount       = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
         nameCacheTableMaxCount          = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
         nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default
+        isAwsS3AtlasModelVersionV2      = StringUtils.equalsIgnoreCase(atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2), HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
         skipHiveColumnLineageHive20633                = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
         skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
         hiveProcessPopulateDeprecatedAttributes       = atlasProperties.getBoolean(HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES, false);
@@ -253,6 +257,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return convertHdfsPathToLowerCase;
     }
 
+    public boolean isAwsS3AtlasModelVersionV2() { return isAwsS3AtlasModelVersionV2; }
+
     public boolean getSkipHiveColumnLineageHive20633() {
         return skipHiveColumnLineageHive20633;
     }
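
Note on configuration: the new flag is read from atlasProperties with a default of "v2", so the aws_s3_v2 entities are created unless the property is explicitly set to some other value. Assuming CONF_PREFIX resolves to "atlas.hook.hive." (as for the other HiveHook properties above), a deployment that wants to keep emitting the original aws_s3_bucket/aws_s3_pseudo_dir entities could add a line like the following to atlas-application.properties; the "v1" value shown is illustrative of "any value other than v2":

    atlas.hook.hive.aws_s3.atlas.model.version=v1
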
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 86bf7a0..b766742 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -82,6 +82,8 @@ public abstract class BaseHiveEvent {
     public static final String AWS_S3_BUCKET                       = "aws_s3_bucket";
     public static final String AWS_S3_PSEUDO_DIR                   = "aws_s3_pseudo_dir";
     public static final String AWS_S3_OBJECT                       = "aws_s3_object";
+    public static final String AWS_S3_V2_BUCKET                    = "aws_s3_v2_bucket";
+    public static final String AWS_S3_V2_PSEUDO_DIR                = "aws_s3_v2_directory";
 
     public static final String SCHEME_SEPARATOR                    = "://";
     public static final String S3_SCHEME                           = "s3" + SCHEME_SEPARATOR;
@@ -142,6 +144,7 @@ public abstract class BaseHiveEvent {
     public static final String ATTRIBUTE_NAMESPACE                 = "namespace";
     public static final String ATTRIBUTE_OBJECT_PREFIX             = "objectPrefix";
     public static final String ATTRIBUTE_BUCKET                    = "bucket";
+    public static final String ATTRIBUTE_CONTAINER                 = "container";
     public static final String ATTRIBUTE_HOSTNAME                  = "hostName";
     public static final String ATTRIBUTE_EXEC_TIME                 = "execTime";
     public static final String ATTRIBUTE_DDL_QUERIES               = "ddlQueries";
@@ -163,6 +166,7 @@ public abstract class BaseHiveEvent {
     public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS = "hive_table_columns";
     public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC = "hive_table_storagedesc";
     public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs";
+    public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED = "aws_s3_v2_container_contained";
     public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE = "hive_process_process_executions";
     public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries";
     public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries";
@@ -582,33 +586,10 @@ public abstract class BaseHiveEvent {
         }
 
         if (isS3Path(strPath)) {
-            String      bucketName          = path.toUri().getAuthority();
-            String      bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
-            String      pathQualifiedName   = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
-            AtlasEntity bucketEntity        = context.getEntity(bucketQualifiedName);
-
-            ret = context.getEntity(pathQualifiedName);
-
-            if (ret == null) {
-                if (bucketEntity == null) {
-                    bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
-
-                    bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
-                    bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
-
-                    context.putEntity(bucketQualifiedName, bucketEntity);
-                }
-
-                extInfo.addReferredEntity(bucketEntity);
-
-                ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
-
-                ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
-                ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
-                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
-                ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
-
-                context.putEntity(pathQualifiedName, ret);
+            if (context.isAwsS3AtlasModelVersionV2()) {
+                ret = addS3PathEntityV2(path, strPath, extInfo);
+            } else {
+                ret = addS3PathEntityV1(path, strPath, extInfo);
             }
         } else {
             String nameServiceID     = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
@@ -1153,6 +1134,113 @@ public abstract class BaseHiveEvent {
         return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME));
     }
 
+    private AtlasEntity addS3PathEntityV1(Path path, String strPath, AtlasEntityExtInfo extInfo) {
+        String      metadataNamespace   = getMetadataNamespace();
+        String      bucketName          = path.toUri().getAuthority();
+        String      bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
+        String      pathQualifiedName   = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
+        AtlasEntity bucketEntity        = context.getEntity(bucketQualifiedName);
+        AtlasEntity ret                 = context.getEntity(pathQualifiedName);
+
+        if (ret == null) {
+            if (bucketEntity == null) {
+                bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
+
+                bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
+                bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+
+                context.putEntity(bucketQualifiedName, bucketEntity);
+            }
+
+            extInfo.addReferredEntity(bucketEntity);
+
+            ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
+
+            ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
+            ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
+            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
+            ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
+
+            context.putEntity(pathQualifiedName, ret);
+        }
+
+        return ret;
+    }
+
+    private AtlasEntity addS3PathEntityV2(Path path, String strPath, AtlasEntityExtInfo extInfo) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addS3PathEntityV2(strPath={})", strPath);
+        }
+
+        String      metadataNamespace = getMetadataNamespace();
+        String      pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+        AtlasEntity ret               = context.getEntity(pathQualifiedName);
+
+        if (ret == null) {
+            String      bucketName          = path.toUri().getAuthority();
+            String      schemeAndBucketName = (path.toUri().getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase();
+            String      bucketQualifiedName = schemeAndBucketName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+            AtlasEntity bucketEntity        = context.getEntity(bucketQualifiedName);
+
+            if (bucketEntity == null) {
+                bucketEntity = new AtlasEntity(AWS_S3_V2_BUCKET);
+
+                bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
+                bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(bucketQualifiedName, bucketEntity);
+            }
+
+            extInfo.addReferredEntity(bucketEntity);
+
+            AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
+            String               parentPath  = Path.SEPARATOR;
+            String               dirPath     = path.toUri().getPath();
+
+            if (StringUtils.isEmpty(dirPath)) {
+                dirPath = Path.SEPARATOR;
+            }
+
+            for (String subDirName : dirPath.split(Path.SEPARATOR)) {
+                if (StringUtils.isEmpty(subDirName)) {
+                    continue;
+                }
+
+                String subDirPath          = parentPath + subDirName + Path.SEPARATOR;
+                String subDirQualifiedName = schemeAndBucketName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+
+                ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
+
+                ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
+                ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
+                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
+                ret.setAttribute(ATTRIBUTE_NAME, subDirName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(subDirQualifiedName, ret);
+
+                parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
+                parentPath  = subDirPath;
+            }
+
+            if (ret == null) {
+                ret = bucketEntity;
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addS3PathEntityV2(strPath={})", strPath);
+        }
+
+        return ret;
+    }
 
     static final class EntityComparator implements Comparator<Entity> {
         @Override
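
For readers following the new addS3PathEntityV2() method above: it walks the path component of the S3 URI and registers one aws_s3_v2_directory entity per prefix, each linked to its parent (the bucket, then the previous directory) through the aws_s3_v2_container_contained relationship, and returns the deepest directory entity. The standalone sketch below is illustrative only and not part of the patch; it mirrors the qualified-name construction for a sample path, assuming QNAME_SEP_METADATA_NAMESPACE is the '@' separator used in Atlas qualified names and using a made-up metadata namespace:

    import java.net.URI;

    // Illustrative sketch (not part of this commit): prints the qualifiedName values
    // that addS3PathEntityV2() would assign to the bucket and to each pseudo-directory
    // in the prefix chain for a sample S3 path.
    public class S3V2QualifiedNameSketch {
        public static void main(String[] args) {
            String metadataNamespace = "cm";                                         // assumed namespace
            URI    uri               = URI.create("s3a://mybucket/warehouse/sales"); // assumed sample path

            // bucket qualifiedName: lower-cased scheme://authority, then @namespace
            String schemeAndBucket = (uri.getScheme() + "://" + uri.getAuthority()).toLowerCase();

            System.out.println("aws_s3_v2_bucket:    " + schemeAndBucket + "@" + metadataNamespace);

            // one aws_s3_v2_directory per prefix: /warehouse/, /warehouse/sales/, ...
            String parentPath = "/";

            for (String subDirName : uri.getPath().split("/")) {
                if (subDirName.isEmpty()) {
                    continue;
                }

                String subDirPath = parentPath + subDirName + "/";

                System.out.println("aws_s3_v2_directory: " + schemeAndBucket + subDirPath + "@" + metadataNamespace);

                parentPath = subDirPath;
            }
        }
    }

For s3a://mybucket/warehouse/sales this prints s3a://mybucket@cm, s3a://mybucket/warehouse/@cm and s3a://mybucket/warehouse/sales/@cm, matching the subDirQualifiedName values built in the loop of addS3PathEntityV2(); the entity for the deepest prefix is the one returned and wired into the Hive lineage.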