You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2021/01/22 08:02:16 UTC

[atlas] branch branch-2.0 updated: ATLAS-4019: audit improvement to store only delta

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 33adcc8  ATLAS-4019: audit improvement to store only delta
33adcc8 is described below

commit 33adcc8810745ce4efb3b617b7b1fc2b952f255d
Author: Deep Singh <de...@gmail.com>
AuthorDate: Thu Jan 21 15:41:35 2021 -0800

    ATLAS-4019: audit improvement to store only delta
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
    (cherry picked from commit 8d7ac77a74479263ce59b9e22bb3def91b8b9326)
---
 .../java/org/apache/atlas/AtlasConfiguration.java  |   5 +
 .../test/java/org/apache/atlas/TestUtilsV2.java    |   1 +
 .../repository/audit/EntityAuditListenerV2.java    |  32 +++-
 .../store/graph/v2/AtlasEntityComparator.java      | 213 +++++++++++++++++++++
 .../store/graph/v2/AtlasEntityStoreV2.java         | 134 +++----------
 .../store/graph/v2/AtlasEntityComparatorTest.java  | 204 ++++++++++++++++++++
 .../store/graph/v2/AtlasEntityStoreV2Test.java     |  49 ++++-
 .../main/java/org/apache/atlas/RequestContext.java |  13 ++
 8 files changed, 540 insertions(+), 111 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 396aad0..08d6c9d 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -75,6 +75,7 @@ public enum AtlasConfiguration {
     HTTP_HEADER_SERVER_VALUE("atlas.http.header.server.value","Apache Atlas"),
     STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true),
     REBUILD_INDEX("atlas.rebuild.index", false),
+    STORE_DIFFERENTIAL_AUDITS("atlas.entity.audit.differential", false),
     DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true);
 
     private static final Configuration APPLICATION_PROPERTIES;
@@ -95,6 +96,10 @@ public enum AtlasConfiguration {
         this.defaultValue = defaultValue;
     }
 
+    public String getPropertyName() {
+        return propertyName;
+    }
+
     public int getInt() {
         return APPLICATION_PROPERTIES.getInt(propertyName, Integer.valueOf(defaultValue.toString()).intValue());
     }
diff --git a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
index 2b9cf6e..80ab6b4 100755
--- a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
+++ b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
@@ -566,6 +566,7 @@ public final class TestUtilsV2 {
     public static final String SERDE_TYPE = "serdeType";
     public static final String COLUMNS_MAP = "columnsMap";
     public static final String COLUMNS_ATTR_NAME = "columns";
+    public static final String TABLE_ATTR_NAME = "table";
     public static final String ENTITY_TYPE_WITH_SIMPLE_ATTR = "entity_with_simple_attr";
     public static final String ENTITY_TYPE_WITH_NESTED_COLLECTION_ATTR = "entity_with_nested_collection_attr";
     public static final String ENTITY_TYPE_WITH_COMPLEX_COLLECTION_ATTR = "entity_with_complex_collection_attr";
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index 0043f1c..0b7d000 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -47,11 +47,13 @@ import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
 import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.atlas.AtlasConfiguration.STORE_DIFFERENTIAL_AUDITS;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.BUSINESS_ATTRIBUTE_UPDATE;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE;
@@ -106,14 +108,30 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
 
     @Override
     public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
-        MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
-
+        RequestContext                      reqContext    = RequestContext.get();
+        MetricRecorder                      metric        = reqContext.startMetricRecord("entityAudit");
         FixedBufferList<EntityAuditEventV2> updatedEvents = getAuditEventsList();
-        for (AtlasEntity entity : entities) {
-            EntityAuditActionV2 action = isImport ? ENTITY_IMPORT_UPDATE :
-                    RequestContext.get().checkIfEntityIsForCustomAttributeUpdate(entity.getGuid()) ? CUSTOM_ATTRIBUTE_UPDATE :
-                    RequestContext.get().checkIfEntityIsForBusinessAttributeUpdate(entity.getGuid()) ? BUSINESS_ATTRIBUTE_UPDATE :
-                            ENTITY_UPDATE;
+        Collection<AtlasEntity>             updatedEntites;
+
+        if (STORE_DIFFERENTIAL_AUDITS.getBoolean()) {
+            updatedEntites = reqContext.getDifferentialEntities();
+        } else {
+            updatedEntites = entities;
+        }
+
+        for (AtlasEntity entity : updatedEntites) {
+            final EntityAuditActionV2 action;
+
+            if (isImport) {
+                action = ENTITY_IMPORT_UPDATE;
+            } else if (reqContext.checkIfEntityIsForCustomAttributeUpdate(entity.getGuid())) {
+                action = CUSTOM_ATTRIBUTE_UPDATE;
+            } else if (reqContext.checkIfEntityIsForBusinessAttributeUpdate(entity.getGuid())) {
+                action = BUSINESS_ATTRIBUTE_UPDATE;
+            } else {
+                action = ENTITY_UPDATE;
+            }
+
             createEvent(updatedEvents.next(), entity, action);
         }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparator.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparator.java
new file mode 100644
index 0000000..d962385
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparator.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasEntityUtil;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes;
+
+public class AtlasEntityComparator {
+    private final AtlasTypeRegistry    typeRegistry;
+    private final EntityGraphRetriever entityRetriever;
+    private final Map<String, String>  guidRefMap;
+    private final boolean              skipClassificationCompare;
+    private final boolean              skipBusinessAttributeCompare;
+
+    public AtlasEntityComparator(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, Map<String, String> guidRefMap,
+                                 boolean skipClassificationCompare, boolean skipBusinessAttributeCompare) {
+        this.typeRegistry                 = typeRegistry;
+        this.entityRetriever              = entityRetriever;
+        this.guidRefMap                   = guidRefMap;
+        this.skipClassificationCompare    = skipClassificationCompare;
+        this.skipBusinessAttributeCompare = skipBusinessAttributeCompare;
+    }
+
+    public AtlasEntityDiffResult getDiffResult(AtlasEntity updatedEntity, AtlasEntity storedEntity, boolean findOnlyFirstDiff) throws AtlasBaseException {
+        return getDiffResult(updatedEntity, storedEntity, null, findOnlyFirstDiff);
+    }
+
+    public AtlasEntityDiffResult getDiffResult(AtlasEntity updatedEntity, AtlasVertex storedVertex, boolean findOnlyFirstDiff) throws AtlasBaseException {
+        return getDiffResult(updatedEntity, null, storedVertex, findOnlyFirstDiff);
+    }
+
+    private AtlasEntityDiffResult getDiffResult(AtlasEntity updatedEntity, AtlasEntity storedEntity, AtlasVertex storedVertex, boolean findOnlyFirstDiff) throws AtlasBaseException {
+        AtlasEntity                              diffEntity                       = new AtlasEntity(updatedEntity.getTypeName());
+        AtlasEntityType                          entityType                       = typeRegistry.getEntityTypeByName(updatedEntity.getTypeName());
+        Map<String, AtlasAttribute>              entityTypeAttributes             = entityType.getAllAttributes();
+        Map<String, Map<String, AtlasAttribute>> entityTypeRelationshipAttributes = entityType.getRelationshipAttributes();
+
+        int     sectionsWithDiff                = 0;
+        boolean hasDiffInAttributes             = false;
+        boolean hasDiffInRelationshipAttributes = false;
+        boolean hasDiffInCustomAttributes       = false;
+        boolean hasDiffInBusinessAttributes     = false;
+
+        diffEntity.setGuid(updatedEntity.getGuid());
+
+        if (MapUtils.isNotEmpty(updatedEntity.getAttributes())) { // check for attribute value change
+            for (Map.Entry<String, Object> entry : updatedEntity.getAttributes().entrySet()) {
+                String         attrName  = entry.getKey();
+                AtlasAttribute attribute = entityTypeAttributes.get(attrName);
+
+                if (attribute == null) { // no such attribute
+                    continue;
+                }
+
+                Object newVal  = entry.getValue();
+                Object currVal = (storedEntity != null) ? storedEntity.getAttribute(attrName) : entityRetriever.getEntityAttribute(storedVertex, attribute);
+
+                if (!attribute.getAttributeType().areEqualValues(currVal, newVal, guidRefMap)) {
+                    hasDiffInAttributes = true;
+
+                    diffEntity.setAttribute(attrName, newVal);
+
+                    if (findOnlyFirstDiff) {
+                        return new AtlasEntityDiffResult(diffEntity, true, false, false);
+                    }
+                }
+            }
+
+            if (hasDiffInAttributes) {
+                sectionsWithDiff++;
+            }
+        }
+
+        if (MapUtils.isNotEmpty(updatedEntity.getRelationshipAttributes())) { // check for relationship-attribute value change
+            for (Map.Entry<String, Object> entry : updatedEntity.getRelationshipAttributes().entrySet()) {
+                String attrName = entry.getKey();
+
+                if (!entityTypeRelationshipAttributes.containsKey(attrName)) {  // no such attribute
+                    continue;
+                }
+
+                Object         newVal           = entry.getValue();
+                String         relationshipType = AtlasEntityUtil.getRelationshipType(newVal);
+                AtlasAttribute attribute        = entityType.getRelationshipAttribute(attrName, relationshipType);
+                Object         currVal          = (storedEntity != null) ? storedEntity.getRelationshipAttribute(attrName) : entityRetriever.getEntityAttribute(storedVertex, attribute);
+
+                if (!attribute.getAttributeType().areEqualValues(currVal, newVal, guidRefMap)) {
+                    hasDiffInRelationshipAttributes = true;
+
+                    diffEntity.setRelationshipAttribute(attrName, newVal);
+
+                    if (findOnlyFirstDiff) {
+                        return new AtlasEntityDiffResult(diffEntity, true, false, false);
+                    }
+                }
+            }
+
+            if (hasDiffInRelationshipAttributes) {
+                sectionsWithDiff++;
+            }
+        }
+
+        if (!skipClassificationCompare) {
+            List<AtlasClassification> newVal  = updatedEntity.getClassifications();
+            List<AtlasClassification> currVal = (storedEntity != null) ? storedEntity.getClassifications() : entityRetriever.getAllClassifications(storedVertex);
+
+            if (!Objects.equals(currVal, newVal)) {
+                diffEntity.setClassifications(newVal);
+
+                sectionsWithDiff++;
+
+                if (findOnlyFirstDiff) {
+                    return new AtlasEntityDiffResult(diffEntity, true, false, false);
+                }
+            }
+        }
+
+        if (updatedEntity.getCustomAttributes() != null) {
+            // event coming from hook does not have custom attributes, such events must not remove existing attributes
+            // UI sends empty object in case of of intended removal.
+            Map<String, String> newCustomAttributes  = updatedEntity.getCustomAttributes();
+            Map<String, String> currCustomAttributes = (storedEntity != null) ? storedEntity.getCustomAttributes() : getCustomAttributes(storedVertex);
+
+            if (!Objects.equals(currCustomAttributes, newCustomAttributes)) {
+                diffEntity.setCustomAttributes(newCustomAttributes);
+
+                hasDiffInCustomAttributes = true;
+                sectionsWithDiff++;
+
+                if (findOnlyFirstDiff && sectionsWithDiff > 1) {
+                    return new AtlasEntityDiffResult(diffEntity, true, false, false);
+                }
+            }
+        }
+
+        if (!skipBusinessAttributeCompare) {
+            Map<String, Map<String, Object>> newBusinessMetadata  = updatedEntity.getBusinessAttributes();
+            Map<String, Map<String, Object>> currBusinessMetadata = (storedEntity != null) ? storedEntity.getBusinessAttributes() : entityRetriever.getBusinessMetadata(storedVertex);;
+
+            if (!Objects.equals(currBusinessMetadata, newBusinessMetadata)) {
+                diffEntity.setBusinessAttributes(newBusinessMetadata);
+
+                hasDiffInBusinessAttributes = true;
+                sectionsWithDiff++;
+
+                if (findOnlyFirstDiff && sectionsWithDiff > 1) {
+                    return new AtlasEntityDiffResult(diffEntity, true, false, false);
+                }
+            }
+        }
+
+        return new AtlasEntityDiffResult(diffEntity, sectionsWithDiff > 0, sectionsWithDiff == 1 && hasDiffInCustomAttributes, sectionsWithDiff == 1 && hasDiffInBusinessAttributes);
+    }
+
+    public static class AtlasEntityDiffResult {
+        private final AtlasEntity diffEntity;
+        private final boolean     hasDifference;
+        private final boolean     hasDifferenceOnlyInCustomAttributes;
+        private final boolean     hasDifferenceOnlyInBusinessAttributes;
+
+        AtlasEntityDiffResult(AtlasEntity diffEntity, boolean hasDifference, boolean hasDifferenceOnlyInCustomAttributes, boolean hasDifferenceOnlyInBusinessAttributes) {
+            this.diffEntity                            = diffEntity;
+            this.hasDifference                         = hasDifference;
+            this.hasDifferenceOnlyInCustomAttributes   = hasDifferenceOnlyInCustomAttributes;
+            this.hasDifferenceOnlyInBusinessAttributes = hasDifferenceOnlyInBusinessAttributes;
+        }
+
+        public AtlasEntity getDiffEntity() {
+            return diffEntity;
+        }
+
+        public boolean hasDifference() {
+            return hasDifference;
+        }
+
+        public boolean hasDifferenceOnlyInCustomAttributes() {
+            return hasDifferenceOnlyInCustomAttributes;
+        }
+
+        public boolean hasDifferenceOnlyInBusinessAttributes() {
+            return hasDifferenceOnlyInBusinessAttributes;
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 2440722..ce58e9a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph.v2;
 
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.DeleteType;
 import org.apache.atlas.GraphTransactionInterceptor;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.annotation.GraphTransaction;
@@ -48,7 +49,7 @@ import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
-import org.apache.atlas.DeleteType;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult;
 import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
 import org.apache.atlas.type.AtlasClassificationType;
@@ -85,11 +86,11 @@ import java.util.Objects;
 import java.util.Set;
 
 import static java.lang.Boolean.FALSE;
+import static org.apache.atlas.AtlasConfiguration.STORE_DIFFERENTIAL_AUDITS;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PURGE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
 import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
-import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes;
 import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
 import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete;
 import static org.apache.atlas.repository.store.graph.v2.EntityGraphMapper.validateLabels;
@@ -100,12 +101,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
     private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("store.EntityStore");
 
-    private final AtlasGraph                graph;
-    private final DeleteHandlerDelegate     deleteDelegate;
-    private final AtlasTypeRegistry         typeRegistry;
+    private final AtlasGraph                 graph;
+    private final DeleteHandlerDelegate      deleteDelegate;
+    private final AtlasTypeRegistry          typeRegistry;
     private final IAtlasEntityChangeNotifier entityChangeNotifier;
-    private final EntityGraphMapper         entityGraphMapper;
-    private final EntityGraphRetriever      entityRetriever;
+    private final EntityGraphMapper          entityGraphMapper;
+    private final EntityGraphRetriever       entityRetriever;
+    private final boolean                    storeDifferentialAudits;
 
     @Inject
     public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry,
@@ -116,6 +118,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
         this.entityChangeNotifier = entityChangeNotifier;
         this.entityGraphMapper    = entityGraphMapper;
         this.entityRetriever      = new EntityGraphRetriever(graph, typeRegistry);
+        this.storeDifferentialAudits = STORE_DIFFERENTIAL_AUDITS.getBoolean();
     }
 
     @Override
@@ -1111,7 +1114,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
         }
     }
 
-        private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications, boolean replaceBusinessAttributes) throws AtlasBaseException {
+    private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications, boolean replaceBusinessAttributes) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> createOrUpdate()");
         }
@@ -1143,130 +1146,55 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
             if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) {
                 MetricRecorder checkForUnchangedEntities = RequestContext.get().startMetricRecord("checkForUnchangedEntities");
 
-                List<AtlasEntity> entitiesToSkipUpdate = null;
+                List<AtlasEntity>     entitiesToSkipUpdate = new ArrayList<>();
+                AtlasEntityComparator entityComparator     = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), !replaceClassifications, !replaceBusinessAttributes);
+                RequestContext        reqContext           = RequestContext.get();
 
                 for (AtlasEntity entity : context.getUpdatedEntities()) {
-                    String          guid       = entity.getGuid();
-                    AtlasVertex     vertex     = context.getVertex(guid);
-                    AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-                    boolean         hasUpdates = false, hasUpdatesInCustAttr = false, hasUpdatesInBusAttr = false;
-
-                    if (!hasUpdates) {
-                        hasUpdates = entity.getStatus() == AtlasEntity.Status.DELETED; // entity status could be updated during import
-                    }
-
-                    if (!hasUpdates && MapUtils.isNotEmpty(entity.getAttributes())) { // check for attribute value change
-                        for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
-                            if (!entity.getAttributes().containsKey(attribute.getName())) {  // if value is not provided, current value will not be updated
-                                continue;
-                            }
-
-                            Object newVal  = entity.getAttribute(attribute.getName());
-                            Object currVal = entityRetriever.getEntityAttribute(vertex, attribute);
-
-                            if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) {
-                                hasUpdates = true;
-
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("found attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal);
-                                }
-
-                                break;
-                            }
-                        }
-                    }
-
-                    if (!hasUpdates && MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { // check of relationsship-attribute value change
-                        for (String attributeName : entityType.getRelationshipAttributes().keySet()) {
-                            if (!entity.getRelationshipAttributes().containsKey(attributeName)) {  // if value is not provided, current value will not be updated
-                                continue;
-                            }
-
-                            Object         newVal           = entity.getRelationshipAttribute(attributeName);
-                            String         relationshipType = AtlasEntityUtil.getRelationshipType(newVal);
-                            AtlasAttribute attribute        = entityType.getRelationshipAttribute(attributeName, relationshipType);
-                            Object         currVal          = entityRetriever.getEntityAttribute(vertex, attribute);
-
-                            if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) {
-                                hasUpdates = true;
-
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("found relationship attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal);
-                                }
-
-                                break;
-                            }
-                        }
-                    }
-
-                    if (entity.getCustomAttributes() != null) {
-                        Map<String, String> currCustomAttributes = getCustomAttributes(vertex);
-                        Map<String, String> newCustomAttributes  = entity.getCustomAttributes();
-
-                        if (!Objects.equals(currCustomAttributes, newCustomAttributes)) {
-                            hasUpdatesInCustAttr = true;
-                        }
-                    }
-
-                    if (replaceBusinessAttributes) {
-                        Map<String, Map<String, Object>> currBusinessMetadata = entityRetriever.getBusinessMetadata(vertex);
-                        Map<String, Map<String, Object>> newBusinessMetadata  = entity.getBusinessAttributes();
-
-                        if (!Objects.equals(currBusinessMetadata, newBusinessMetadata)) {
-                            hasUpdatesInBusAttr = true;
-                        }
+                    if (entity.getStatus() == AtlasEntity.Status.DELETED) {// entity status could be updated during import
+                        continue;
                     }
 
-                    // if classifications are to be replaced, then skip updates only when no change in classifications
-                    if (!hasUpdates && replaceClassifications) {
-                        List<AtlasClassification> newVal  = entity.getClassifications();
-                        List<AtlasClassification> currVal = entityRetriever.getAllClassifications(vertex);
-
-                        if (!Objects.equals(currVal, newVal)) {
-                            hasUpdates = true;
+                    AtlasVertex           storedVertex = context.getVertex(entity.getGuid());
+                    AtlasEntityDiffResult diffResult   = entityComparator.getDiffResult(entity, storedVertex, !storeDifferentialAudits);
 
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("found classifications update: entity(guid={}, typeName={}), currValue={}, newValue={}", guid, entity.getTypeName(), currVal, newVal);
-                            }
+                    if (diffResult.hasDifference()) {
+                        if (storeDifferentialAudits) {
+                            diffResult.getDiffEntity().setGuid(entity.getGuid());
+                            reqContext.cacheDifferentialEntity(diffResult.getDiffEntity());
                         }
-                    }
 
-                    if(!hasUpdates && (hasUpdatesInCustAttr || hasUpdatesInBusAttr)) {
-                        hasUpdates = true;
-                        if (hasUpdatesInCustAttr ^ hasUpdatesInBusAttr) {
-                            if(hasUpdatesInCustAttr) RequestContext.get().recordEntityWithCustomAttributeUpdate(entity.getGuid());
-                            if(hasUpdatesInBusAttr) RequestContext.get().recordEntityWithBusinessAttributeUpdate(entity.getGuid());
+                        if (diffResult.hasDifferenceOnlyInCustomAttributes()) {
+                            reqContext.recordEntityWithCustomAttributeUpdate(entity.getGuid());
                         }
-                    }
 
-                    if (!hasUpdates) {
-                        if (entitiesToSkipUpdate == null) {
-                            entitiesToSkipUpdate = new ArrayList<>();
+                        if (diffResult.hasDifferenceOnlyInBusinessAttributes()) {
+                            reqContext.recordEntityWithBusinessAttributeUpdate(entity.getGuid());
                         }
-
+                    } else {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("skipping unchanged entity: {}", entity);
                         }
 
                         entitiesToSkipUpdate.add(entity);
-                        RequestContext.get().recordEntityToSkip(entity.getGuid());
+                        reqContext.recordEntityToSkip(entity.getGuid());
                     }
                 }
 
-                if (entitiesToSkipUpdate != null) {
+                if (entitiesToSkipUpdate.size() > 0) {
                     // remove entitiesToSkipUpdate from EntityMutationContext
                     context.getUpdatedEntities().removeAll(entitiesToSkipUpdate);
                 }
 
                 // Check if authorized to update entities
-                if (!RequestContext.get().isImportInProgress()) {
+                if (!reqContext.isImportInProgress()) {
                     for (AtlasEntity entity : context.getUpdatedEntities()) {
                         AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, new AtlasEntityHeader(entity)),
                                                              "update entity: type=", entity.getTypeName());
                     }
                 }
 
-                RequestContext.get().endMetricRecord(checkForUnchangedEntities);
+                reqContext.endMetricRecord(checkForUnchangedEntities);
             }
 
             EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications, replaceBusinessAttributes);
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparatorTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparatorTest.java
new file mode 100644
index 0000000..7e4a7e2
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparatorTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestRelationshipUtilsV2;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.util.*;
+
+import static org.apache.atlas.TestUtilsV2.NAME;
+import static org.apache.atlas.type.AtlasTypeUtil.getAtlasObjectId;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class AtlasEntityComparatorTest extends AtlasEntityTestBase {
+
+    private AtlasEntityComparator comparator;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        super.setUp();
+
+        AtlasTypesDef[] testTypesDefs = new AtlasTypesDef[] {
+                TestUtilsV2.defineHiveTypes(),
+                TestRelationshipUtilsV2.getInverseReferenceTestTypes()
+        };
+        createTypesDef(testTypesDefs);
+
+        comparator = new AtlasEntityComparator(typeRegistry, null, null, false, false);
+    }
+
+    @AfterClass
+    public void clear() throws Exception {
+
+    }
+
+    @Test
+    public void testSameEntities () throws AtlasBaseException {
+        AtlasEntity dbEntity = TestUtilsV2.createDBEntity();
+        dbEntity.setAttribute("namespace", "db namespace");
+        dbEntity.setAttribute("cluster", "Fenton_Cluster");
+
+        AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity);
+        AtlasEntity newTableEntity = TestUtilsV2.createTableEntity(dbEntity, tableEntity.getAttribute("name").toString());
+
+        tableEntity.setClassifications(toAtlasClassifications(new String[]{"test-1"}));
+        newTableEntity.setClassifications(toAtlasClassifications(new String[]{"test-1"}));
+
+        tableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar");}});
+        newTableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar");}});
+
+        AtlasEntityComparator.AtlasEntityDiffResult diffResult = comparator.getDiffResult(tableEntity, newTableEntity, true);
+        Assert.assertFalse(diffResult.hasDifference());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInCustomAttributes());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+
+        diffResult = comparator.getDiffResult(tableEntity, newTableEntity, false);
+        Assert.assertFalse(diffResult.hasDifference());
+    }
+
+    @Test
+    public void testChangedEntities () throws AtlasBaseException {
+        AtlasEntity dbEntity = TestUtilsV2.createDBEntity();
+        dbEntity.setAttribute("namespace", "db namespace");
+        dbEntity.setAttribute("cluster", "Fenton_Cluster");
+
+        AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity);
+        AtlasEntity newTableEntity = TestUtilsV2.createTableEntity(dbEntity);
+
+        tableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar-1");}});
+        newTableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar-2");}});
+
+        AtlasEntityComparator.AtlasEntityDiffResult diffResult = comparator.getDiffResult(newTableEntity, tableEntity, true);
+        Assert.assertTrue(diffResult.hasDifference());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInCustomAttributes());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+
+        diffResult = comparator.getDiffResult(tableEntity, newTableEntity, false);
+        Assert.assertNotNull(diffResult.getDiffEntity());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInCustomAttributes());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+    }
+
+    @Test
+    public void testChangedEntitiesCustomAttribute () throws AtlasBaseException {
+        AtlasEntity dbEntity = TestUtilsV2.createDBEntity();
+        dbEntity.setAttribute("namespace", "db namespace");
+        dbEntity.setAttribute("cluster", "Fenton_Cluster");
+
+        AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity);
+        AtlasEntity newTableEntity = TestUtilsV2.createTableEntity(dbEntity, tableEntity.getAttribute("name").toString());
+
+        tableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar-1");}});
+        newTableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar-2");}});
+
+        AtlasEntityComparator.AtlasEntityDiffResult diffResult = comparator.getDiffResult(newTableEntity, tableEntity, true);
+        Assert.assertTrue(diffResult.hasDifference());
+        Assert.assertTrue(diffResult.hasDifferenceOnlyInCustomAttributes());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+
+        diffResult = comparator.getDiffResult(tableEntity, newTableEntity, false);
+        Assert.assertNotNull(diffResult.getDiffEntity());
+        Assert.assertTrue(diffResult.hasDifferenceOnlyInCustomAttributes());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+    }
+
+    @Test
+    public void testChangedEntitiesClassification () throws AtlasBaseException {
+        AtlasEntity dbEntity = TestUtilsV2.createDBEntity();
+        dbEntity.setAttribute("namespace", "db namespace");
+        dbEntity.setAttribute("cluster", "Fenton_Cluster");
+
+        AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity);
+        AtlasEntity newTableEntity = TestUtilsV2.createTableEntity(dbEntity, tableEntity.getAttribute("name").toString());
+
+        tableEntity.setClassifications(toAtlasClassifications(new String[]{"test-1"}));
+        newTableEntity.setClassifications(toAtlasClassifications(new String[]{"test-2"}));
+
+        AtlasEntityComparator.AtlasEntityDiffResult diffResult = comparator.getDiffResult(newTableEntity, tableEntity, true);
+        Assert.assertTrue(diffResult.hasDifference());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInCustomAttributes());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+
+        diffResult = comparator.getDiffResult(tableEntity, newTableEntity, false);
+        Assert.assertNotNull(diffResult.getDiffEntity());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInCustomAttributes());
+        Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+    }
+
+    @Test
+    public void testChangedRelationshipAttributes () throws AtlasBaseException {
+        AtlasEntity a1 = new AtlasEntity("A");
+        a1.setAttribute(NAME, "a1_name");
+
+        AtlasEntity a2 = new AtlasEntity("A");
+        a2.setAttribute(NAME, "a2_name");
+
+        AtlasEntity a3 = new AtlasEntity("A");
+        a3.setAttribute(NAME, "a3_name");
+
+        AtlasEntity b1 = new AtlasEntity("B");
+        b1.setAttribute(NAME, "b_name");
+        b1.setRelationshipAttribute("manyA", ImmutableList.of(getAtlasObjectId(a1), getAtlasObjectId(a2)));
+
+        AtlasEntity b2 = new AtlasEntity("B");
+        b2.setAttribute(NAME, "b_name");
+        b2.setRelationshipAttribute("manyA", ImmutableList.of(getAtlasObjectId(a1), getAtlasObjectId(a2)));
+
+        AtlasEntity b3 = new AtlasEntity("B");
+        b3.setAttribute(NAME, "b_name");
+        b3.setRelationshipAttribute("manyA", ImmutableList.of(getAtlasObjectId(a3)));
+
+        AtlasEntity b4 = new AtlasEntity("B");
+        b4.setAttribute(NAME, "b_name");
+
+        Assert.assertFalse(comparator.getDiffResult(b1, b2, true).hasDifference());
+        Assert.assertTrue(comparator.getDiffResult(b1, b3, true).hasDifference());
+        Assert.assertTrue(comparator.getDiffResult(b1, b4, true).hasDifference());
+        Assert.assertTrue(comparator.getDiffResult(b3, b4, true).hasDifference());
+
+        Assert.assertFalse(comparator.getDiffResult(b1, b2, false).hasDifference());
+        Assert.assertTrue(comparator.getDiffResult(b1, b3, false).hasDifference());
+        Assert.assertTrue(comparator.getDiffResult(b1, b4, false).hasDifference());
+        Assert.assertTrue(comparator.getDiffResult(b3, b4, false).hasDifference());
+    }
+
+    private List<AtlasClassification> toAtlasClassifications(String[] classificationNames) {
+        List<AtlasClassification> ret = new ArrayList<>();
+
+        if (classificationNames != null) {
+            for (String classificationName : classificationNames) {
+                ret.add(new AtlasClassification(classificationName));
+            }
+        }
+
+        return ret;
+    }
+
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java
index c9f4912..9aced8e 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.GraphTransactionInterceptor;
 import org.apache.atlas.RequestContext;
@@ -61,6 +62,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.atlas.AtlasConfiguration.STORE_DIFFERENTIAL_AUDITS;
 import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_KEY_CHARACTERS;
 import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_KEY_LENGTH;
 import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_VALUE;
@@ -854,6 +856,45 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
     }
 
     @Test
+    public void testDifferentialEntitiesOnUpdate () throws Exception {
+
+        ApplicationProperties.get().setProperty(STORE_DIFFERENTIAL_AUDITS.getPropertyName(), true);
+        init();
+        // test for entity attributes
+
+        AtlasEntity dbEntity = new AtlasEntity(TestUtilsV2.DATABASE_TYPE);
+        dbEntity.setAttribute("name", TestUtilsV2.randomString(10));
+        dbEntity.setAttribute("description", "us db");
+        dbEntity.setAttribute("namespace", "db namespace");
+        dbEntity.setAttribute("cluster", "Fenton_Cluster");
+
+        EntityStream           dbStream        = new AtlasEntityStream(new AtlasEntitiesWithExtInfo(dbEntity));
+        EntityMutationResponse response        = entityStore.createOrUpdate(dbStream, false);
+        AtlasEntityHeader      dbHeader        = response.getFirstEntityCreated();
+        AtlasEntity            createdDbEntity = getEntityFromStore(dbHeader);
+
+        assertEquals(RequestContext.get().getDifferentialEntities().size(), 0);
+
+        // update the db entity
+        dbEntity = new AtlasEntity(TestUtilsV2.DATABASE_TYPE);
+        dbEntity.setGuid(createdDbEntity.getGuid());
+        dbEntity.setAttribute("description", "new description");
+
+        dbStream = new AtlasEntityStream(new AtlasEntitiesWithExtInfo(dbEntity));
+
+        response = entityStore.createOrUpdate(dbStream, true);
+        dbHeader = response.getFirstEntityPartialUpdated();
+        AtlasEntity updatedDbEntity = getEntityFromStore(dbHeader);
+
+        assertEquals(RequestContext.get().getDifferentialEntities().size(), 1);
+        AtlasEntity diffEntity = RequestContext.get().getDifferentialEntity(updatedDbEntity.getGuid());
+        assertNotNull(diffEntity.getAttribute("description"));
+        Assert.assertNull(diffEntity.getAttribute("namespace"));
+        Assert.assertNull(diffEntity.getAttribute("name"));
+        ApplicationProperties.get().setProperty(STORE_DIFFERENTIAL_AUDITS.getPropertyName(), false);
+    }
+
+    @Test
     public void testSetObjectIdAttrToNull() throws Exception {
         final AtlasEntity dbEntity  = TestUtilsV2.createDBEntity();
         final AtlasEntity db2Entity = TestUtilsV2.createDBEntity();
@@ -1004,7 +1045,10 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
     }
 
     @Test (dependsOnMethods = "testCreate")
-    public void addCustomAttributesToEntity() throws AtlasBaseException {
+    public void addCustomAttributesToEntity() throws Exception {
+
+        ApplicationProperties.get().setProperty(STORE_DIFFERENTIAL_AUDITS.getPropertyName(), true);
+        init();
         AtlasEntity tblEntity = getEntityFromStore(tblEntityGuid);
 
         Map<String, String> customAttributes = new HashMap<>();
@@ -1017,10 +1061,13 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
         tblEntity.setCustomAttributes(customAttributes);
 
         entityStore.createOrUpdate(new AtlasEntityStream(tblEntity), false);
+        assertEquals(RequestContext.get().getDifferentialEntities().size(), 1);
 
         tblEntity = getEntityFromStore(tblEntityGuid);
 
         assertEquals(customAttributes, tblEntity.getCustomAttributes());
+
+        ApplicationProperties.get().setProperty(STORE_DIFFERENTIAL_AUDITS.getPropertyName(), false);
     }
 
     @Test (dependsOnMethods = "addCustomAttributesToEntity")
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 216ba08..7de3536 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -53,6 +53,7 @@ public class RequestContext {
     private final Map<String, AtlasEntityHeader>         deletedEntities      = new HashMap<>();
     private final Map<String, AtlasEntity>               entityCache          = new HashMap<>();
     private final Map<String, AtlasEntityWithExtInfo>    entityExtInfoCache   = new HashMap<>();
+    private final Map<String, AtlasEntity>               diffEntityCache      = new HashMap<>();
     private final Map<String, List<AtlasClassification>> addedPropagations    = new HashMap<>();
     private final Map<String, List<AtlasClassification>> removedPropagations  = new HashMap<>();
     private final AtlasPerfMetrics                       metrics              = isMetricsEnabled ? new AtlasPerfMetrics() : null;
@@ -114,6 +115,7 @@ public class RequestContext {
         this.deletedEntities.clear();
         this.entityCache.clear();
         this.entityExtInfoCache.clear();
+        this.diffEntityCache.clear();
         this.addedPropagations.clear();
         this.removedPropagations.clear();
         this.entitiesToSkipUpdate.clear();
@@ -344,6 +346,17 @@ public class RequestContext {
         }
     }
 
+    public void cacheDifferentialEntity(AtlasEntity entity) {
+        if (entity != null && entity.getGuid() != null) {
+            diffEntityCache.put(entity.getGuid(), entity);
+        }
+    }
+
+    public AtlasEntity getDifferentialEntity(String guid) {
+        return diffEntityCache.get(guid);
+    }
+
+    public Collection<AtlasEntity> getDifferentialEntities() { return diffEntityCache.values(); }
 
     public Collection<AtlasEntityHeader> getUpdatedEntities() {
         return updatedEntities.values();