You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/06/20 15:38:17 UTC
atlas git commit: ATLAS-2760: Hive hook updates to handle references to s3 paths
Repository: atlas
Updated Branches:
refs/heads/master f787bcc2b -> 88ac0fa62
ATLAS-2760: Hive hook updates to handle references to s3 paths
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/88ac0fa6
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/88ac0fa6
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/88ac0fa6
Branch: refs/heads/master
Commit: 88ac0fa62186a59ed08a950385c5a1e3c84e517e
Parents: f787bcc
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Sun Jun 17 13:20:37 2018 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Jun 19 23:26:04 2018 -0700
----------------------------------------------------------------------
.../atlas/hive/hook/events/BaseHiveEvent.java | 79 ++++++++++++++++----
.../atlas/hive/hook/events/CreateTable.java | 2 +-
2 files changed, 64 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/88ac0fa6/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index ce03287..09f011c 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -78,6 +78,13 @@ public abstract class BaseHiveEvent {
public static final String HDFS_TYPE_PATH = "hdfs_path";
public static final String HBASE_TYPE_TABLE = "hbase_table";
public static final String HBASE_TYPE_NAMESPACE = "hbase_namespace";
+ public static final String AWS_S3_BUCKET = "aws_s3_bucket";
+ public static final String AWS_S3_PSEUDO_DIR = "aws_s3_pseudo_dir";
+ public static final String AWS_S3_OBJECT = "aws_s3_object";
+
+ public static final String SCHEME_SEPARATOR = "://";
+ public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR;
+ public static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR;
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
@@ -130,6 +137,8 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_URI = "uri";
public static final String ATTRIBUTE_STORAGE_HANDLER = "storage_handler";
public static final String ATTRIBUTE_NAMESPACE = "namespace";
+ public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
+ public static final String ATTRIBUTE_BUCKET = "bucket";
public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
public static final String HBASE_DEFAULT_NAMESPACE = "default";
@@ -245,7 +254,7 @@ public abstract class BaseHiveEvent {
URI location = entity.getLocation();
if (location != null) {
- ret = getHDFSPathEntity(new Path(entity.getLocation()));
+ ret = getPathEntity(new Path(entity.getLocation()), entityExtInfo);
}
}
break;
@@ -494,26 +503,60 @@ public abstract class BaseHiveEvent {
return ret;
}
- protected AtlasEntity getHDFSPathEntity(Path path) {
- String strPath = path.toString().toLowerCase();
- String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
- String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
- String pathQualifiedName = getQualifiedName(attrPath);
- AtlasEntity ret = context.getEntity(pathQualifiedName);
+ protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) {
+ AtlasEntity ret;
+ String strPath = path.toString().toLowerCase();
- if (ret == null) {
- ret = new AtlasEntity(HDFS_TYPE_PATH);
+ if (isS3Path(strPath)) {
+ String bucketName = path.toUri().getAuthority();
+ String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+ String pathQualifiedName = (strPath + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+ AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName);
+
+ ret = context.getEntity(pathQualifiedName);
+
+ if (ret == null) {
+ if (bucketEntity == null) {
+ bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
+
+ bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
+ bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+
+ context.putEntity(bucketQualifiedName, bucketEntity);
+ }
+
+ extInfo.addReferredEntity(bucketEntity);
+
+ ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
+
+ ret.setAttribute(ATTRIBUTE_BUCKET, getObjectId(bucketEntity));
+ ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
+ ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
- if (StringUtils.isNotEmpty(nameServiceID)) {
- ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
+ context.putEntity(pathQualifiedName, ret);
}
+ } else {
+ String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
+ String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
+ String pathQualifiedName = getQualifiedName(attrPath);
- ret.setAttribute(ATTRIBUTE_PATH, attrPath);
- ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
- ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
- ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
+ ret = context.getEntity(pathQualifiedName);
+
+ if (ret == null) {
+ ret = new AtlasEntity(HDFS_TYPE_PATH);
+
+ if (StringUtils.isNotEmpty(nameServiceID)) {
+ ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
+ }
- context.putEntity(pathQualifiedName, ret);
+ ret.setAttribute(ATTRIBUTE_PATH, attrPath);
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
+ ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
+ ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
+
+ context.putEntity(pathQualifiedName, ret);
+ }
}
return ret;
@@ -874,6 +917,10 @@ public abstract class BaseHiveEvent {
return false;
}
+ private boolean isS3Path(String strPath) {
+ return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME));
+ }
+
static final class EntityComparator implements Comparator<Entity> {
@Override
http://git-wip-us.apache.org/repos/asf/atlas/blob/88ac0fa6/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
index daf5c86..442a0a0 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
@@ -98,7 +98,7 @@ public class CreateTable extends BaseHiveEvent {
}
} else {
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
- AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation());
+ AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
ret.addEntity(processEntity);