You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/07/12 23:27:39 UTC

[atlas] 01/03: ATLAS-3211 :- Update Hive hook with Relationship Attributes.

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 47c9e42db1ca8785c764302e497c24cb8cac6cc7
Author: Mandar Ambawane <ma...@freestoneinfotech.com>
AuthorDate: Mon Jul 8 20:26:20 2019 +0530

    ATLAS-3211 :- Update Hive hook with Relationship Attributes.
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
    (cherry picked from commit 5c0025177e6520564921c46d9a442e7da6f162c5)
---
 .../atlas/hive/hook/events/AlterTableRename.java   | 32 ++------
 .../atlas/hive/hook/events/BaseHiveEvent.java      | 89 +++++++++++++++++-----
 .../atlas/hive/hook/events/CreateHiveProcess.java  |  6 +-
 .../atlas/model/instance/AtlasRelatedObjectId.java |  6 ++
 .../java/org/apache/atlas/type/AtlasTypeUtil.java  |  4 +
 5 files changed, 89 insertions(+), 48 deletions(-)

diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
index 927ddc0..c59ff7f 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
@@ -129,17 +129,13 @@ public class AlterTableRename extends BaseHiveEvent {
         // update qualifiedName for all columns, partitionKeys, storageDesc
         String renamedTableQualifiedName = (String) renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
 
-        renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_COLUMNS), oldTableEntity, renamedTableQualifiedName, ret);
-        renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_PARTITION_KEYS), oldTableEntity, renamedTableQualifiedName, ret);
+        renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_COLUMNS), oldTableEntity, renamedTableQualifiedName, ret);
+        renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS), oldTableEntity, renamedTableQualifiedName, ret);
         renameStorageDesc(oldTableEntity, renamedTableEntity, ret);
 
-        // remove columns, partitionKeys and storageDesc - as they have already been updated above
-        removeAttribute(renamedTableEntity, ATTRIBUTE_COLUMNS);
-        removeAttribute(renamedTableEntity, ATTRIBUTE_PARTITION_KEYS);
-        removeAttribute(renamedTableEntity, ATTRIBUTE_STORAGEDESC);
-
         // set previous name as the alias
         renamedTableEntity.getEntity().setAttribute(ATTRIBUTE_ALIASES, Collections.singletonList(oldTable.getTableName()));
+        renamedTableEntity.getEntity().setRelationshipAttributes(null);
 
         String        oldTableQualifiedName = (String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
         AtlasObjectId oldTableId            = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableQualifiedName);
@@ -179,35 +175,17 @@ public class AlterTableRename extends BaseHiveEvent {
             AtlasObjectId oldSdId = new AtlasObjectId(oldSd.getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldSd.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
 
             newSd.removeAttribute(ATTRIBUTE_TABLE);
+            newSd.setRelationshipAttributes(null);
 
             notifications.add(new EntityPartialUpdateRequestV2(getUserName(), oldSdId, new AtlasEntityWithExtInfo(newSd)));
         }
     }
 
-    private void removeAttribute(AtlasEntityWithExtInfo entity, String attributeName) {
-        Object attributeValue = entity.getEntity().getAttribute(attributeName);
-
-        entity.getEntity().getAttributes().remove(attributeName);
-
-        if (attributeValue instanceof AtlasObjectId) {
-            AtlasObjectId objectId = (AtlasObjectId) attributeValue;
-
-            entity.removeReferredEntity(objectId.getGuid());
-        } else if (attributeValue instanceof Collection) {
-            for (Object item : (Collection) attributeValue)
-                if (item instanceof AtlasObjectId) {
-                    AtlasObjectId objectId = (AtlasObjectId) item;
-
-                    entity.removeReferredEntity(objectId.getGuid());
-                }
-        }
-    }
-
     private AtlasEntity getStorageDescEntity(AtlasEntityWithExtInfo tableEntity) {
         AtlasEntity ret = null;
 
         if (tableEntity != null && tableEntity.getEntity() != null) {
-            Object attrSdId = tableEntity.getEntity().getAttribute(ATTRIBUTE_STORAGEDESC);
+            Object attrSdId = tableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_STORAGEDESC);
 
             if (attrSdId instanceof AtlasObjectId) {
                 ret = tableEntity.getReferredEntity(((AtlasObjectId) attrSdId).getGuid());
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index c15bbfa..0bf3ce2 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.repository.Constants;
@@ -161,6 +162,20 @@ public abstract class BaseHiveEvent {
     public static final String HDFS_PATH_PREFIX                    = "hdfs://";
     public static final String EMPTY_ATTRIBUTE_VALUE = "";
 
+    public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs";
+    public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs";
+    public static final String RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE = "hive_process_column_lineage";
+    public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db";
+    public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS = "hive_table_partitionkeys";
+    public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS = "hive_table_columns";
+    public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC = "hive_table_storagedesc";
+    public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs";
+    public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE = "hive_process_process_executions";
+    public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries";
+    public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries";
+    public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE = "hbase_table_namespace";
+
+
     public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
 
 
@@ -193,6 +208,28 @@ public abstract class BaseHiveEvent {
         return table.getTTable() != null ? (table.getOwner()): "";
     }
 
+
+    public static AtlasRelatedObjectId getObjectIdWithRelationshipType(AtlasEntity entity, String relationShipType) {
+        AtlasObjectId plainId = getObjectId(entity); // same id getObjectId() produces for this entity
+        return new AtlasRelatedObjectId(plainId, relationShipType); // wrapped together with the relationship type name
+    }
+
+
+
+    public static List<AtlasRelatedObjectId> getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String relationshipType) {
+        // nothing to convert: hand back the shared immutable empty list
+        if (CollectionUtils.isEmpty(entities)) {
+            return Collections.emptyList();
+        }
+        // one related-object-id per entity, in input order, all tagged with the same relationship type
+        List<AtlasRelatedObjectId> relatedIds = new ArrayList<>(entities.size());
+        for (AtlasEntity item : entities) {
+            relatedIds.add(getObjectIdWithRelationshipType(item, relationshipType));
+        }
+        return relatedIds;
+    }
+
+
     public static AtlasObjectId getObjectId(AtlasEntity entity) {
         String        qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
         AtlasObjectId ret           = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
@@ -393,7 +430,9 @@ public abstract class BaseHiveEvent {
                 long createTime     = getTableCreateTime(table);
                 long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
 
-                ret.setAttribute(ATTRIBUTE_DB, dbId);
+                AtlasRelatedObjectId dbRelatedObject =     new AtlasRelatedObjectId(dbId, RELATIONSHIP_HIVE_TABLE_DB);
+
+                ret.setRelationshipAttribute(ATTRIBUTE_DB, dbRelatedObject );
                 ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
                 ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase());
                 ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
@@ -420,8 +459,10 @@ public abstract class BaseHiveEvent {
                 } else {
                     AtlasObjectId     tableId       = getObjectId(ret);
                     AtlasEntity       sd            = getStorageDescEntity(tableId, table);
-                    List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys());
-                    List<AtlasEntity> columns       = getColumnEntities(tableId, table, table.getCols());
+                    List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys(), RELATIONSHIP_HIVE_TABLE_PART_KEYS);
+                    List<AtlasEntity> columns       = getColumnEntities(tableId, table, table.getCols(), RELATIONSHIP_HIVE_TABLE_COLUMNS);
+
+
 
                     if (entityExtInfo != null) {
                         entityExtInfo.addReferredEntity(sd);
@@ -439,9 +480,10 @@ public abstract class BaseHiveEvent {
                         }
                     }
 
-                    ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
-                    ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys));
-                    ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
+
+                    ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getObjectIdWithRelationshipType(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+                    ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partitionKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS));
+                    ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
                 }
 
                 context.putEntity(tblQualifiedName, ret);
@@ -469,7 +511,9 @@ public abstract class BaseHiveEvent {
 
             StorageDescriptor sd = table.getSd();
 
-            ret.setAttribute(ATTRIBUTE_TABLE, tableId);
+            AtlasRelatedObjectId tableRelatedObject =     new AtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC);
+
+            ret.setRelationshipAttribute(ATTRIBUTE_TABLE, tableRelatedObject);
             ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
             ret.setAttribute(ATTRIBUTE_PARAMETERS, sd.getParameters());
             ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(sd.getLocation()));
@@ -515,7 +559,7 @@ public abstract class BaseHiveEvent {
         return ret;
     }
 
-    protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table table, List<FieldSchema> fieldSchemas) {
+    protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table table, List<FieldSchema> fieldSchemas, String relationshipType) {
         List<AtlasEntity> ret            = new ArrayList<>();
         boolean           isKnownTable   = tableId.getGuid() == null;
         int               columnPosition = 0;
@@ -534,8 +578,8 @@ public abstract class BaseHiveEvent {
                     if (isKnownTable) {
                         column.setGuid(null);
                     }
-
-                    column.setAttribute(ATTRIBUTE_TABLE, tableId);
+                    AtlasRelatedObjectId relatedObjectId = new AtlasRelatedObjectId(tableId, relationshipType);
+                    column.setRelationshipAttribute(ATTRIBUTE_TABLE, (relatedObjectId));
                     column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, colQualifiedName);
                     column.setAttribute(ATTRIBUTE_NAME, fieldSchema.getName());
                     column.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
@@ -583,7 +627,7 @@ public abstract class BaseHiveEvent {
 
                 ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
 
-                ret.setAttribute(ATTRIBUTE_BUCKET, getObjectId(bucketEntity));
+                ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, getObjectIdWithRelationshipType(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
                 ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
                 ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
                 ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
@@ -631,8 +675,8 @@ public abstract class BaseHiveEvent {
         }
 
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs));
-        ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs));
-        ret.setAttribute(ATTRIBUTE_OUTPUTS,  getObjectIds(outputs));
+        ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
+        ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, getObjectIdsWithRelationshipType(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
         ret.setAttribute(ATTRIBUTE_NAME, queryStr);
         ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
 
@@ -668,8 +712,9 @@ public abstract class BaseHiveEvent {
         ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
         ret.setAttribute(ATTRIBUTE_QUERY_ID, getQueryId());
         ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
-        ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName());
-        ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess));
+        ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName()); //
+        AtlasRelatedObjectId hiveProcessRelationObjectId = AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess, RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE);
+        ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, hiveProcessRelationObjectId);
         return ret;
     }
 
@@ -684,11 +729,16 @@ public abstract class BaseHiveEvent {
         if (excludeEntityGuid) {
             objId.setGuid(null);
         }
+        AtlasRelatedObjectId objIdRelatedObject =     new AtlasRelatedObjectId(objId);
 
         if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_DB)) {
-            hiveDDL = new AtlasEntity(HIVE_DB_DDL, ATTRIBUTE_DB, objId);
+            hiveDDL = new AtlasEntity(HIVE_DB_DDL);
+            objIdRelatedObject.setRelationshipType(RELATIONSHIP_HIVE_DB_DDL_QUERIES);
+            hiveDDL.setRelationshipAttribute(ATTRIBUTE_DB, objIdRelatedObject);
         } else if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_TABLE)) {
-            hiveDDL = new AtlasEntity(HIVE_TABLE_DDL, ATTRIBUTE_TABLE, objId);
+            hiveDDL = new AtlasEntity(HIVE_TABLE_DDL);
+            objIdRelatedObject.setRelationshipType(RELATIONSHIP_HIVE_TABLE_DDL_QUERIES);
+            hiveDDL.setRelationshipAttribute( ATTRIBUTE_TABLE, objIdRelatedObject);
         }
 
         if (hiveDDL != null) {
@@ -948,7 +998,10 @@ public abstract class BaseHiveEvent {
 
             ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName);
             ret.setAttribute(ATTRIBUTE_URI, hbaseTableName);
-            ret.setAttribute(ATTRIBUTE_NAMESPACE, getObjectId(nsEntity));
+
+            AtlasRelatedObjectId objIdRelatedObject =     new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE);
+
+            ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE, objIdRelatedObject);
             ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName));
 
             entities.addReferredEntity(nsEntity);
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 7791fb4..91e063e 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -207,9 +207,9 @@ public class CreateHiveProcess extends BaseHiveEvent {
 
             columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
             columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
-            columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns));
-            columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectId(outputColumn)));
-            columnLineageProcess.setAttribute(ATTRIBUTE_QUERY, getObjectId(hiveProcess));
+            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputColumns, BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS));
+            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectIdWithRelationshipType(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
+            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, getObjectIdWithRelationshipType(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE));
             columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType());
             columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr());
 
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java
index 7c57ccf..ae6932d 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java
@@ -83,6 +83,12 @@ public class AtlasRelatedObjectId extends AtlasObjectId implements Serializable
         super(other);
     }
 
+    public AtlasRelatedObjectId(AtlasObjectId objId, String relationshipType) {
+        this(objId);
+        // after delegating to the single-argument constructor, record the relationship type on this instance
+        setRelationshipType(relationshipType);
+    }
+
     public AtlasRelatedObjectId(Map objIdMap) {
         super(objIdMap);
 
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
index d74c7e3..6ac176d 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
@@ -414,6 +414,10 @@ public class AtlasTypeUtil {
         return new AtlasRelatedObjectId(getAtlasObjectId(entity));
     }
 
+    public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, String relationshipType){
+        return new AtlasRelatedObjectId(getAtlasObjectId(entity), relationshipType); // entity's object-id plus the given relationship type
+    }
+
     public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) {
         return new AtlasRelatedObjectId(getAtlasObjectId(entity, typeRegistry));
     }