You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/04/05 07:07:29 UTC

[atlas] branch master updated: ATLAS-3054: updated notification pre-process to handle updates to ownedRef attributes - #3

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e4fb5c  ATLAS-3054: updated notification pre-process to handle updates to ownedRef attributes - #3
3e4fb5c is described below

commit 3e4fb5cd84262c6c675e434628e607d7afd518f4
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Sun Mar 31 11:37:46 2019 -0700

    ATLAS-3054: updated notification pre-process to handle updates to ownedRef attributes - #3
---
 .../java/org/apache/atlas/type/AtlasArrayType.java | 344 ++++++++++++---------
 .../org/apache/atlas/type/AtlasEntityType.java     |  92 +++---
 .../apache/atlas/type/AtlasRelationshipType.java   |  11 +-
 .../org/apache/atlas/type/AtlasStructType.java     |   1 +
 .../java/org/apache/atlas/type/AtlasTypeUtil.java  |  27 ++
 .../org/apache/atlas/type/TestAtlasStructType.java |   4 +-
 .../converters/AtlasStructFormatConverter.java     |   2 +-
 .../store/graph/v2/AtlasEntityStream.java          |   5 +
 .../notification/NotificationHookConsumer.java     | 124 +++++---
 .../preprocessor/EntityPreprocessor.java           |   1 +
 .../preprocessor/HivePreprocessor.java             |  63 +---
 .../preprocessor/PreprocessorContext.java          | 186 ++++++++++-
 .../preprocessor/RdbmsPreprocessor.java            |  16 +-
 13 files changed, 549 insertions(+), 327 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java b/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java
index 6147eee..656d946 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java
@@ -21,12 +21,14 @@ package org.apache.atlas.type;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -43,33 +45,36 @@ public class AtlasArrayType extends AtlasType {
     private final String elementTypeName;
     private int          minCount;
     private int          maxCount;
+    private Cardinality  cardinality;
 
     private AtlasType elementType;
 
     public AtlasArrayType(AtlasType elementType) {
-        this(elementType, COUNT_NOT_SET, COUNT_NOT_SET);
+        this(elementType, COUNT_NOT_SET, COUNT_NOT_SET, Cardinality.LIST);
     }
 
-    public AtlasArrayType(AtlasType elementType, int minCount, int maxCount) {
+    public AtlasArrayType(AtlasType elementType, int minCount, int maxCount, Cardinality cardinality) {
         super(AtlasBaseTypeDef.getArrayTypeName(elementType.getTypeName()), TypeCategory.ARRAY, SERVICE_TYPE_ATLAS_CORE);
 
         this.elementTypeName = elementType.getTypeName();
         this.minCount        = minCount;
         this.maxCount        = maxCount;
+        this.cardinality     = cardinality;
         this.elementType     = elementType;
     }
 
     public AtlasArrayType(String elementTypeName, AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
-        this(elementTypeName, COUNT_NOT_SET, COUNT_NOT_SET, typeRegistry);
+        this(elementTypeName, COUNT_NOT_SET, COUNT_NOT_SET, Cardinality.LIST, typeRegistry);
     }
 
-    public AtlasArrayType(String elementTypeName, int minCount, int maxCount, AtlasTypeRegistry typeRegistry)
+    public AtlasArrayType(String elementTypeName, int minCount, int maxCount, Cardinality cardinality, AtlasTypeRegistry typeRegistry)
         throws  AtlasBaseException {
         super(AtlasBaseTypeDef.getArrayTypeName(elementTypeName), TypeCategory.ARRAY, SERVICE_TYPE_ATLAS_CORE);
 
         this.elementTypeName = elementTypeName;
         this.minCount        = minCount;
         this.maxCount        = maxCount;
+        this.cardinality     = cardinality;
 
         this.resolveReferences(typeRegistry);
     }
@@ -90,6 +95,10 @@ public class AtlasArrayType extends AtlasType {
         return maxCount;
     }
 
+    public void setCardinality(Cardinality cardinality) { this.cardinality = cardinality; }
+
+    public Cardinality getCardinality() { return cardinality; }
+
     public AtlasType getElementType() {
         return elementType;
     }
@@ -151,79 +160,12 @@ public class AtlasArrayType extends AtlasType {
 
     @Override
     public boolean areEqualValues(Object val1, Object val2, Map<String, String> guidAssignments) {
-        boolean ret = true;
+        final boolean ret;
 
-        if (val1 == null) {
-            ret = val2 == null;
-        } else if (val2 == null) {
-            ret = false;
+        if (cardinality == Cardinality.SET) {
+            ret = areEqualSets(val1, val2, guidAssignments);
         } else {
-            if (val1.getClass().isArray() && val2.getClass().isArray()) {
-                int len = Array.getLength(val1);
-
-                if (len != Array.getLength(val2)) {
-                    ret = false;
-                } else {
-                    for (int i = 0; i < len; i++) {
-                        if (!elementType.areEqualValues(Array.get(val1, i), Array.get(val2, i), guidAssignments)) {
-                            ret = false;
-
-                            break;
-                        }
-                    }
-                }
-            } else if ((val1 instanceof Set) && (val2 instanceof Set)) {
-                Set set1 = (Set) val1;
-                Set set2 = (Set) val2;
-
-                if (set1.size() != set2.size()) {
-                    ret = false;
-                } else {
-                    for (Object elem1 : set1) {
-                        boolean foundInSet2 = false;
-
-                        for (Object elem2 : set2) {
-                            if (elementType.areEqualValues(elem1, elem2, guidAssignments)) {
-                                foundInSet2 = true;
-
-                                break;
-                            }
-                        }
-
-                        if (!foundInSet2) {
-                            ret = false;
-
-                            break;
-                        }
-                    }
-                }
-            } else {
-                List list1 = getListFromValue(val1);
-
-                if (list1 == null) {
-                    ret = false;
-                } else {
-                    List list2 = getListFromValue(val2);
-
-                    if (list2 == null) {
-                        ret = false;
-                    } else {
-                        int len = list1.size();
-
-                        if (len != list2.size()) {
-                            ret = false;
-                        } else {
-                            for (int i = 0; i < len; i++) {
-                                if (!elementType.areEqualValues(list1.get(i), list2.get(i), guidAssignments)) {
-                                    ret = false;
-
-                                    break;
-                                }
-                            }
-                        }
-                    }
-                }
-            }
+            ret = areEqualLists(val1, val2, guidAssignments);
         }
 
         return ret;
@@ -266,128 +208,120 @@ public class AtlasArrayType extends AtlasType {
 
     @Override
     public Collection<?> getNormalizedValue(Object obj) {
-        if (obj == null) {
-            return null;
-        }
+        Collection<Object> ret = null;
 
-        if (obj instanceof String){
-             obj = AtlasType.fromJson(obj.toString(), List.class);
+        if (obj instanceof String) {
+            obj = AtlasType.fromJson(obj.toString(), List.class);
         }
 
         if (obj instanceof List || obj instanceof Set) {
-            List<Object> ret = new ArrayList<>();
+            Collection collObj = (Collection) obj;
 
-            Collection objList = (Collection) obj;
+            if (isValidElementCount(collObj.size())) {
+                ret = new ArrayList<>(collObj.size());
 
-            if (!isValidElementCount(objList.size())) {
-                return null;
-            }
+                for (Object element : collObj) {
+                    if (element != null) {
+                        Object normalizedValue = elementType.getNormalizedValue(element);
 
-            for (Object element : objList) {
-                if (element != null) {
-                    Object normalizedValue = elementType.getNormalizedValue(element);
+                        if (normalizedValue != null) {
+                            ret.add(normalizedValue);
+                        } else {
+                            ret = null; // invalid element value
 
-                    if (normalizedValue != null) {
-                        ret.add(normalizedValue);
+                            break;
+                        }
                     } else {
-                        return null; // invalid element value
+                        ret.add(element);
                     }
-                } else {
-                    ret.add(element);
                 }
             }
-
-            return ret;
-        } else if (obj.getClass().isArray()) {
-            List<Object> ret = new ArrayList<>();
-
+        } else if (obj != null && obj.getClass().isArray()) {
             int arrayLen = Array.getLength(obj);
 
-            if (!isValidElementCount(arrayLen)) {
-                return null;
-            }
+            if (isValidElementCount(arrayLen)) {
+                ret = new ArrayList<>(arrayLen);
 
-            for (int i = 0; i < arrayLen; i++) {
-                Object element = Array.get(obj, i);
+                for (int i = 0; i < arrayLen; i++) {
+                    Object element = Array.get(obj, i);
 
-                if (element != null) {
-                    Object normalizedValue = elementType.getNormalizedValue(element);
+                    if (element != null) {
+                        Object normalizedValue = elementType.getNormalizedValue(element);
+
+                        if (normalizedValue != null) {
+                            ret.add(normalizedValue);
+                        } else {
+                            ret = null; // invalid element value
 
-                    if (normalizedValue != null) {
-                        ret.add(normalizedValue);
+                            break;
+                        }
                     } else {
-                        return null; // invalid element value
+                        ret.add(element);
                     }
-                } else {
-                    ret.add(element);
                 }
             }
-
-            return ret;
         }
 
-        return null;
+        return ret;
     }
 
     @Override
     public Collection<?> getNormalizedValueForUpdate(Object obj) {
-        if (obj == null) {
-            return null;
+        Collection<Object> ret = null;
+
+        if (obj instanceof String) {
+            obj = AtlasType.fromJson(obj.toString(), List.class);
         }
 
         if (obj instanceof List || obj instanceof Set) {
-            List<Object> ret = new ArrayList<>();
-
             Collection objList = (Collection) obj;
 
-            if (!isValidElementCount(objList.size())) {
-                return null;
-            }
+            if (isValidElementCount(objList.size())) {
+                ret = new ArrayList<>(objList.size());
 
-            for (Object element : objList) {
-                if (element != null) {
-                    Object normalizedValue = elementType.getNormalizedValueForUpdate(element);
+                for (Object element : objList) {
+                    if (element != null) {
+                        Object normalizedValue = elementType.getNormalizedValueForUpdate(element);
 
-                    if (normalizedValue != null) {
-                        ret.add(normalizedValue);
+                        if (normalizedValue != null) {
+                            ret.add(normalizedValue);
+                        } else {
+                            ret = null; // invalid element value
+
+                            break;
+                        }
                     } else {
-                        return null; // invalid element value
+                        ret.add(element);
                     }
-                } else {
-                    ret.add(element);
                 }
             }
-
-            return ret;
-        } else if (obj.getClass().isArray()) {
-            List<Object> ret = new ArrayList<>();
-
+        } else if (obj != null && obj.getClass().isArray()) {
             int arrayLen = Array.getLength(obj);
 
-            if (!isValidElementCount(arrayLen)) {
-                return null;
-            }
+            if (isValidElementCount(arrayLen)) {
+                ret = new ArrayList<>(arrayLen);
 
-            for (int i = 0; i < arrayLen; i++) {
-                Object element = Array.get(obj, i);
+                for (int i = 0; i < arrayLen; i++) {
+                    Object element = Array.get(obj, i);
 
-                if (element != null) {
-                    Object normalizedValue = elementType.getNormalizedValueForUpdate(element);
+                    if (element != null) {
+                        Object normalizedValue = elementType.getNormalizedValueForUpdate(element);
+
+                        if (normalizedValue != null) {
+                            ret.add(normalizedValue);
+                        } else {
+                            ret = null; // invalid element value
 
-                    if (normalizedValue != null) {
-                        ret.add(normalizedValue);
+                            break;
+                        }
                     } else {
-                        return null; // invalid element value
+                        ret.add(element);
                     }
-                } else {
-                    ret.add(element);
                 }
             }
-
-            return ret;
         }
 
-        return null;
+        return ret;
     }
 
     @Override
@@ -483,7 +417,7 @@ public class AtlasArrayType extends AtlasType {
         if (elementAttributeType == elementType) {
             return this;
         } else {
-            AtlasType attributeType = new AtlasArrayType(elementAttributeType, minCount, maxCount);
+            AtlasType attributeType = new AtlasArrayType(elementAttributeType, minCount, maxCount, cardinality);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("getTypeForAttribute(): {} ==> {}", getTypeName(), attributeType.getTypeName());
@@ -523,29 +457,131 @@ public class AtlasArrayType extends AtlasType {
         return false;
     }
 
+    private boolean areEqualSets(Object val1, Object val2, Map<String, String> guidAssignments) {
+        boolean ret  = true;
+
+        if (val1 == null) {
+            ret = val2 == null;
+        } else if (val2 == null) {
+            ret = false;
+        } else if (val1 == val2) {
+            ret = true;
+        } else {
+            Set set1 = getSetFromValue(val1);
+            Set set2 = getSetFromValue(val2);
+
+            if (set1.size() != set2.size()) {
+                ret = false;
+            } else {
+                for (Object elem1 : set1) {
+                    boolean foundInSet2 = false;
+
+                    for (Object elem2 : set2) {
+                        if (elementType.areEqualValues(elem1, elem2, guidAssignments)) {
+                            foundInSet2 = true;
+
+                            break;
+                        }
+                    }
+
+                    if (!foundInSet2) {
+                        ret = false;
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private boolean areEqualLists(Object val1, Object val2, Map<String, String> guidAssignments) {
+        boolean ret = true;
+
+        if (val1 == null) {
+            ret = val2 == null;
+        } else if (val2 == null) {
+            ret = false;
+        } else if (val1 == val2) {
+            ret = true;
+        } else if (val1.getClass().isArray() && val2.getClass().isArray()) {
+            int len = Array.getLength(val1);
+
+            if (len != Array.getLength(val2)) {
+                ret = false;
+            } else {
+                for (int i = 0; i < len; i++) {
+                    if (!elementType.areEqualValues(Array.get(val1, i), Array.get(val2, i), guidAssignments)) {
+                        ret = false;
+
+                        break;
+                    }
+                }
+            }
+        } else {
+            List list1 = getListFromValue(val1);
+            List list2 = getListFromValue(val2);
+
+            if (list1.size() != list2.size()) {
+                ret = false;
+            } else {
+                int len = list1.size();
+
+                for (int i = 0; i < len; i++) {
+                    if (!elementType.areEqualValues(list1.get(i), list2.get(i), guidAssignments)) {
+                        ret = false;
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        return ret;
+    }
+
     private List getListFromValue(Object val) {
         final List ret;
 
         if (val instanceof List) {
             ret = (List) val;
         } else if (val instanceof Collection) {
-            int len = ((Collection) val).size();
+            ret = new ArrayList<>((Collection) val);
+        } else if (val.getClass().isArray()) {
+            int len = Array.getLength(val);
 
             ret = new ArrayList<>(len);
 
-            for (Object elem : ((Collection) val)) {
-                ret.add(elem);
+            for (int i = 0; i < len; i++) {
+                ret.add(Array.get(val, i));
             }
+        } else if (val instanceof String){
+            ret = AtlasType.fromJson(val.toString(), List.class);
+        } else {
+            ret = null;
+        }
+
+        return ret;
+    }
+
+    private Set getSetFromValue(Object val) {
+        final Set ret;
+
+        if (val instanceof Set) {
+            ret = (Set) val;
+        } else if (val instanceof Collection) {
+            ret = new HashSet<>((Collection) val);
         } else if (val.getClass().isArray()) {
             int len = Array.getLength(val);
 
-            ret = new ArrayList<>(len);
+            ret = new HashSet<>(len);
 
             for (int i = 0; i < len; i++) {
                 ret.add(Array.get(val, i));
             }
         } else if (val instanceof String){
-            ret = AtlasType.fromJson(val.toString(), List.class);
+            ret = AtlasType.fromJson(val.toString(), Set.class);
         } else {
             ret = null;
         }
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index 1ce776e..d9ae9e3 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -22,7 +22,6 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
@@ -544,7 +543,9 @@ public class AtlasEntityType extends AtlasStructType {
                 superType.normalizeAttributeValues(ent);
             }
 
-            normalizeValues(ent);
+            super.normalizeAttributeValues(ent);
+
+            normalizeRelationshipAttributeValues(ent, false);
         }
     }
 
@@ -555,6 +556,8 @@ public class AtlasEntityType extends AtlasStructType {
             }
 
             super.normalizeAttributeValuesForUpdate(ent);
+
+            normalizeRelationshipAttributeValues(ent, true);
         }
     }
 
@@ -565,7 +568,9 @@ public class AtlasEntityType extends AtlasStructType {
                 superType.normalizeAttributeValues(obj);
             }
 
-            normalizeValues(obj);
+            super.normalizeAttributeValues(obj);
+
+            normalizeRelationshipAttributeValues(obj, false);
         }
     }
 
@@ -576,6 +581,8 @@ public class AtlasEntityType extends AtlasStructType {
             }
 
             super.normalizeAttributeValuesForUpdate(obj);
+
+            normalizeRelationshipAttributeValues(obj, true);
         }
     }
 
@@ -743,67 +750,56 @@ public class AtlasEntityType extends AtlasStructType {
         return ret;
     }
 
-    private void normalizeRelationshipAttributeValues(AtlasStruct obj) {
-        if (obj != null && obj instanceof AtlasEntity) {
-            AtlasEntity entityObj = (AtlasEntity) obj;
-
+    private void normalizeRelationshipAttributeValues(AtlasEntity entity, boolean isUpdate) {
+        if (entity != null) {
             for (String attributeName : relationshipAttributes.keySet()) {
-                if (entityObj.hasRelationshipAttribute(attributeName)) {
-                    Object            attributeValue   = entityObj.getRelationshipAttribute(attributeName);
-                    String            relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue);
-                    AtlasAttribute    attribute        = getRelationshipAttribute(attributeName, relationshipType);
-                    AtlasAttributeDef attributeDef     = attribute.getAttributeDef();
+                if (entity.hasRelationshipAttribute(attributeName)) {
+                    Object         attributeValue   = entity.getRelationshipAttribute(attributeName);
+                    String         relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue);
+                    AtlasAttribute attribute        = getRelationshipAttribute(attributeName, relationshipType);
 
-                    attributeValue = getNormalizedValue(attributeValue, attributeDef);
+                    if (attribute != null) {
+                        AtlasType attrType = attribute.getAttributeType();
+
+                        if (isValidRelationshipType(attrType)) {
+                            if (isUpdate) {
+                                attributeValue = attrType.getNormalizedValueForUpdate(attributeValue);
+                            } else {
+                                attributeValue = attrType.getNormalizedValue(attributeValue);
+                            }
 
-                    entityObj.setRelationshipAttribute(attributeName, attributeValue);
+                            entity.setRelationshipAttribute(attributeName, attributeValue);
+                        }
+                    }
                 }
             }
         }
     }
 
-    public void normalizeRelationshipAttributeValues(Map<String, Object> obj) {
+    public void normalizeRelationshipAttributeValues(Map<String, Object> obj, boolean isUpdate) {
         if (obj != null) {
             for (String attributeName : relationshipAttributes.keySet()) {
                 if (obj.containsKey(attributeName)) {
-                    Object            attributeValue   = obj.get(attributeName);
-                    String            relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue);
-                    AtlasAttribute    attribute        = getRelationshipAttribute(attributeName, relationshipType);
-                    AtlasAttributeDef attributeDef     = attribute.getAttributeDef();
-
-                    attributeValue = getNormalizedValue(attributeValue, attributeDef);
-
-                    obj.put(attributeName, attributeValue);
-                }
-            }
-        }
-    }
+                    Object         attributeValue   = obj.get(attributeName);
+                    String         relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue);
+                    AtlasAttribute attribute        = getRelationshipAttribute(attributeName, relationshipType);
 
-    private Object getNormalizedValue(Object value, AtlasAttributeDef attributeDef) {
-        String         relationshipType = AtlasEntityUtil.getRelationshipType(value);
-        AtlasAttribute attribute        = getRelationshipAttribute(attributeDef.getName(), relationshipType);
+                    if (attribute != null) {
+                        AtlasType attrType = attribute.getAttributeType();
 
-        if (attribute != null) {
-            AtlasType attrType = attribute.getAttributeType();
+                        if (isValidRelationshipType(attrType)) {
+                            if (isUpdate) {
+                                attributeValue = attrType.getNormalizedValueForUpdate(attributeValue);
+                            } else {
+                                attributeValue = attrType.getNormalizedValue(attributeValue);
+                            }
 
-            if (isValidRelationshipType(attrType) && value != null) {
-                return attrType.getNormalizedValue(value);
+                            obj.put(attributeName, attributeValue);
+                        }
+                    }
+                }
             }
         }
-
-        return null;
-    }
-
-    private void normalizeValues(AtlasEntity ent) {
-        super.normalizeAttributeValues(ent);
-
-        normalizeRelationshipAttributeValues(ent);
-    }
-
-    private void normalizeValues(Map<String, Object> obj) {
-        super.normalizeAttributeValues(obj);
-
-        normalizeRelationshipAttributeValues(obj);
     }
 
     private boolean validateRelationshipAttributes(Object obj, String objName, List<String> messages) {
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
index 585d176..98071b2 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
@@ -346,8 +346,15 @@ public class AtlasRelationshipType extends AtlasStructType {
                 attributeDef.addConstraint(constraint);
             }
 
-            attribute = new AtlasAttribute(entityType, attributeDef,
-                                           typeRegistry.getType(attrTypeName), getTypeName(), relationshipLabel);
+            AtlasType attrType = typeRegistry.getType(attrTypeName);
+
+            if (attrType instanceof AtlasArrayType) {
+                AtlasArrayType arrayType = (AtlasArrayType) attrType;
+
+                arrayType.setCardinality(attributeDef.getCardinality());
+            }
+
+            attribute = new AtlasAttribute(entityType, attributeDef, attrType, getTypeName(), relationshipLabel);
 
             attribute.setLegacyAttribute(endDef.getIsLegacyAttribute());
         } else {
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
index fb24df0..31953bd 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
@@ -99,6 +99,7 @@ public class AtlasStructType extends AtlasType {
 
                 arrayType.setMinCount(attributeDef.getValuesMinCount());
                 arrayType.setMaxCount(attributeDef.getValuesMaxCount());
+                arrayType.setCardinality(cardinality);
             }
 
             //check if attribute type is not classification
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
index 079a8fc..d74c7e3 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
@@ -38,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
 import org.apache.atlas.model.typedef.AtlasTypeDefHeader;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.v1.model.typedef.AttributeDefinition;
 import org.apache.atlas.v1.model.typedef.ClassTypeDefinition;
 import org.apache.atlas.v1.model.typedef.Multiplicity;
@@ -413,10 +414,36 @@ public class AtlasTypeUtil {
         return new AtlasRelatedObjectId(getAtlasObjectId(entity));
     }
 
+    public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) {
+        return new AtlasRelatedObjectId(getAtlasObjectId(entity, typeRegistry));
+    }
+
     public static AtlasObjectId getAtlasObjectId(AtlasEntity entity) {
         return new AtlasObjectId(entity.getGuid(), entity.getTypeName());
     }
 
+    public static AtlasObjectId getAtlasObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) {
+        String              typeName       = entity.getTypeName();
+        AtlasEntityType     entityType     = typeRegistry.getEntityTypeByName(typeName);
+        Map<String, Object> uniqAttributes = null;
+
+        if (entityType != null && MapUtils.isNotEmpty(entityType.getUniqAttributes())) {
+            for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
+                Object attrValue = entity.getAttribute(attribute.getName());
+
+                if (attrValue != null) {
+                    if (uniqAttributes == null) {
+                        uniqAttributes = new HashMap<>();
+                    }
+
+                    uniqAttributes.put(attribute.getName(), attrValue);
+                }
+            }
+        }
+
+        return new AtlasObjectId(entity.getGuid(), typeName, uniqAttributes);
+    }
+
     public static AtlasObjectId getAtlasObjectId(AtlasEntityHeader header) {
         return new AtlasObjectId(header.getGuid(), header.getTypeName());
     }
diff --git a/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java b/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java
index a37dd46..f117fb3 100644
--- a/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java
+++ b/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java
@@ -63,12 +63,12 @@ public class TestAtlasStructType {
 
         multiValuedAttribMin.setName(MULTI_VAL_ATTR_NAME_MIN);
         multiValuedAttribMin.setTypeName(AtlasBaseTypeDef.getArrayTypeName(ATLAS_TYPE_INT));
-        multiValuedAttribMin.setCardinality(Cardinality.SET);
+        multiValuedAttribMin.setCardinality(Cardinality.LIST);
         multiValuedAttribMin.setValuesMinCount(MULTI_VAL_ATTR_MIN_COUNT);
 
         multiValuedAttribMax.setName(MULTI_VAL_ATTR_NAME_MAX);
         multiValuedAttribMax.setTypeName(AtlasBaseTypeDef.getArrayTypeName(ATLAS_TYPE_INT));
-        multiValuedAttribMax.setCardinality(Cardinality.LIST);
+        multiValuedAttribMax.setCardinality(Cardinality.SET);
         multiValuedAttribMax.setValuesMaxCount(MULTI_VAL_ATTR_MAX_COUNT);
 
         AtlasStructDef structDef = ModelTestUtil.newStructDef();
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
index 173fcee..ae92b8b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
@@ -190,7 +190,7 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
 
                             if (entities != null) {
                                 v2Value  = entities;
-                                attrType = new AtlasArrayType(entityType);
+                                attrType = new AtlasArrayType(entityType, arrayType.getMinCount(), arrayType.getMaxCount(), arrayType.getCardinality());
 
                                 if (LOG.isDebugEnabled()) {
                                     LOG.debug("{}: replaced objIdList with entityList", attr.getQualifiedName());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
index d12b036..c823b20 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 
 import java.util.Iterator;
+import java.util.List;
 
 public class AtlasEntityStream implements EntityStream {
     protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
@@ -33,6 +34,10 @@ public class AtlasEntityStream implements EntityStream {
         this(new AtlasEntitiesWithExtInfo(entity), null);
     }
 
+    public AtlasEntityStream(List<AtlasEntity> entities) {
+        this(new AtlasEntitiesWithExtInfo(entities), null);
+    }
+
     public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) {
         this(new AtlasEntitiesWithExtInfo(entityWithExtInfo), null);
     }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 48355c9..8430fd4 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -34,6 +34,7 @@ import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.notification.HookNotification;
@@ -79,7 +80,6 @@ import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -492,7 +492,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                     return;
                 }
 
-                preProcessNotificationMessage(kafkaMsg);
+                PreprocessorContext context = preProcessNotificationMessage(kafkaMsg);
 
                 if (isEmptyMessage(kafkaMsg)) {
                     commit(kafkaMsg);
@@ -525,7 +525,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                                             AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
                                 }
 
-                                createOrUpdate(entities, false);
+                                createOrUpdate(entities, false, context);
                             }
                             break;
 
@@ -546,7 +546,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                 // There should only be one root entity
                                 entities.getEntities().get(0).setGuid(guid);
 
-                                createOrUpdate(entities, true);
+                                createOrUpdate(entities, true, context);
                             }
                             break;
 
@@ -579,7 +579,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                                             AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                 }
 
-                                createOrUpdate(entities, false);
+                                createOrUpdate(entities, false, context);
                             }
                             break;
 
@@ -593,7 +593,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                                             AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
                                 }
 
-                                createOrUpdate(entities, false);
+                                createOrUpdate(entities, false, context);
                             }
                             break;
 
@@ -622,7 +622,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                                             AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                 }
 
-                                createOrUpdate(entities, false);
+                                createOrUpdate(entities, false, context);
                             }
                             break;
 
@@ -708,15 +708,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             }
         }
 
-        private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate) throws AtlasBaseException {
+        private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, PreprocessorContext context) throws AtlasBaseException {
             List<AtlasEntity> entitiesList = entities.getEntities();
             AtlasEntityStream entityStream = new AtlasEntityStream(entities);
 
             if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
-                atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
-            } else {
-                Map<String, String> guidAssignments = new HashMap<>();
+                EntityMutationResponse response = atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
 
+                recordProcessedEntities(response, context);
+            } else {
                 for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
                     int toIndex = fromIdx + commitBatchSize;
 
@@ -726,20 +726,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                     List<AtlasEntity> entitiesBatch = new ArrayList<>(entitiesList.subList(fromIdx, toIndex));
 
-                    updateProcessedEntityReferences(entitiesBatch, guidAssignments);
+                    updateProcessedEntityReferences(entitiesBatch, context.getGuidAssignments());
 
                     AtlasEntitiesWithExtInfo batch       = new AtlasEntitiesWithExtInfo(entitiesBatch);
                     AtlasEntityStream        batchStream = new AtlasEntityStream(batch, entityStream);
 
                     EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
 
-                    recordProcessedEntities(response, guidAssignments);
+                    recordProcessedEntities(response, context);
 
                     RequestContext.get().resetEntityGuidUpdates();
 
                     RequestContext.get().clearCache();
                 }
             }
+
+            if (context != null) {
+                context.prepareForPostUpdate();
+
+                List<AtlasEntity> postUpdateEntities = context.getPostUpdateEntities();
+
+                if (CollectionUtils.isNotEmpty(postUpdateEntities)) {
+                    atlasEntityStore.createOrUpdate(new AtlasEntityStream(postUpdateEntities), true);
+                }
+            }
         }
 
         private void recordFailedMessages() {
@@ -815,49 +825,51 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
     }
 
-    private void preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
-        if (!preprocessEnabled) {
-            return;
-        }
+    private PreprocessorContext preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
+        PreprocessorContext context = null;
 
-        PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
+        if (preprocessEnabled) {
+            context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
 
-        if (context.isHivePreprocessEnabled()) {
-            preprocessHiveTypes(context);
-        }
+            if (context.isHivePreprocessEnabled()) {
+                preprocessHiveTypes(context);
+            }
 
-        if (skipHiveColumnLineageHive20633) {
-            skipHiveColumnLineage(context);
-        }
+            if (skipHiveColumnLineageHive20633) {
+                skipHiveColumnLineage(context);
+            }
 
-        if (rdbmsTypesRemoveOwnedRefAttrs) {
-            rdbmsTypeRemoveOwnedRefAttrs(context);
-        }
+            if (rdbmsTypesRemoveOwnedRefAttrs) {
+                rdbmsTypeRemoveOwnedRefAttrs(context);
+            }
 
-        context.moveRegisteredReferredEntities();
+            context.moveRegisteredReferredEntities();
 
-        if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) {
-            // move hive_process and hive_column_lineage entities to end of the list
-            List<AtlasEntity> entities = context.getEntities();
-            int               count    = entities.size();
+            if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) {
+                // move hive_process and hive_column_lineage entities to end of the list
+                List<AtlasEntity> entities = context.getEntities();
+                int               count    = entities.size();
 
-            for (int i = 0; i < count; i++) {
-                AtlasEntity entity = entities.get(i);
+                for (int i = 0; i < count; i++) {
+                    AtlasEntity entity = entities.get(i);
 
-                switch (entity.getTypeName()) {
-                    case TYPE_HIVE_PROCESS:
-                    case TYPE_HIVE_COLUMN_LINEAGE:
-                        entities.remove(i--);
-                        entities.add(entity);
-                        count--;
-                    break;
+                    switch (entity.getTypeName()) {
+                        case TYPE_HIVE_PROCESS:
+                        case TYPE_HIVE_COLUMN_LINEAGE:
+                            entities.remove(i--);
+                            entities.add(entity);
+                            count--;
+                            break;
+                    }
                 }
-            }
 
-            if (entities.size() - count > 0) {
-                LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size());
+                if (entities.size() - count > 0) {
+                    LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size());
+                }
             }
         }
+
+        return context;
     }
 
     private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
@@ -1009,12 +1021,26 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         return ret;
     }
 
-    private void recordProcessedEntities(EntityMutationResponse mutationResponse, Map<String, String> guidAssignments) {
-        if (mutationResponse != null && MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
-            guidAssignments.putAll(mutationResponse.getGuidAssignments());
+    private void recordProcessedEntities(EntityMutationResponse mutationResponse, PreprocessorContext context) {
+        if (mutationResponse != null && context != null) {
+            if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
+                context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments());
+            }
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("recordProcessedEntities: added {} guidAssignments. updatedSize={}", mutationResponse.getGuidAssignments().size(), guidAssignments.size());
+            if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) {
+                for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) {
+                    if (entity != null && entity.getGuid() != null) {
+                        context.getCreatedEntities().add(entity.getGuid());
+                    }
+                }
+            }
+
+            if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) {
+                for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) {
+                    if (entity != null && entity.getGuid() != null) {
+                        context.getDeletedEntities().add(entity.getGuid());
+                    }
+                }
             }
         }
     }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 085e746..9b620dd 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -53,6 +53,7 @@ public abstract class EntityPreprocessor {
     public static final String ATTRIBUTE_TABLES         = "tables";
     public static final String ATTRIBUTE_INDEXES        = "indexes";
     public static final String ATTRIBUTE_FOREIGN_KEYS   = "foreign_keys";
+    public static final String ATTRIBUTE_INSTANCE       = "instance";
 
     public static final char   QNAME_SEP_CLUSTER_NAME = '@';
     public static final char   QNAME_SEP_ENTITY_NAME  = '.';
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index 0b93658..cc31032 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -18,8 +18,6 @@
 package org.apache.atlas.notification.preprocessor;
 
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -27,16 +25,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 public class HivePreprocessor {
     private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class);
 
     private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS        = "hive_table_columns";
     private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys";
+    private static final String RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC    = "hive_table_storagedesc";
 
     static class HiveTablePreprocessor extends EntityPreprocessor {
         public HiveTablePreprocessor() {
@@ -67,63 +63,12 @@ public class HivePreprocessor {
                     entity.setAttribute(ATTRIBUTE_COLUMNS, null);
                     entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null);
                 } else if (context.getHiveTypesRemoveOwnedRefAttrs()) {
-                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD);
-
-                    removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, context);
-                    removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, context);
+                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD, RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC, ATTRIBUTE_TABLE);
+                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, ATTRIBUTE_TABLE);
+                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, ATTRIBUTE_TABLE);
                 }
             }
         }
-
-        private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) {
-            Object attrVal = entity.removeAttribute(attrName);
-
-            if (attrVal != null) {
-                Set<String> guids = new HashSet<>();
-
-                context.collectGuids(attrVal, guids);
-
-                for (String guid : guids) {
-                    AtlasEntity colEntity = context.getEntity(guid);
-
-                    if (colEntity != null) {
-                        Object attrTable = null;
-
-                        if (colEntity.hasRelationshipAttribute(ATTRIBUTE_TABLE)) {
-                            attrTable = colEntity.getRelationshipAttribute(ATTRIBUTE_TABLE);
-                        } else if (colEntity.hasAttribute(ATTRIBUTE_TABLE)) {
-                            attrTable = colEntity.getAttribute(ATTRIBUTE_TABLE);
-                        }
-
-                        attrTable = setRelationshipType(attrTable, relationshipType);
-
-                        if (attrTable != null) {
-                            colEntity.setRelationshipAttribute(ATTRIBUTE_TABLE, attrTable);
-                        }
-
-                        context.addToReferredEntitiesToMove(guid);
-                    }
-                }
-            }
-        }
-
-        private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) {
-            AtlasRelatedObjectId ret = null;
-
-            if (attr instanceof AtlasRelatedObjectId) {
-                ret = (AtlasRelatedObjectId) attr;
-            } else if (attr instanceof AtlasObjectId) {
-                ret = new AtlasRelatedObjectId((AtlasObjectId) attr);
-            } else if (attr instanceof Map) {
-                ret = new AtlasRelatedObjectId((Map) attr);
-            }
-
-            if (ret != null) {
-                ret.setRelationshipType(relationshipType);
-            }
-
-            return ret;
-        }
     }
 
 
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index c85c1b8..2db0574 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -21,18 +21,27 @@ import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
+
 
 public class PreprocessorContext {
     private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class);
@@ -40,6 +49,7 @@ public class PreprocessorContext {
     public enum PreprocessAction { NONE, IGNORE, PRUNE }
 
     private final AtlasKafkaMessage<HookNotification> kafkaMessage;
+    private final AtlasTypeRegistry                   typeRegistry;
     private final AtlasEntitiesWithExtInfo            entitiesWithExtInfo;
     private final List<Pattern>                       hiveTablesToIgnore;
     private final List<Pattern>                       hiveTablesToPrune;
@@ -49,9 +59,14 @@ public class PreprocessorContext {
     private final Set<String>                         ignoredEntities        = new HashSet<>();
     private final Set<String>                         prunedEntities         = new HashSet<>();
     private final Set<String>                         referredEntitiesToMove = new HashSet<>();
+    private final Set<String>                         createdEntities        = new HashSet<>();
+    private final Set<String>                         deletedEntities        = new HashSet<>();
+    private final Map<String, String>                 guidAssignments        = new HashMap<>();
+    private       List<AtlasEntity>                   postUpdateEntities     = null;
 
-    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
+    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
         this.kafkaMessage                  = kafkaMessage;
+        this.typeRegistry                  = typeRegistry;
         this.hiveTablesToIgnore            = hiveTablesToIgnore;
         this.hiveTablesToPrune             = hiveTablesToPrune;
         this.hiveTablesCache               = hiveTablesCache;
@@ -119,6 +134,14 @@ public class PreprocessorContext {
 
     public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; }
 
+    public Set<String> getCreatedEntities() { return createdEntities; }
+
+    public Set<String> getDeletedEntities() { return deletedEntities; }
+
+    public Map<String, String> getGuidAssignments() { return guidAssignments; }
+
+    public List<AtlasEntity> getPostUpdateEntities() { return postUpdateEntities; }
+
     public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
         PreprocessAction ret = PreprocessAction.NONE;
 
@@ -199,12 +222,48 @@ public class PreprocessorContext {
         collectGuids(obj, prunedEntities);
     }
 
-    public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName) {
-        Set<String> guidsToMove = new HashSet<>();
+    public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, String refAttrName) {
+        Object attrVal = entity.removeAttribute(attrName);
+
+        if (attrVal != null) {
+            AtlasRelatedObjectId entityId = null;
+            Set<String>          guids    = new HashSet<>();
+
+            collectGuids(attrVal, guids);
+
+            // removed attrVal might have elements removed (e.g. removed column); to handle this case register the entity for partial update
+            addToPostUpdate(entity, attrName, attrVal);
+
+            for (String guid : guids) {
+                AtlasEntity refEntity = getEntity(guid);
+
+                if (refEntity != null) {
+                    Object refAttr = null;
+
+                    if (refEntity.hasRelationshipAttribute(refAttrName)) {
+                        refAttr = refEntity.getRelationshipAttribute(refAttrName);
+                    } else if (refEntity.hasAttribute(refAttrName)) {
+                        refAttr = refEntity.getAttribute(refAttrName);
+                    } else {
+                        if (entityId == null) {
+                            entityId = AtlasTypeUtil.toAtlasRelatedObjectId(entity, typeRegistry);
+                        }
+
+                        refAttr = entityId;
+                    }
+
+                    if (refAttr != null) {
+                        refAttr = setRelationshipType(refAttr, relationshipType);
+                    }
 
-        collectGuids(entity.removeAttribute(attrName), guidsToMove);
+                    if (refAttr != null) {
+                        refEntity.setRelationshipAttribute(refAttrName, refAttr);
+                    }
 
-        addToReferredEntitiesToMove(guidsToMove);
+                    addToReferredEntitiesToMove(guid);
+                }
+            }
+        }
     }
 
     public void moveRegisteredReferredEntities() {
@@ -236,6 +295,32 @@ public class PreprocessorContext {
         }
     }
 
+    public void prepareForPostUpdate() {
+        if (postUpdateEntities != null) {
+            ListIterator<AtlasEntity> iter = postUpdateEntities.listIterator();
+
+            while (iter.hasNext()) {
+                AtlasEntity entity       = iter.next();
+                String      assignedGuid = getAssignedGuid(entity.getGuid());
+
+                // no need to perform partial-update for entities that are created/deleted while processing this message
+                if (createdEntities.contains(assignedGuid) || deletedEntities.contains(assignedGuid)) {
+                    iter.remove();
+                } else {
+                    entity.setGuid(assignedGuid);
+
+                    if (entity.getAttributes() != null) {
+                        setAssignedGuids(entity.getAttributes().values());
+                    }
+
+                    if (entity.getRelationshipAttributes() != null) {
+                        setAssignedGuids(entity.getRelationshipAttributes().values());
+                    }
+                }
+            }
+        }
+    }
+
     public String getTypeName(Object obj) {
         Object ret = null;
 
@@ -258,7 +343,7 @@ public class PreprocessorContext {
         if (obj instanceof AtlasObjectId) {
             ret = ((AtlasObjectId) obj).getGuid();
         } else if (obj instanceof Map) {
-            ret = ((Map) obj).get(AtlasObjectId.KEY_GUID);
+            ret = ((Map) obj).get(KEY_GUID);
         } else if (obj instanceof AtlasEntity) {
             ret = ((AtlasEntity) obj).getGuid();
         } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
@@ -274,7 +359,7 @@ public class PreprocessorContext {
                 Collection objList = (Collection) obj;
 
                 for (Object objElem : objList) {
-                    collectGuid(objElem, guids);
+                    collectGuids(objElem, guids);
                 }
             } else {
                 collectGuid(obj, guids);
@@ -304,4 +389,91 @@ public class PreprocessorContext {
 
         return ret;
     }
+
+    private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) {
+        AtlasRelatedObjectId ret = null;
+
+        if (attr instanceof AtlasRelatedObjectId) {
+            ret = (AtlasRelatedObjectId) attr;
+        } else if (attr instanceof AtlasObjectId) {
+            ret = new AtlasRelatedObjectId((AtlasObjectId) attr);
+        } else if (attr instanceof Map) {
+            ret = new AtlasRelatedObjectId((Map) attr);
+        }
+
+        if (ret != null) {
+            ret.setRelationshipType(relationshipType);
+        }
+
+        return ret;
+    }
+
+    private String getAssignedGuid(String guid) {
+        String ret = guidAssignments.get(guid);
+
+        return ret != null ? ret : guid;
+    }
+
+    private void setAssignedGuids(Object obj) {
+        if (obj != null) {
+            if (obj instanceof Collection) {
+                Collection objList = (Collection) obj;
+
+                for (Object objElem : objList) {
+                    setAssignedGuids(objElem);
+                }
+            } else {
+                setAssignedGuid(obj);
+            }
+        }
+    }
+
+    private void setAssignedGuid(Object obj) {
+        if (obj instanceof AtlasRelatedObjectId) {
+            AtlasRelatedObjectId objId = (AtlasRelatedObjectId) obj;
+
+            objId.setGuid(getAssignedGuid(objId.getGuid()));
+        } else if (obj instanceof AtlasObjectId) {
+            AtlasObjectId objId = (AtlasObjectId) obj;
+
+            objId.setGuid(getAssignedGuid(objId.getGuid()));
+        } else if (obj instanceof Map) {
+            Map    objId = (Map) obj;
+            Object guid  = objId.get(KEY_GUID);
+
+            if (guid != null) {
+                objId.put(KEY_GUID, getAssignedGuid(guid.toString()));
+            }
+        }
+    }
+
+    private void addToPostUpdate(AtlasEntity entity, String attrName, Object attrVal) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("addToPostUpdate(guid={}, entityType={}, attrName={}", entity.getGuid(), entity.getTypeName(), attrName);
+        }
+
+        AtlasEntity partialEntity = null;
+
+        if (postUpdateEntities == null) {
+            postUpdateEntities = new ArrayList<>();
+        }
+
+        for (AtlasEntity existing : postUpdateEntities) {
+            if (StringUtils.equals(entity.getGuid(), existing.getGuid())) {
+                partialEntity = existing;
+
+                break;
+            }
+        }
+
+        if (partialEntity == null) {
+            partialEntity = new AtlasEntity(entity.getTypeName(), attrName, attrVal);
+
+            partialEntity.setGuid(entity.getGuid());
+
+            postUpdateEntities.add(partialEntity);
+        } else {
+            partialEntity.setAttribute(attrName, attrVal);
+        }
+    }
 }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
index adc1983..7dcfa2f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
@@ -32,6 +32,12 @@ import java.util.Set;
 public class RdbmsPreprocessor {
     private static final Logger LOG = LoggerFactory.getLogger(RdbmsPreprocessor.class);
 
+    private static final String RELATIONSHIP_TYPE_RDBMS_INSTANCE_DATABASES = "rdbms_instance_databases";
+    private static final String RELATIONSHIP_TYPE_RDBMS_DB_TABLES          = "rdbms_db_tables";
+    private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_COLUMNS      = "rdbms_table_columns";
+    private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_INDEXES      = "rdbms_table_indexes";
+    private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_FOREIGN_KEYS = "rdbms_table_foreign_key";
+
     static class RdbmsInstancePreprocessor extends RdbmsTypePreprocessor {
         public RdbmsInstancePreprocessor() {
             super(TYPE_RDBMS_INSTANCE);
@@ -121,17 +127,17 @@ public class RdbmsPreprocessor {
         private void clearRefAttributesAndMove(AtlasEntity entity, PreprocessorContext context) {
             switch (entity.getTypeName()) {
                 case TYPE_RDBMS_INSTANCE:
-                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES);
+                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES, RELATIONSHIP_TYPE_RDBMS_INSTANCE_DATABASES, ATTRIBUTE_INSTANCE);
                 break;
 
                 case TYPE_RDBMS_DB:
-                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES);
+                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES, RELATIONSHIP_TYPE_RDBMS_DB_TABLES, ATTRIBUTE_DB);
                 break;
 
                 case TYPE_RDBMS_TABLE:
-                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS);
-                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES);
-                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS);
+                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_RDBMS_TABLE_COLUMNS, ATTRIBUTE_TABLE);
+                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES, RELATIONSHIP_TYPE_RDBMS_TABLE_INDEXES, ATTRIBUTE_TABLE);
+                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS, RELATIONSHIP_TYPE_RDBMS_TABLE_FOREIGN_KEYS, ATTRIBUTE_TABLE);
                 break;
             }
         }