You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/12/20 18:50:23 UTC

[atlas] branch branch-2.0 updated: ATLAS-3568: Performance improvements in writing audit logs

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new bcf3866  ATLAS-3568: Performance improvements in writing audit logs
bcf3866 is described below

commit bcf3866d20dd7f5cb8c8eabd7b26410e09920033
Author: Saqeeb Shaikh <sa...@cloudera.com>
AuthorDate: Fri Dec 20 23:45:19 2019 +0530

    ATLAS-3568: Performance improvements in writing audit logs
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
    (cherry picked from commit fb9f1e96e2dbd1588b2be4f8a63854a2e913dc1e)
---
 .../atlas/listener/EntityChangeListenerV2.java     |  18 ++++
 .../repository/audit/EntityAuditListenerV2.java    |  45 +++++++++
 .../store/graph/v2/AtlasEntityChangeNotifier.java  |  82 ++++++++++++++-
 .../store/graph/v2/AtlasEntityStoreV2.java         |   8 ++
 .../store/graph/v2/EntityGraphMapper.java          | 111 +++++++++++----------
 .../notification/EntityNotificationListenerV2.java |  10 ++
 6 files changed, 222 insertions(+), 52 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
index d36582b..2394a12 100644
--- a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
+++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
@@ -74,6 +74,15 @@ public interface EntityChangeListenerV2 {
     void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException;
 
     /**
+     * This is upon adding new classifications to a list of entities (bulk variant of the single-entity callback above).
+     *
+     * @param entities        list of entities
+     * @param classifications classifications that are to be added to entities
+     * @throws AtlasBaseException if the listener notification fails
+     */
+    void onClassificationsAdded(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException;
+
+    /**
      * This is upon updating classifications to an entity.
      *
      * @param entity          the entity
@@ -92,6 +101,15 @@ public interface EntityChangeListenerV2 {
     void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException;
 
     /**
+     * This is upon deleting classifications from a list of entities (bulk variant of the single-entity callback above).
+     *
+     * @param entities        list of entities
+     * @param classifications classifications that need to be deleted from entities
+     * @throws AtlasBaseException if the listener notification fails
+     */
+    void onClassificationsDeleted(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException;
+
+    /**
      * This is upon adding a new term to an entity.
      *
      * @param term     the term
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index 706da16..cab4e1e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -44,6 +44,7 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -172,6 +173,28 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
     }
 
     @Override
+    public void onClassificationsAdded(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException {
+        // Bulk audit of added classifications; nothing is recorded unless both lists are non-empty (a null entities list would NPE in the loop below).
+        if (CollectionUtils.isNotEmpty(classifications) && CollectionUtils.isNotEmpty(entities)) {
+            MetricRecorder           metric = RequestContext.get().startMetricRecord("entityAudit");
+            List<EntityAuditEventV2> events = Collections.synchronizedList(new ArrayList<>());
+            // One event per (classification, entity) pair: direct add when the entity guid equals the classification's entity guid, propagated add otherwise.
+            for (AtlasClassification classification : classifications) {
+                for (AtlasEntity entity : entities) {
+                    if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) {
+                        events.add(createEvent(entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification)));
+                    } else {
+                        events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)));
+                    }
+                }
+            }
+            // All collected events are handed to the repository in a single call.
+            auditRepository.putEventsV2(events);
+            RequestContext.get().endMetricRecord(metric);
+        }
+    }
+
+    @Override
     public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(classifications)) {
             MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
@@ -221,6 +244,28 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
     }
 
     @Override
+    public void onClassificationsDeleted(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException {
+        // Bulk audit of classification removals: nothing is recorded unless both lists have content.
+        if (CollectionUtils.isEmpty(classifications) || CollectionUtils.isEmpty(entities)) {
+            return;
+        }
+
+        MetricRecorder           auditMetric = RequestContext.get().startMetricRecord("entityAudit");
+        List<EntityAuditEventV2> auditEvents = Collections.synchronizedList(new ArrayList<>());
+
+        for (AtlasClassification removed : classifications) {
+            for (AtlasEntity target : entities) {
+                // direct removal when the entity guid equals the classification's entity guid, propagated removal otherwise
+                boolean isDirect = StringUtils.equals(target.getGuid(), removed.getEntityGuid());
+                auditEvents.add(isDirect ? createEvent(target, CLASSIFICATION_DELETE, "Deleted classification: " + removed.getTypeName())
+                                         : createEvent(target, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + removed.getTypeName()));
+            }
+        }
+        auditRepository.putEventsV2(auditEvents);
+        RequestContext.get().endMetricRecord(auditMetric);
+    }
+
+    @Override
     public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
         if (term != null && CollectionUtils.isNotEmpty(entities)) {
             MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index bd1ba58..d7020a7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -151,7 +151,7 @@ public class AtlasEntityChangeNotifier {
                 Referenceable entityRef = toReferenceable(entity.getGuid());
                 List<Struct>  traits    = toStruct(addedClassifications);
 
-                if (entity == null || CollectionUtils.isEmpty(traits)) {
+                if (entityRef == null || CollectionUtils.isEmpty(traits)) {
                     return;
                 }
 
@@ -166,6 +166,41 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+        // V2 notifications: refresh full-text data per entity, then hand the whole batch to every V2 listener.
+        if (isV2EntityNotificationEnabled) {
+            doFullTextMappingHelper(entities);
+            for (EntityChangeListenerV2 v2Listener : entityChangeListenersV2) {
+                v2Listener.onClassificationsAdded(entities, addedClassifications);
+            }
+            return;
+        }
+
+        // V1 notifications: full-text update first; listeners are only called when an instanceConverter is set and the converted traits are non-empty.
+        updateFullTextMapping(entities, addedClassifications);
+        List<Struct> traits = (instanceConverter != null) ? toStruct(addedClassifications) : null;
+        if (CollectionUtils.isEmpty(traits)) {
+            return;
+        }
+
+        for (AtlasEntity entity : entities) {
+            Referenceable entityRef = toReferenceable(entity.getGuid());
+
+            if (entityRef == null) {
+                LOG.warn("EntityRef with guid {} not found while adding classifications {} ", entity.getGuid(), addedClassifications);
+                continue;
+            }
+
+            for (EntityChangeListener v1Listener : entityChangeListeners) {
+                try {
+                    v1Listener.onTraitsAdded(entityRef, traits);
+                } catch (AtlasException e) {
+                    throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(v1Listener), "TraitAdd");
+                }
+            }
+        }
+    }
+
     public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
         doFullTextMapping(entity.getGuid());
 
@@ -220,6 +255,39 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+        // Full-text data is refreshed for every entity no matter which listener API is enabled.
+        doFullTextMappingHelper(entities);
+
+        if (isV2EntityNotificationEnabled) {
+            // V2 listeners receive the whole batch in one call each.
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onClassificationsDeleted(entities, deletedClassifications);
+            }
+        } else if (instanceConverter != null) {
+            List<Struct> traits = toStruct(deletedClassifications);
+            // Guard on the converted traits, i.e. the value actually passed to the listeners (same check as onClassificationsAddedToEntities).
+            if (!CollectionUtils.isEmpty(traits)) {
+                for (AtlasEntity entity : entities) {
+                    Referenceable entityRef = toReferenceable(entity.getGuid());
+
+                    if (entityRef == null) {
+                        LOG.warn("EntityRef with guid {} not found while deleting classifications {} ", entity.getGuid(), deletedClassifications);
+                        continue;
+                    }
+
+                    for (EntityChangeListener listener : entityChangeListeners) {
+                        try {
+                            listener.onTraitsDeleted(entityRef, traits);
+                        } catch (AtlasException e) {
+                            throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete");
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
         // listeners notified on term-entity association only if v2 notifications are enabled
         if (isV2EntityNotificationEnabled) {
@@ -575,6 +643,12 @@ public class AtlasEntityChangeNotifier {
         RequestContext.get().endMetricRecord(metric);
     }
 
+    private void updateFullTextMapping(List<AtlasEntity> entities, List<AtlasClassification> classifications) {
+        // List overload: applies the guid-based overload to each entity in turn,
+        // handing it the same classifications list every time.
+        entities.forEach(entity -> updateFullTextMapping(entity.getGuid(), classifications));
+    }
+
     private void doFullTextMapping(String guid) {
         if(AtlasRepositoryConfiguration.isFreeTextSearchEnabled() || !AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
             return;
@@ -586,6 +660,12 @@ public class AtlasEntityChangeNotifier {
         doFullTextMapping(Collections.singletonList(entityHeader));
     }
 
+    private void doFullTextMappingHelper(List<AtlasEntity> entities) {
+        // Runs the single-guid full-text mapping once for each entity in the list,
+        // in list order.
+        entities.forEach(entity -> doFullTextMapping(entity.getGuid()));
+    }
+
     private void pruneResponse(EntityMutationResponse resp) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> pruneResponse()");
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 3fbcecb..25284e9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -631,6 +631,12 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
             LOG.debug("Updating classifications={} for entity={}", classifications, guid);
         }
 
+        AtlasPerfTracer perf = null;
+        // Keep the tracer: without this assignment perf stays null, so the AtlasPerfTracer.log(perf) call at the end of this method has no tracer to log.
+        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AtlasEntityStoreV2.updateClassification()");
+        }
+
         if (StringUtils.isEmpty(guid)) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid not specified");
         }
@@ -663,6 +669,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
         }
 
         entityGraphMapper.updateClassifications(context, guid, classifications);
+
+        AtlasPerfTracer.log(perf);
     }
 
     @Override
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 7a2ccb9..765ba36 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -62,6 +62,7 @@ import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.utils.AtlasEntityUtil;
 import org.apache.atlas.utils.AtlasJson;
 import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
+import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -111,7 +112,8 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
 
 @Component
 public class EntityGraphMapper {
-    private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class);
+    private static final Logger LOG      = LoggerFactory.getLogger(EntityGraphMapper.class);
+    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("entityGraphMapper");
 
     private static final String  SOFT_REF_FORMAT                   = "%s:%s";
     private static final int     INDEXED_STR_SAFE_LEN              = AtlasConfiguration.GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH.getInt();
@@ -1690,11 +1692,11 @@ public class EntityGraphMapper {
         if (CollectionUtils.isNotEmpty(classifications)) {
             MetricRecorder metric = RequestContext.get().startMetricRecord("addClassifications");
 
-            final AtlasVertex                           entityVertex          = context.getVertex(guid);
-            final AtlasEntityType                       entityType            = context.getType(guid);
-            List<AtlasVertex>                           entitiesToPropagateTo = null;
-            Map<AtlasVertex, List<AtlasClassification>> propagations          = null;
-            List<AtlasClassification>                   addClassifications    = new ArrayList<>(classifications.size());
+            final AtlasVertex                              entityVertex          = context.getVertex(guid);
+            final AtlasEntityType                          entityType            = context.getType(guid);
+            List<AtlasVertex>                              entitiesToPropagateTo = null;
+            Map<AtlasClassification, HashSet<AtlasVertex>> addedClassifications  = new HashMap<>();
+            List<AtlasClassification>                      addClassifications    = new ArrayList<>(classifications.size());
 
             for (AtlasClassification c : classifications) {
                 AtlasClassification classification      = new AtlasClassification(c);
@@ -1761,23 +1763,17 @@ public class EntityGraphMapper {
                     }
 
                     if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
-                        if (propagations == null) {
-                            propagations = new HashMap<>(entitiesToPropagateTo.size());
-
-                            for (AtlasVertex entityToPropagateTo : entitiesToPropagateTo) {
-                                propagations.put(entityToPropagateTo, new ArrayList<>());
-                            }
-                        }
-
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityType.getTypeName(), getTypeNames(entitiesToPropagateTo));
                         }
 
                         List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, entitiesToPropagateTo);
 
-                        if (entitiesPropagatedTo != null) {
-                            for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) {
-                                propagations.get(entityPropagatedTo).add(classification);
+                        if (CollectionUtils.isNotEmpty(entitiesPropagatedTo)) {
+                            if(addedClassifications.get(classification) == null) {
+                                addedClassifications.put(classification, new HashSet<>(entitiesPropagatedTo));
+                            } else {
+                                addedClassifications.get(classification).addAll(entitiesPropagatedTo);
                             }
                         }
                     } else {
@@ -1801,15 +1797,11 @@ public class EntityGraphMapper {
                 notificationVertices.addAll(entitiesToPropagateTo);
             }
 
-            for (AtlasVertex vertex : notificationVertices) {
-                String                    entityGuid           = GraphHelper.getGuid(vertex);
-                AtlasEntity               entity               = instanceConverter.getAndCacheEntity(entityGuid);
-                List<AtlasClassification> addedClassifications = StringUtils.equals(entityGuid, guid) ? addClassifications : propagations.get(vertex);
+            for (AtlasClassification classification : addedClassifications.keySet()) {
+                Set<AtlasVertex>  vertices           = addedClassifications.get(classification);
+                List<AtlasEntity> propagatedEntities = updateClassificationText(classification, vertices);
 
-                vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
-                if (CollectionUtils.isNotEmpty(addedClassifications)) {
-                    entityChangeNotifier.onClassificationAddedToEntity(entity, addedClassifications);
-                }
+                entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification)); // notify only for the classification this iteration propagated, not the whole request list
             }
 
             RequestContext.get().endMetricRecord(metric);
@@ -1999,6 +1991,12 @@ public class EntityGraphMapper {
             throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
         }
 
+        AtlasPerfTracer perf = null;
+
+        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityGraphMapper.updateClassifications");
+        }
+
         String                    entityTypeName         = AtlasGraphUtilsV2.getTypeName(entityVertex);
         AtlasEntityType           entityType             = typeRegistry.getEntityTypeByName(entityTypeName);
         List<AtlasClassification> updatedClassifications = new ArrayList<>();
@@ -2006,7 +2004,7 @@ public class EntityGraphMapper {
         Set<AtlasVertex>          notificationVertices   = new HashSet<AtlasVertex>() {{ add(entityVertex); }};
 
         Map<AtlasVertex, List<AtlasClassification>> addedPropagations   = null;
-        Map<AtlasVertex, List<AtlasClassification>> removedPropagations = null;
+        Map<AtlasClassification, List<AtlasVertex>> removedPropagations = new HashMap<>();
 
         for (AtlasClassification classification : classifications) {
             String classificationName       = classification.getTypeName();
@@ -2116,21 +2114,17 @@ public class EntityGraphMapper {
                     List<AtlasVertex> impactedVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
 
                     if (CollectionUtils.isNotEmpty(impactedVertices)) {
-                        if (removedPropagations == null) {
-                            removedPropagations = new HashMap<>();
-
-                            for (AtlasVertex impactedVertex : impactedVertices) {
-                                List<AtlasClassification> removedClassifications = removedPropagations.get(impactedVertex);
-
-                                if (removedClassifications == null) {
-                                    removedClassifications = new ArrayList<>();
-
-                                    removedPropagations.put(impactedVertex, removedClassifications);
-                                }
-
-                                removedClassifications.add(classification);
-                            }
-                        }
+                        /*
+                            removedPropagations used to be a map of entity -> list of classifications, i.e. one entry in the map per entity.
+                            Maintaining a list of entities per classification instead lets us send the audit request in bulk,
+                            since one classification is applied to many entities (including the child entities).
+                            E.g. if a classification is being propagated to 1000 entities, its edge count would be 2000, so with the old map
+                            we would have 2000 entries, each value being a single classification wrapped in a list.
+                            With this rearrangement we keep an entity list against each classification; as of now the map size would be 1 (as per request from UI)
+                            instead of 2000. Moreover, this allows us to send one audit request per classification instead of a separate request for each entity.
+                            This reduces audit calls from 2000 to 1.
+                         */
+                        removedPropagations.put(classification, impactedVertices);
                     }
                 }
             }
@@ -2152,19 +2146,17 @@ public class EntityGraphMapper {
             }
         }
 
-        if (removedPropagations != null) {
-            for (Map.Entry<AtlasVertex, List<AtlasClassification>> entry : removedPropagations.entrySet()) {
-                AtlasVertex               vertex                 = entry.getKey();
-                List<AtlasClassification> removedClassifications = entry.getValue();
-                String                    entityGuid             = GraphHelper.getGuid(vertex);
-                AtlasEntity               entity                 = instanceConverter.getAndCacheEntity(entityGuid);
+        if (MapUtils.isNotEmpty(removedPropagations)) {
+            for (AtlasClassification classification : removedPropagations.keySet()) {
+                List<AtlasVertex> propagatedVertices = removedPropagations.get(classification);
+                List<AtlasEntity> propagatedEntities = updateClassificationText(classification, propagatedVertices);
 
-                if (isActive(entity)) {
-                    vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
-                    entityChangeNotifier.onClassificationDeletedFromEntity(entity, removedClassifications);
-                }
+                //Sending audit request for all entities at once
+                entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification));
             }
         }
+
+        AtlasPerfTracer.log(perf);
     }
 
     private AtlasEdge mapClassification(EntityOperation operation,  final EntityMutationContext context, AtlasClassification classification,
@@ -2383,4 +2375,21 @@ public class EntityGraphMapper {
             }
         }
     }
+
+    private List<AtlasEntity> updateClassificationText(AtlasClassification classification, Collection<AtlasVertex> propagatedVertices) throws AtlasBaseException {
+        // Rewrites CLASSIFICATION_TEXT_KEY on each vertex whose entity is active and returns those entities; the classification argument is not read.
+        List<AtlasEntity> activeEntities = new ArrayList<>();
+        if (CollectionUtils.isEmpty(propagatedVertices)) {
+            return activeEntities;
+        }
+        for (AtlasVertex propagatedVertex : propagatedVertices) {
+            AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(propagatedVertex));
+            if (!isActive(entity)) {
+                continue;
+            }
+            propagatedVertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+            activeEntities.add(entity);
+        }
+        return activeEntities;
+    }
 }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index 8893380..6d64fec 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -107,6 +107,11 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
     }
 
     @Override
+    public void onClassificationsAdded(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException {
+        notifyEntityEvents(entities, CLASSIFICATION_ADD); // whole list passed in one call; the classifications argument is not consulted here
+    }
+
+    @Override
     public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
         Map<String, List<AtlasClassification>> addedPropagations   = RequestContext.get().getAddedPropagations();
         Map<String, List<AtlasClassification>> removedPropagations = RequestContext.get().getRemovedPropagations();
@@ -124,6 +129,11 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
     }
 
     @Override
+    public void onClassificationsDeleted(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException {
+        notifyEntityEvents(entities, CLASSIFICATION_DELETE); // whole list passed in one call; the classifications argument is not consulted here
+    }
+
+    @Override
     public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) {
         // do nothing -> notification not sent out for term assignment to entities
     }