You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2020/04/13 00:09:55 UTC

[atlas] branch branch-2.0 updated: ATLAS-3689: added entity-audit entries on business attributes add/update/delete to an entity

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 48b0566  ATLAS-3689: added entity-audit entries on business attributes add/update/delete to an entity
48b0566 is described below

commit 48b056689bc19ac01f253898c37a7f444c0c6852
Author: Mandar Ambawane <ma...@freestoneinfotech.com>
AuthorDate: Sun Apr 12 13:22:02 2020 +0530

    ATLAS-3689: added entity-audit entries on business attributes add/update/delete to an entity
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
    (cherry picked from commit 80135a8dbedfbaff444267681aa13d958e867c7e)
---
 dashboardv2/public/js/utils/Enums.js               |  5 +-
 .../js/views/audit/CreateAuditTableLayoutView.js   |  8 +-
 .../atlas/listener/EntityChangeListenerV2.java     |  9 +++
 .../atlas/model/audit/EntityAuditEventV2.java      |  5 +-
 .../repository/audit/EntityAuditListenerV2.java    | 82 ++++++++++++-------
 .../store/graph/v2/AtlasEntityChangeNotifier.java  | 26 +++++-
 .../store/graph/v2/EntityGraphMapper.java          | 84 ++++++++++++++-----
 .../store/graph/v2/IAtlasEntityChangeNotifier.java | 57 +++++++++++++
 .../v2/bulkimport/EntityChangeNotifierNop.java     | 94 ++++++++++++++++++++++
 .../notification/EntityNotificationListenerV2.java |  5 ++
 10 files changed, 317 insertions(+), 58 deletions(-)

diff --git a/dashboardv2/public/js/utils/Enums.js b/dashboardv2/public/js/utils/Enums.js
index e2d8cd2..780dad2 100644
--- a/dashboardv2/public/js/utils/Enums.js
+++ b/dashboardv2/public/js/utils/Enums.js
@@ -39,8 +39,7 @@ define(['require'], function(require) {
         LABEL_ADD: "Label(s) Added",
         LABEL_DELETE: "Label(s) Deleted",
         ENTITY_PURGE: "Entity Purged",
-        BUSINESS_ATTRIBUTE_ADD: "Business Attribute(s) Added",
-        BUSINESS_ATTRIBUTE_DELETE: "Business Attribute(s) Deleted"
+        BUSINESS_ATTRIBUTE_UPDATE: "Business Attribute(s) Updated"
     }
 
     Enums.entityStateReadOnly = {
@@ -214,4 +213,4 @@ define(['require'], function(require) {
         1: "true"
     };
     return Enums;
-});
\ No newline at end of file
+});
diff --git a/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js b/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js
index a820616..f7673ce 100644
--- a/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js
+++ b/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js
@@ -124,7 +124,11 @@ define(['require',
                                         relationshipAttributes = parseDetailsObject.relationshipAttributes;
                                     if (attributesDetails) {
                                         that.ui.attributeDetails.removeClass('hide');
-                                        that.action.indexOf("Classification") === -1 ? that.ui.panelAttrHeading.html("Technical properties ") : that.ui.panelAttrHeading.html("Properties ");
+                                        if (that.action.includes("Classification") || that.action.includes("Business Attribute") != -1) {
+                                            that.ui.panelAttrHeading.html("Properties ");
+                                        } else {
+                                            that.ui.panelAttrHeading.html("Technical properties ");
+                                        }
                                         var attrTable = that.createTableWithValues(attributesDetails);
                                         that.ui.attributeCard.html(
                                             attrTable);
@@ -174,4 +178,4 @@ define(['require',
             }
         });
     return CreateAuditTableLayoutView;
-});
\ No newline at end of file
+});
diff --git a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
index 2394a12..374d0dd 100644
--- a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
+++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -173,4 +174,12 @@ public interface EntityChangeListenerV2 {
      * @throws AtlasBaseException if the listener notification fails
      */
     void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws AtlasBaseException;
+
+    /**
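+     * This is upon updating business attributes of an entity.
+     * @param entity the entity
+     * @param updatedBusinessAttributes updated attributes, keyed by business-metadata name and then by attribute name; a null value denotes a removed attribute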
+     * @throws AtlasBaseException if the listener notification fails
+     */
+    void onBusinessAttributesUpdated(AtlasEntity entity, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException;
 }
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
index 9301e21..bcfdd94 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
@@ -50,7 +50,8 @@ public class EntityAuditEventV2 implements Serializable {
         ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
         CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
         PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE,
-        TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE;
+        TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE,
+        BUSINESS_ATTRIBUTE_UPDATE;
 
         public static EntityAuditActionV2 fromString(String strValue) {
             switch (strValue) {
@@ -91,6 +92,8 @@ public class EntityAuditEventV2 implements Serializable {
                     return LABEL_ADD;
                 case "LABEL_DELETE":
                     return LABEL_DELETE;
+                case "BUSINESS_ATTRIBUTE_UPDATE":
+                    return BUSINESS_ATTRIBUTE_UPDATE;
             }
 
             throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index cab4e1e..4c1e1a9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -28,11 +28,13 @@ import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasJson;
 import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -50,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.BUSINESS_ATTRIBUTE_UPDATE;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE;
@@ -341,6 +344,57 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
         }
     }
 
+    @Override
+    public void onRelationshipsAdded(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("New relationship(s) added to repository(" + relationships.size() + ")");
+        }
+    }
+
+    @Override
+    public void onRelationshipsUpdated(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Relationship(s) updated(" + relationships.size() + ")");
+        }
+    }
+
+    @Override
+    public void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Relationship(s) deleted from repository(" + relationships.size() + ")");
+        }
+    }
+
+    @Override
+    public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Relationship(s) purged from repository(" + relationships.size() + ")");
+        }
+    }
+
+    @Override
+    public void onBusinessAttributesUpdated(AtlasEntity entity, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException {
+        if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
+            List<EntityAuditEventV2> auditEvents = new ArrayList<>();
+
+            for (Map.Entry<String, Map<String, Object>> entry : updatedBusinessAttributes.entrySet()) {
+                String              bmName     = entry.getKey();
+                Map<String, Object> attributes = entry.getValue();
+                String              details    = AtlasJson.toJson(new AtlasStruct(bmName, attributes));
+                EntityAuditEventV2  auditEvent = createEvent(entity, BUSINESS_ATTRIBUTE_UPDATE, "Updated business attributes: " + details);
+
+                auditEvents.add(auditEvent);
+            }
+
+            auditRepository.putEventsV2(auditEvents);
+
+            RequestContext.get().endMetricRecord(metric);
+        }
+    }
+
+
     private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) {
         return new EntityAuditEventV2(entity.getGuid(), RequestContext.get().getRequestTime(),
                                       RequestContext.get().getUser(), action, details, entity);
@@ -566,32 +620,4 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
 
         return ret;
     }
-
-    @Override
-    public void onRelationshipsAdded(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("New relationship(s) added to repository(" + relationships.size() + ")");
-        }
-    }
-
-    @Override
-    public void onRelationshipsUpdated(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Relationship(s) updated(" + relationships.size() + ")");
-        }
-    }
-
-    @Override
-    public void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Relationship(s) deleted from repository(" + relationships.size() + ")");
-        }
-    }
-
-    @Override
-    public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Relationship(s) purged from repository(" + relationships.size() + ")");
-        }
-    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index d7020a7..0dc3193 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -66,7 +66,7 @@ import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
 
 
 @Component
-public class AtlasEntityChangeNotifier {
+public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
 
     private final Set<EntityChangeListener>   entityChangeListeners;
@@ -91,6 +91,7 @@ public class AtlasEntityChangeNotifier {
         this.isV2EntityNotificationEnabled = AtlasRepositoryConfiguration.isV2EntityNotificationEnabled();
     }
 
+    @Override
     public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
         if (CollectionUtils.isEmpty(entityChangeListeners)) {
             return;
@@ -119,6 +120,7 @@ public class AtlasEntityChangeNotifier {
         notifyPropagatedEntities();
     }
 
+    @Override
     public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
         if (CollectionUtils.isEmpty(entityChangeListeners)) {
             return;
@@ -137,6 +139,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
         if (isV2EntityNotificationEnabled) {
             doFullTextMapping(entity.getGuid());
@@ -166,6 +169,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
         if (isV2EntityNotificationEnabled) {
             doFullTextMappingHelper(entities);
@@ -201,6 +205,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
         doFullTextMapping(entity.getGuid());
 
@@ -228,6 +233,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
         doFullTextMapping(entity.getGuid());
 
@@ -255,6 +261,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
         doFullTextMappingHelper(entities);
 
@@ -288,6 +295,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
         // listeners notified on term-entity association only if v2 notifications are enabled
         if (isV2EntityNotificationEnabled) {
@@ -307,6 +315,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
         // listeners notified on term-entity disassociation only if v2 notifications are enabled
         if (isV2EntityNotificationEnabled) {
@@ -326,6 +335,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
         doFullTextMapping(entityGuid);
 
@@ -339,6 +349,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void notifyPropagatedEntities() throws AtlasBaseException {
         RequestContext                         context             = RequestContext.get();
         Map<String, List<AtlasClassification>> addedPropagations   = context.getAddedPropagations();
@@ -348,6 +359,18 @@ public class AtlasEntityChangeNotifier {
         notifyPropagatedEntities(removedPropagations, PROPAGATED_CLASSIFICATION_DELETE);
     }
 
+    @Override
+    public void onBusinessAttributesUpdated(String entityGuid, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException{
+        if (isV2EntityNotificationEnabled) {
+            AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid);
+
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onBusinessAttributesUpdated(entity, updatedBusinessAttributes);
+            }
+        }
+    }
+
+
     private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditActionV2 action) throws AtlasBaseException {
         if (MapUtils.isEmpty(entityPropagationMap) || action == null) {
             return;
@@ -752,5 +775,4 @@ public class AtlasEntityChangeNotifier {
             }
         }
     }
-
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 9269ae6..fc09015 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -399,24 +399,6 @@ public class EntityGraphMapper {
         }
     }
 
-    private void updateLabels(AtlasVertex vertex, Set<String> labels) {
-        if (CollectionUtils.isNotEmpty(labels)) {
-            AtlasGraphUtilsV2.setEncodedProperty(vertex, LABELS_PROPERTY_KEY, getLabelString(labels));
-        } else {
-            vertex.removeProperty(LABELS_PROPERTY_KEY);
-        }
-    }
-
-    private String getLabelString(Collection<String> labels) {
-        String ret = null;
-
-        if (!labels.isEmpty()) {
-            ret = LABEL_NAME_DELIMITER + String.join(LABEL_NAME_DELIMITER, labels) + LABEL_NAME_DELIMITER;
-        }
-
-        return ret;
-    }
-
     /*
      * reset/overwrite business attributes of the entity with given values
      */
@@ -426,6 +408,7 @@ public class EntityGraphMapper {
         }
 
         Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes();
+        Map<String, Map<String, Object>>                 updatedBusinessAttributes    = new HashMap<>();
 
         for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) {
             String                              bmName             = entry.getKey();
@@ -444,6 +427,8 @@ public class EntityGraphMapper {
                         }
 
                         mapAttribute(bmAttribute, bmAttrNewValue, entityVertex, CREATE, new EntityMutationContext());
+
+                        addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrNewValue);
                     }
                 } else {
                     if (bmAttrNewValue != null) {
@@ -453,6 +438,8 @@ public class EntityGraphMapper {
                             }
 
                             mapAttribute(bmAttribute, bmAttrNewValue, entityVertex, UPDATE, new EntityMutationContext());
+
+                            addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrNewValue);
                         }
                     } else {
                         if (LOG.isDebugEnabled()) {
@@ -460,11 +447,17 @@ public class EntityGraphMapper {
                         }
 
                         entityVertex.removeProperty(bmAttribute.getVertexPropertyName());
+
+                        addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrNewValue);
                     }
                 }
             }
         }
 
+        if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+            entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== setBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
         }
@@ -479,6 +472,7 @@ public class EntityGraphMapper {
         }
 
         Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes();
+        Map<String, Map<String, Object>>                 updatedBusinessAttributes    = new HashMap<>();
 
         if (MapUtils.isNotEmpty(entityTypeBusinessAttributes) && MapUtils.isNotEmpty(businessAttributes)) {
             for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) {
@@ -503,16 +497,24 @@ public class EntityGraphMapper {
                     if (existingValue == null) {
                         if (bmAttrValue != null) {
                             mapAttribute(bmAttribute, bmAttrValue, entityVertex, CREATE, new EntityMutationContext());
+
+                            addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrValue);
                         }
                     } else {
                         if (!Objects.equals(existingValue, bmAttrValue)) {
                             mapAttribute(bmAttribute, bmAttrValue, entityVertex, UPDATE, new EntityMutationContext());
+
+                            addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrValue);
                         }
                     }
                 }
             }
         }
 
+        if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+            entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== addOrUpdateBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
         }
@@ -521,12 +523,13 @@ public class EntityGraphMapper {
     /*
      * remove the given business attributes from the entity
      */
-    public void removeBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType entityType, Map<String, Map<String, Object>> businessAttributes) {
+    public void removeBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType entityType, Map<String, Map<String, Object>> businessAttributes) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> removeBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
         }
 
         Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes();
+        Map<String, Map<String, Object>>                 updatedBusinessAttributes    = new HashMap<>();
 
         if (MapUtils.isNotEmpty(entityTypeBusinessAttributes) && MapUtils.isNotEmpty(businessAttributes)) {
             for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) {
@@ -539,16 +542,22 @@ public class EntityGraphMapper {
 
                 Map<String, Object> entityBmAttributes = businessAttributes.get(bmName);
 
-                for (AtlasBusinessAttribute bmttribute : bmAttributes.values()) {
+                for (AtlasBusinessAttribute bmAttribute : bmAttributes.values()) {
                     // if (entityBmAttributes is empty) remove all attributes in this business-metadata
                     // else remove the attribute only if its given in entityBmAttributes
-                    if (MapUtils.isEmpty(entityBmAttributes) || entityBmAttributes.containsKey(bmttribute.getName())) {
-                        entityVertex.removeProperty(bmttribute.getVertexPropertyName());
+                    if (MapUtils.isEmpty(entityBmAttributes) || entityBmAttributes.containsKey(bmAttribute.getName())) {
+                        entityVertex.removeProperty(bmAttribute.getVertexPropertyName());
+
+                        addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, null);
                     }
                 }
             }
         }
 
+        if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+            entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== removeBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
         }
@@ -2559,4 +2568,35 @@ public class EntityGraphMapper {
 
         return propagatedEntities;
     }
+
+    private void updateLabels(AtlasVertex vertex, Set<String> labels) {
+        if (CollectionUtils.isNotEmpty(labels)) {
+            AtlasGraphUtilsV2.setEncodedProperty(vertex, LABELS_PROPERTY_KEY, getLabelString(labels));
+        } else {
+            vertex.removeProperty(LABELS_PROPERTY_KEY);
+        }
+    }
+
+    private String getLabelString(Collection<String> labels) {
+        String ret = null;
+
+        if (!labels.isEmpty()) {
+            ret = LABEL_NAME_DELIMITER + String.join(LABEL_NAME_DELIMITER, labels) + LABEL_NAME_DELIMITER;
+        }
+
+        return ret;
+    }
+
+    private void addToUpdatedBusinessAttributes(Map<String, Map<String, Object>> updatedBusinessAttributes, AtlasBusinessAttribute bmAttribute, Object attrValue) {
+        String              bmName     = bmAttribute.getDefinedInType().getTypeName();
+        Map<String, Object> attributes = updatedBusinessAttributes.get(bmName);
+
+        if(attributes == null){
+            attributes = new HashMap<>();
+
+            updatedBusinessAttributes.put(bmName, attributes);
+        }
+
+        attributes.put(bmAttribute.getName(), attrValue);
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
new file mode 100644
index 0000000..f617ef9
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.EntityNotification;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public interface IAtlasEntityChangeNotifier {
+    void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException;
+
+    void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException;
+
+    void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException;
+
+    void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException;
+
+    void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException;
+
+    void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException;
+
+    void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;
+
+    void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;
+
+    void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException;
+
+    void notifyPropagatedEntities() throws AtlasBaseException;
+
+    void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException;
+
+    void onBusinessAttributesUpdated(String entityGuid, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
new file mode 100644
index 0000000..05516f2
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class EntityChangeNotifierNop implements IAtlasEntityChangeNotifier {
+    @Override
+    public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void notifyPropagatedEntities() throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onBusinessAttributesUpdated(String entityGuid, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException {
+
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index 6d64fec..a677b31 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -316,4 +316,9 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
     public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
         // do nothing -> notification not sent out for term purged from entities as its been sent in case of delete
     }
+
+    @Override
+    public void onBusinessAttributesUpdated(AtlasEntity entity, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException{
+        // do nothing -> no notification is sent out when business attributes of an entity are updated
+    }
 }
\ No newline at end of file