You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/06/20 15:38:17 UTC

atlas git commit: ATLAS-2760: Hive hook updates to handle references to s3 paths

Repository: atlas
Updated Branches:
  refs/heads/master f787bcc2b -> 88ac0fa62


ATLAS-2760: Hive hook updates to handle references to s3 paths


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/88ac0fa6
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/88ac0fa6
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/88ac0fa6

Branch: refs/heads/master
Commit: 88ac0fa62186a59ed08a950385c5a1e3c84e517e
Parents: f787bcc
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Sun Jun 17 13:20:37 2018 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Jun 19 23:26:04 2018 -0700

----------------------------------------------------------------------
 .../atlas/hive/hook/events/BaseHiveEvent.java   | 79 ++++++++++++++++----
 .../atlas/hive/hook/events/CreateTable.java     |  2 +-
 2 files changed, 64 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/88ac0fa6/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index ce03287..09f011c 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -78,6 +78,13 @@ public abstract class BaseHiveEvent {
     public static final String HDFS_TYPE_PATH           = "hdfs_path";
     public static final String HBASE_TYPE_TABLE         = "hbase_table";
     public static final String HBASE_TYPE_NAMESPACE     = "hbase_namespace";
+    public static final String AWS_S3_BUCKET            = "aws_s3_bucket";
+    public static final String AWS_S3_PSEUDO_DIR        = "aws_s3_pseudo_dir";
+    public static final String AWS_S3_OBJECT            = "aws_s3_object";
+
+    public static final String SCHEME_SEPARATOR         = "://";
+    public static final String S3_SCHEME                = "s3" + SCHEME_SEPARATOR;
+    public static final String S3A_SCHEME               = "s3a" + SCHEME_SEPARATOR;
 
     public static final String ATTRIBUTE_QUALIFIED_NAME            = "qualifiedName";
     public static final String ATTRIBUTE_NAME                      = "name";
@@ -130,6 +137,8 @@ public abstract class BaseHiveEvent {
     public static final String ATTRIBUTE_URI                       = "uri";
     public static final String ATTRIBUTE_STORAGE_HANDLER           = "storage_handler";
     public static final String ATTRIBUTE_NAMESPACE                 = "namespace";
+    public static final String ATTRIBUTE_OBJECT_PREFIX             = "objectPrefix";
+    public static final String ATTRIBUTE_BUCKET                    = "bucket";
 
     public static final String HBASE_STORAGE_HANDLER_CLASS         = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
     public static final String HBASE_DEFAULT_NAMESPACE             = "default";
@@ -245,7 +254,7 @@ public abstract class BaseHiveEvent {
                 URI location = entity.getLocation();
 
                 if (location != null) {
-                    ret = getHDFSPathEntity(new Path(entity.getLocation()));
+                    ret = getPathEntity(new Path(entity.getLocation()), entityExtInfo);
                 }
             }
             break;
@@ -494,26 +503,60 @@ public abstract class BaseHiveEvent {
         return ret;
     }
 
-    protected AtlasEntity getHDFSPathEntity(Path path) {
-        String      strPath           = path.toString().toLowerCase();
-        String      nameServiceID     = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
-        String      attrPath          = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
-        String      pathQualifiedName = getQualifiedName(attrPath);
-        AtlasEntity ret               = context.getEntity(pathQualifiedName);
+    protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) {
+        AtlasEntity ret;
+        String      strPath = path.toString().toLowerCase();
 
-        if (ret == null) {
-            ret = new AtlasEntity(HDFS_TYPE_PATH);
+        if (isS3Path(strPath)) {
+            String      bucketName          = path.toUri().getAuthority();
+            String      bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+            String      pathQualifiedName   = (strPath + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+            AtlasEntity bucketEntity        = context.getEntity(bucketQualifiedName);
+
+            ret = context.getEntity(pathQualifiedName);
+
+            if (ret == null) {
+                if (bucketEntity == null) {
+                    bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
+
+                    bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
+                    bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+
+                    context.putEntity(bucketQualifiedName, bucketEntity);
+                }
+
+                extInfo.addReferredEntity(bucketEntity);
+
+                ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
+
+                ret.setAttribute(ATTRIBUTE_BUCKET, getObjectId(bucketEntity));
+                ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
+                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
+                ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
 
-            if (StringUtils.isNotEmpty(nameServiceID)) {
-                ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
+                context.putEntity(pathQualifiedName, ret);
             }
+        } else {
+            String nameServiceID     = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
+            String attrPath          = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
+            String pathQualifiedName = getQualifiedName(attrPath);
 
-            ret.setAttribute(ATTRIBUTE_PATH, attrPath);
-            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
-            ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
-            ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
+            ret = context.getEntity(pathQualifiedName);
+
+            if (ret == null) {
+                ret = new AtlasEntity(HDFS_TYPE_PATH);
+
+                if (StringUtils.isNotEmpty(nameServiceID)) {
+                    ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
+                }
 
-            context.putEntity(pathQualifiedName, ret);
+                ret.setAttribute(ATTRIBUTE_PATH, attrPath);
+                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
+                ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
+                ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
+
+                context.putEntity(pathQualifiedName, ret);
+            }
         }
 
         return ret;
@@ -874,6 +917,10 @@ public abstract class BaseHiveEvent {
         return false;
     }
 
+    private boolean isS3Path(String strPath) {
+        return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME));
+    }
+
 
     static final class EntityComparator implements Comparator<Entity> {
         @Override

http://git-wip-us.apache.org/repos/asf/atlas/blob/88ac0fa6/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
index daf5c86..442a0a0 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
@@ -98,7 +98,7 @@ public class CreateTable extends BaseHiveEvent {
                 }
             } else {
                 if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-                    AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation());
+                    AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
                     AtlasEntity processEntity  = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
 
                     ret.addEntity(processEntity);