You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/03/12 21:29:53 UTC

[atlas] branch master updated: ATLAS-3054: improved batch processing in notification handler to avoid processing of an entity multiple times - #2

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new fc2a926  ATLAS-3054: improved batch processing in notification handler to avoid processing of an entity multiple times - #2
fc2a926 is described below

commit fc2a926cc7d0f0070c25f9afc68bf6a5a1bb6df2
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Mon Mar 11 16:17:03 2019 -0700

    ATLAS-3054: improved batch processing in notification handler to avoid processing of an entity multiple times - #2
---
 addons/models/1000-Hadoop/1030-hive_model.json     |   2 +-
 .../org/apache/atlas/type/AtlasBuiltInTypes.java   |  13 +-
 .../org/apache/atlas/type/AtlasEntityType.java     |   4 +-
 .../apache/atlas/type/AtlasRelationshipType.java   |   7 +-
 .../org/apache/atlas/type/AtlasStructType.java     |  37 ++++-
 .../org/apache/atlas/utils/AtlasEntityUtil.java    |  33 ++++-
 .../atlas/discovery/EntityDiscoveryService.java    |  19 +--
 .../converters/AtlasStructFormatConverter.java     |   9 +-
 .../store/graph/v2/AtlasEntityStoreV2.java         |   2 +-
 .../store/graph/v2/EntityGraphMapper.java          |  79 ++++++++---
 .../notification/NotificationHookConsumer.java     | 153 ++++++++++++++++++++-
 .../preprocessor/HivePreprocessor.java             |   4 +-
 .../preprocessor/PreprocessorContext.java          |   4 +
 13 files changed, 305 insertions(+), 61 deletions(-)

diff --git a/addons/models/1000-Hadoop/1030-hive_model.json b/addons/models/1000-Hadoop/1030-hive_model.json
index 324d716..7207a41 100644
--- a/addons/models/1000-Hadoop/1030-hive_model.json
+++ b/addons/models/1000-Hadoop/1030-hive_model.json
@@ -508,7 +508,7 @@
             "serviceType": "hive",
             "typeVersion": "1.2",
             "relationshipCategory": "COMPOSITION",
-            "relationshipLabel": "__hive_table.partitionkeys",
+            "relationshipLabel": "__hive_table.partitionKeys",
             "endDef1": {
                 "type": "hive_table",
                 "name": "partitionKeys",
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java b/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
index 6bedf6d..ce14b5b 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
@@ -20,6 +20,7 @@ package org.apache.atlas.type;
 
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
@@ -672,19 +673,25 @@ public class AtlasBuiltInTypes {
 
         @Override
         public AtlasObjectId getNormalizedValue(Object obj) {
+            AtlasObjectId ret = null;
+
             if (obj != null) {
                 if (obj instanceof AtlasObjectId) {
-                    return (AtlasObjectId) obj;
+                    ret = (AtlasObjectId) obj;
                 } else if (obj instanceof Map) {
                     Map map = (Map) obj;
 
                     if (isValidMap(map)) {
-                        return new AtlasObjectId(map);
+                        if (map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
+                            ret = new AtlasRelatedObjectId(map);
+                        } else {
+                            ret = new AtlasObjectId(map);
+                        }
                     }
                 }
             }
 
-            return null;
+            return ret;
         }
 
         private boolean isValidMap(Map map) {
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index 2557bb3..b5360c1 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -799,7 +799,7 @@ public class AtlasEntityType extends AtlasStructType {
                 AtlasEntity entityObj = (AtlasEntity) obj;
 
                 for (String attributeName : relationshipAttributes.keySet()) {
-                    Object         value            = entityObj.getAttribute(attributeName);
+                    Object         value            = entityObj.getRelationshipAttribute(attributeName);
                     String         relationshipType = AtlasEntityUtil.getRelationshipType(value);
                     AtlasAttribute attribute        = getRelationshipAttribute(attributeName, relationshipType);
 
@@ -824,7 +824,7 @@ public class AtlasEntityType extends AtlasStructType {
                     }
                 }
             } else if (obj instanceof Map) {
-                Map attributes = AtlasTypeUtil.toStructAttributes((Map) obj);
+                Map attributes = AtlasTypeUtil.toRelationshipAttributes((Map) obj);
 
                 for (String attributeName : relationshipAttributes.keySet()) {
                     Object         value            = attributes.get(attributeName);
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
index 3ea8d80..183772b 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
@@ -159,7 +159,9 @@ public class AtlasRelationshipType extends AtlasStructType {
             AtlasRelationshipEdgeDirection end2Direction = IN;
 
             if (endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) {
-                end2Direction = OUT;
+                if (relationshipDef.getRelationshipLabel() == null) { // only if label hasn't been overridden
+                    end2Direction = OUT;
+                }
             } else if (!endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) {
                 end1Direction = IN;
                 end2Direction = OUT;
@@ -345,11 +347,12 @@ public class AtlasRelationshipType extends AtlasStructType {
             }
 
             attribute = new AtlasAttribute(entityType, attributeDef,
-                                           typeRegistry.getType(attrTypeName), relationshipLabel);
+                                           typeRegistry.getType(attrTypeName), getTypeName(), relationshipLabel);
 
         } else {
             // attribute already exists (legacy attribute which is also a relationship attribute)
             // add relationshipLabel information to existing attribute
+            attribute.setRelationshipName(getTypeName());
             attribute.setRelationshipEdgeLabel(relationshipLabel);
         }
 
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
index 84c76d7..0be7e18 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
+import static org.apache.atlas.model.TypeCategory.*;
 import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_ATTRIBUTE;
 import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_INVERSE_REF;
 import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF;
@@ -701,19 +702,23 @@ public class AtlasStructType extends AtlasType {
         private final String                   vertexPropertyName;
         private final String                   vertexUniquePropertyName;
         private final boolean                  isOwnedRef;
+        private final boolean                  isObjectRef;
         private final String                   inverseRefAttributeName;
         private AtlasAttribute                 inverseRefAttribute;
+        private String                         relationshipName;
         private String                         relationshipEdgeLabel;
         private AtlasRelationshipEdgeDirection relationshipEdgeDirection;
 
-        public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType, String relationshipLabel) {
+        public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType, String relationshipName, String relationshipLabel) {
             this.definedInType            = definedInType;
             this.attributeDef             = attrDef;
             this.attributeType            = attributeType.getTypeForAttribute();
             this.qualifiedName            = getQualifiedAttributeName(definedInType.getStructDef(), attributeDef.getName());
             this.vertexPropertyName       = encodePropertyKey(this.qualifiedName);
             this.vertexUniquePropertyName = attrDef.getIsUnique() ? encodePropertyKey(getQualifiedAttributeName(definedInType.getStructDef(), UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX + attributeDef.getName())) : null;
+            this.relationshipName         = relationshipName;
             this.relationshipEdgeLabel    = getRelationshipEdgeLabel(relationshipLabel);
+
             boolean isOwnedRef            = false;
             String  inverseRefAttribute   = null;
 
@@ -736,10 +741,32 @@ public class AtlasStructType extends AtlasType {
             this.isOwnedRef                = isOwnedRef;
             this.inverseRefAttributeName   = inverseRefAttribute;
             this.relationshipEdgeDirection = AtlasRelationshipEdgeDirection.OUT;
+
+            switch (attributeType.getTypeCategory()) {
+                case OBJECT_ID_TYPE:
+                    isObjectRef = true;
+                break;
+
+                case MAP:
+                    AtlasMapType mapType = (AtlasMapType) attributeType;
+
+                    isObjectRef = mapType.getValueType().getTypeCategory() == OBJECT_ID_TYPE;
+                break;
+
+                case ARRAY:
+                    AtlasArrayType arrayType = (AtlasArrayType) attributeType;
+
+                    isObjectRef = arrayType.getElementType().getTypeCategory() == OBJECT_ID_TYPE;
+                break;
+
+                default:
+                    isObjectRef = false;
+                break;
+            }
         }
 
         public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType) {
-            this(definedInType, attrDef, attributeType, null);
+            this(definedInType, attrDef, attributeType, null, null);
         }
 
         public AtlasStructType getDefinedInType() { return definedInType; }
@@ -766,12 +793,18 @@ public class AtlasStructType extends AtlasType {
 
         public boolean isOwnedRef() { return isOwnedRef; }
 
+        public boolean isObjectRef() { return isObjectRef; }
+
         public String getInverseRefAttributeName() { return inverseRefAttributeName; }
 
         public AtlasAttribute getInverseRefAttribute() { return inverseRefAttribute; }
 
         public void setInverseRefAttribute(AtlasAttribute inverseAttr) { inverseRefAttribute = inverseAttr; }
 
+        public String getRelationshipName() { return relationshipName; }
+
+        public void setRelationshipName(String relationshipName) { this.relationshipName = relationshipName; }
+
         public String getRelationshipEdgeLabel() { return relationshipEdgeLabel; }
 
         public void setRelationshipEdgeLabel(String relationshipEdgeLabel) { this.relationshipEdgeLabel = relationshipEdgeLabel; }
diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
index 3002217..1e78e25 100644
--- a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
+++ b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -126,10 +127,38 @@ public class AtlasEntityUtil {
 
         if (val instanceof AtlasRelatedObjectId) {
             ret = ((AtlasRelatedObjectId) val).getRelationshipType();
+        } else if (val instanceof Collection) {
+            String elemRelationshipType = null;
+
+            for (Object elem : (Collection) val) {
+                elemRelationshipType = getRelationshipType(elem);
+
+                if (elemRelationshipType != null) {
+                    break;
+                }
+            }
+
+            ret = elemRelationshipType;
         } else if (val instanceof Map) {
-            Object relTypeName = ((Map) val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE);
+            Map mapValue = (Map) val;
+
+            if (mapValue.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
+                Object relTypeName = ((Map) val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE);
 
-            ret = relTypeName != null ? relTypeName.toString() : null;
+                ret = relTypeName != null ? relTypeName.toString() : null;
+            } else {
+                String entryRelationshipType = null;
+
+                for (Object entryVal : mapValue.values()) {
+                    entryRelationshipType = getRelationshipType(entryVal);
+
+                    if (entryRelationshipType != null) {
+                        break;
+                    }
+                }
+
+                ret = entryRelationshipType;
+            }
         } else {
             ret = null;
         }
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index 9df360c..19f81d3 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -537,7 +537,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
         AtlasAttribute attribute = entityType.getAttribute(relation);
 
         if (attribute != null) {
-            if (isRelationshipAttribute(attribute)) {
+            if (attribute.isObjectRef()) {
                 relation = attribute.getRelationshipEdgeLabel();
             } else {
                 throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_ATTRIBUTE, relation, attribute.getTypeName());
@@ -790,23 +790,6 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
         return "";
     }
 
-    private boolean isRelationshipAttribute(AtlasAttribute attribute) throws AtlasBaseException {
-        boolean   ret      = true;
-        AtlasType attrType = attribute.getAttributeType();
-
-        if (attrType.getTypeCategory() == ARRAY) {
-            attrType = ((AtlasArrayType) attrType).getElementType();
-        } else if (attrType.getTypeCategory() == MAP) {
-            attrType = ((AtlasMapType) attrType).getValueType();
-        }
-
-        if (attrType.getTypeCategory() != OBJECT_ID_TYPE) {
-            ret = false;
-        }
-
-        return ret;
-    }
-
     private Set<String> getEntityStates() {
         return new HashSet<>(Arrays.asList(ACTIVE.toString(), DELETED.toString()));
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
index 51a6426..173fcee 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.utils.AtlasEntityUtil;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.type.*;
 import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
@@ -134,11 +135,12 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
 
             // Only process the requested/set attributes
             for (String attrName : attributes.keySet()) {
-                AtlasAttribute attr = structType.getAttribute(attrName);
+                Object         v2Value = attributes.get(attrName);
+                AtlasAttribute attr    = structType.getAttribute(attrName);
 
                 if (attr == null) {
                     if (isEntityType) {
-                        attr = ((AtlasEntityType) structType).getRelationshipAttribute(attrName, null);
+                        attr = ((AtlasEntityType) structType).getRelationshipAttribute(attrName, AtlasEntityUtil.getRelationshipType(v2Value));
                     }
 
                     if (attr == null) {
@@ -149,7 +151,6 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
 
                 AtlasType            attrType      = attr.getAttributeType();
                 AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory());
-                Object               v2Value       = attributes.get(attr.getName());
 
                 if (v2Value != null && isEntityType && attr.isOwnedRef()) {
                     if (LOG.isDebugEnabled()) {
@@ -256,6 +257,7 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
             // Only process the requested/set attributes
             for (Object attribKey : attributes.keySet()) {
                 String         attrName = attribKey.toString();
+                Object         v1Value  = attributes.get(attrName);
                 AtlasAttribute attr     = structType.getAttribute(attrName);
 
                 if (attr == null) {
@@ -271,7 +273,6 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
 
                 AtlasType            attrType      = attr.getAttributeType();
                 AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory());
-                Object               v1Value       = attributes.get(attrName);
 
                 if (attrConverter.isValidValueV1(v1Value, attrType)) {
                     Object v2Value = attrConverter.fromV1ToV2(v1Value, attrType, context);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index a62f335..a5a6291 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -323,7 +323,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
         AtlasAttribute    attr       = entityType.getAttribute(attrName);
 
         if (attr == null) {
-            attr = entityType.getRelationshipAttribute(attrName, null);
+            attr = entityType.getRelationshipAttribute(attrName, AtlasEntityUtil.getRelationshipType(attrValue));
 
             if (attr == null) {
                 throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entity.getTypeName());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index a6f1250..31b20ff 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -79,8 +79,43 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DE
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
 import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
-import static org.apache.atlas.repository.Constants.*;
-import static org.apache.atlas.repository.graph.GraphHelper.*;
+import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY;
+import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.HOME_ID_KEY;
+import static org.apache.atlas.repository.Constants.IS_PROXY_KEY;
+import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
+import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
+import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
+import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
+import static org.apache.atlas.repository.graph.GraphHelper.getDefaultRemovePropagations;
+import static org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty;
+import static org.apache.atlas.repository.graph.GraphHelper.getStatus;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames;
+import static org.apache.atlas.repository.graph.GraphHelper.isActive;
+import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
+import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
+import static org.apache.atlas.repository.graph.GraphHelper.string;
+import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
@@ -827,27 +862,29 @@ public class EntityGraphMapper {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, (ctx.getValue() == null ? null : ctx.getValue().toString()));
         }
 
-        String    attributeName = ctx.getAttribute().getName();
-        AtlasType type          = typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex));
-
-        AtlasRelationshipEdgeDirection edgeDirection = ctx.getAttribute().getRelationshipEdgeDirection();
+        AtlasType type = typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex));
 
         if (type instanceof AtlasEntityType) {
             AtlasEntityType entityType = (AtlasEntityType) type;
+            AtlasAttribute  attribute     = ctx.getAttribute();
+            String          attributeName = attribute.getName();
 
             // use relationship to create/update edges
             if (entityType.hasRelationshipAttribute(attributeName)) {
                 Map<String, Object> relationshipAttributes = getRelationshipAttributes(ctx.getValue());
 
                 if (ctx.getCurrentEdge() != null) {
-                    ret = updateRelationship(ctx.getCurrentEdge(), entityVertex, attributeVertex, edgeDirection, relationshipAttributes);
-
+                    ret = updateRelationship(ctx.getCurrentEdge(), entityVertex, attributeVertex, attribute.getRelationshipEdgeDirection(), relationshipAttributes);
                 } else {
-                    String      relationshipName = graphHelper.getRelationshipTypeName(entityVertex, entityType, attributeName);
+                    String      relationshipName = attribute.getRelationshipName();
                     AtlasVertex fromVertex;
                     AtlasVertex toVertex;
 
-                    if (edgeDirection == IN) {
+                    if (StringUtils.isEmpty(relationshipName)) {
+                        relationshipName = graphHelper.getRelationshipTypeName(entityVertex, entityType, attributeName);
+                    }
+
+                    if (attribute.getRelationshipEdgeDirection() == IN) {
                         fromVertex = attributeVertex;
                         toVertex   = entityVertex;
 
@@ -1106,21 +1143,29 @@ public class EntityGraphMapper {
     }
 
     private static AtlasObjectId getObjectId(Object val) throws AtlasBaseException {
+        AtlasObjectId ret = null;
+
         if (val != null) {
             if ( val instanceof  AtlasObjectId) {
-                return ((AtlasObjectId) val);
+                ret = ((AtlasObjectId) val);
             } else if (val instanceof Map) {
-                AtlasObjectId ret = new AtlasObjectId((Map)val);
+                Map map = (Map) val;
 
-                if (AtlasTypeUtil.isValid(ret)) {
-                    return ret;
+                if (map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
+                    ret = new AtlasRelatedObjectId(map);
+                } else {
+                    ret = new AtlasObjectId((Map) val);
                 }
-            }
 
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+                if (!AtlasTypeUtil.isValid(ret)) {
+                    throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+                }
+            } else {
+                throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+            }
         }
 
-        return null;
+        return ret;
     }
 
     private static String getGuid(Object val) throws AtlasBaseException {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index d16d544..d1d6003 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -35,6 +35,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
 import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
@@ -56,12 +57,14 @@ import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.service.Service;
 import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.filters.AuditFilter.AuditLog;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.common.TopicPartition;
@@ -75,6 +78,7 @@ import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -86,6 +90,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
+import static org.apache.atlas.model.instance.AtlasObjectId.*;
+import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS;
+
 
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
@@ -174,7 +181,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
         minWaitDuration       = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms  by default
         maxWaitDuration       = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60);  //  30 sec by default
-        commitBatchSize       = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 0);
+        commitBatchSize       = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
 
         skipHiveColumnLineageHive20633                = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
         skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
@@ -216,8 +223,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             hiveTablesCache = Collections.emptyMap();
         }
 
-        hiveTypesRemoveOwnedRefAttrs  = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, false);
-        rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, false);
+        hiveTypesRemoveOwnedRefAttrs  = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
+        rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
         preprocessEnabled             = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs;
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
@@ -704,6 +711,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
                 atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
             } else {
+                Map<String, String> guidAssignments = new HashMap<>();
+
                 for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
                     int toIndex = fromIdx + commitBatchSize;
 
@@ -711,10 +720,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         toIndex = entitiesList.size();
                     }
 
-                    AtlasEntitiesWithExtInfo batch       = new AtlasEntitiesWithExtInfo(new ArrayList<>(entitiesList.subList(fromIdx, toIndex)));
+                    List<AtlasEntity> entitiesBatch = new ArrayList<>(entitiesList.subList(fromIdx, toIndex));
+
+                    updateProcessedEntityReferences(entitiesBatch, guidAssignments);
+
+                    AtlasEntitiesWithExtInfo batch       = new AtlasEntitiesWithExtInfo(entitiesBatch);
                     AtlasEntityStream        batchStream = new AtlasEntityStream(batch, entityStream);
 
-                    atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
+                    EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
+
+                    recordProcessedEntities(response, guidAssignments);
 
                     RequestContext.get().resetEntityGuidUpdates();
 
@@ -801,7 +816,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
 
-        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs) {
+        if (context.isHivePreprocessEnabled()) {
             preprocessHiveTypes(context);
         }
 
@@ -814,6 +829,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
 
         context.moveRegisteredReferredEntities();
+
+        if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) {
+            // move hive_process and hive_column_lineage entities to end of the list
+            List<AtlasEntity> entities = context.getEntities();
+            int               count    = entities.size();
+
+            for (int i = 0; i < count; i++) {
+                AtlasEntity entity = entities.get(i);
+
+                switch (entity.getTypeName()) {
+                    case TYPE_HIVE_PROCESS:
+                    case TYPE_HIVE_COLUMN_LINEAGE:
+                        entities.remove(i--);
+                        entities.add(entity);
+                        count--;
+                    break;
+                }
+            }
+
+            if (entities.size() - count > 0) {
+                LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size());
+            }
+        }
     }
 
     private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
@@ -896,7 +934,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         if (lineageQNames.contains(qualifiedName)) {
                             entities.remove(i--);
 
-                            LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition());
+                            LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition());
 
                             numRemovedEntities++;
 
@@ -965,6 +1003,107 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         return ret;
     }
 
+    private void recordProcessedEntities(EntityMutationResponse mutationResponse, Map<String, String> guidAssignments) { // merges the response's guid assignments into the caller-supplied map
+        if (mutationResponse != null && MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) { // nothing to record for a null response or empty assignments
+            guidAssignments.putAll(mutationResponse.getGuidAssignments()); // mutates the map passed in by the caller
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("recordProcessedEntities: added {} guidAssignments. updatedSize={}", mutationResponse.getGuidAssignments().size(), guidAssignments.size());
+            }
+        }
+    }
+
+    private void updateProcessedEntityReferences(List<AtlasEntity> entities, Map<String, String> guidAssignments) { // rewrites references held by these entities to the guids recorded in guidAssignments
+        if (CollectionUtils.isNotEmpty(entities) && MapUtils.isNotEmpty(guidAssignments)) { // no-op when either the entity list or the assignment map is empty
+            for (AtlasEntity entity : entities) {
+                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+                if (entityType == null) { // type not found in typeRegistry: entity is left untouched
+                    continue;
+                }
+
+                if (MapUtils.isNotEmpty(entity.getAttributes())) {
+                    for (Map.Entry<String, Object> entry : entity.getAttributes().entrySet()) {
+                        String         attrName  = entry.getKey();
+                        Object         attrValue = entry.getValue();
+
+                        if (attrValue == null) { // a null value holds no reference to rewrite
+                            continue;
+                        }
+
+                        AtlasAttribute attribute = entityType.getAttribute(attrName);
+
+                        if (attribute == null) { // look for a relationship attribute with the same name
+                            attribute = entityType.getRelationshipAttribute(attrName, null);
+                        }
+
+                        if (attribute != null && attribute.isObjectRef()) { // only attributes reported as object references are inspected
+                            updateProcessedEntityReferences(attrValue, guidAssignments);
+                        }
+                    }
+                }
+
+                if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { // relationship attribute values are inspected without an attribute-type lookup
+                    for (Map.Entry<String, Object> entry : entity.getRelationshipAttributes().entrySet()) {
+                        Object attrValue = entry.getValue();
+
+                        if (attrValue != null) {
+                            updateProcessedEntityReferences(attrValue, guidAssignments);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void updateProcessedEntityReferences(Object objVal, Map<String, String> guidAssignments) { // dispatches on the runtime type of objVal; any other type is ignored
+        if (objVal instanceof AtlasObjectId) {
+            updateProcessedEntityReferences((AtlasObjectId) objVal, guidAssignments);
+        } else if (objVal instanceof Collection) { // the cast selects the Collection overload, which visits each element
+            updateProcessedEntityReferences((Collection) objVal, guidAssignments);
+        } else if (objVal instanceof Map) { // a Map is handled as a single reference keyed by KEY_GUID
+            updateProcessedEntityReferences((Map) objVal, guidAssignments);
+        }
+    }
+
+    private void updateProcessedEntityReferences(AtlasObjectId objId, Map<String, String> guidAssignments) { // swaps objId's guid for its assigned guid when one is recorded
+        String guid = objId.getGuid();
+
+        if (guid != null && guidAssignments.containsKey(guid)) { // references without a recorded assignment are left unchanged
+            String assignedGuid = guidAssignments.get(guid);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", objId.getTypeName(), guid, assignedGuid);
+            }
+
+            objId.setGuid(assignedGuid);
+            objId.setTypeName(null); // typeName and uniqueAttributes are cleared, leaving only the assigned guid
+            objId.setUniqueAttributes(null);
+        }
+    }
+
+    private void updateProcessedEntityReferences(Map objId, Map<String, String> guidAssignments) { // Map-form reference: swaps the KEY_GUID entry for its assigned guid when one is recorded
+        Object guid = objId.get(KEY_GUID);
+
+        if (guid != null && guidAssignments.containsKey(guid)) { // maps with no KEY_GUID entry or no recorded assignment are left unchanged
+            String assignedGuid = guidAssignments.get(guid);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", objId.get(KEY_TYPENAME), guid, assignedGuid);
+            }
+
+            objId.put(KEY_GUID, assignedGuid);
+            objId.remove(KEY_TYPENAME); // type name and unique attributes are dropped, leaving only the assigned guid
+            objId.remove(KEY_UNIQUE_ATTRIBUTES);
+        }
+    }
+
+    private void updateProcessedEntityReferences(Collection objIds, Map<String, String> guidAssignments) { // visits every element of the collection
+        for (Object objId : objIds) {
+            updateProcessedEntityReferences(objId, guidAssignments); // element is typed Object, so this resolves to the Object overload
+        }
+    }
+
     static class FailedCommitOffsetRecorder {
         private Long currentOffset;
 
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index ff9c9cb..9d6ad22 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -36,7 +36,7 @@ public class HivePreprocessor {
     private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class);
 
     private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS        = "hive_table_columns";
-    private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionKeys";
+    private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys";
 
     static class HiveTablePreprocessor extends EntityPreprocessor {
         public HiveTablePreprocessor() {
@@ -76,7 +76,7 @@ public class HivePreprocessor {
         }
 
         private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) {
-            Object attrVal = entity.getAttribute(attrName);
+            Object attrVal = entity.removeAttribute(attrName);
 
             if (attrVal != null) {
                 Set<String> guids = new HashSet<>();
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 94e0993..c85c1b8 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -91,6 +91,10 @@ public class PreprocessorContext {
 
     public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
 
+    public boolean isHivePreprocessEnabled() { // true when hiveTablesToIgnore or hiveTablesToPrune is non-empty, or hiveTypesRemoveOwnedRefAttrs is set
+        return !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs;
+    }
+
     public List<AtlasEntity> getEntities() {
         return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null;
     }