You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/02/20 23:08:15 UTC

[1/3] atlas git commit: ATLAS-2456: Implement tag propagation using relationships

Repository: atlas
Updated Branches:
  refs/heads/master 9c58d30c7 -> a3374c747


http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index b05a9a3..d01fb9f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -30,15 +30,18 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
 import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasElement;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasMapType;
 import org.apache.atlas.type.AtlasRelationshipType;
@@ -58,13 +61,32 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_PROPAGATE_KEY;
+import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
 import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
+import static org.apache.atlas.repository.graph.GraphHelper.edgeExists;
+import static org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames;
+import static org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex;
+import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
+import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
+import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static org.apache.atlas.repository.graph.GraphHelper.removePropagatedTraitNameFromVertex;
 import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
@@ -191,6 +213,17 @@ public final class EntityGraphRetriever {
         return ret;
     }
 
+    public AtlasClassification toAtlasClassification(AtlasVertex classificationVertex) throws AtlasBaseException {
+        AtlasClassification ret = new AtlasClassification(getTypeName(classificationVertex));
+
+        ret.setEntityGuid(AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_ENTITY_GUID, String.class));
+        ret.setPropagate(AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_PROPAGATE_KEY, Boolean.class));
+
+        mapAttributes(classificationVertex, ret, null);
+
+        return ret;
+    }
+
     public AtlasVertex getReferencedEntityVertex(AtlasEdge edge, AtlasRelationshipEdgeDirection relationshipDirection, AtlasVertex parentVertex) throws AtlasBaseException {
         AtlasVertex entityVertex = null;
 
@@ -245,7 +278,7 @@ public final class EntityGraphRetriever {
     }
 
     private AtlasEntity mapVertexToAtlasEntity(AtlasVertex entityVertex, AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException {
-        String      guid   = GraphHelper.getGuid(entityVertex);
+        String      guid   = getGuid(entityVertex);
         AtlasEntity entity = entityExtInfo != null ? entityExtInfo.getEntity(guid) : null;
 
         if (entity == null) {
@@ -265,7 +298,7 @@ public final class EntityGraphRetriever {
 
             mapRelationshipAttributes(entityVertex, entity);
 
-            mapClassifications(entityVertex, entity, entityExtInfo);
+            mapClassifications(entityVertex, entity);
         }
 
         return entity;
@@ -284,7 +317,7 @@ public final class EntityGraphRetriever {
         ret.setTypeName(typeName);
         ret.setGuid(guid);
         ret.setStatus(GraphHelper.getStatus(entityVertex));
-        ret.setClassificationNames(GraphHelper.getTraitNames(entityVertex));
+        ret.setClassificationNames(getAllTraitNames(entityVertex));
 
         AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
 
@@ -347,8 +380,8 @@ public final class EntityGraphRetriever {
             LOG.debug("Mapping system attributes for type {}", entity.getTypeName());
         }
 
-        entity.setGuid(GraphHelper.getGuid(entityVertex));
-        entity.setTypeName(GraphHelper.getTypeName(entityVertex));
+        entity.setGuid(getGuid(entityVertex));
+        entity.setTypeName(getTypeName(entityVertex));
         entity.setStatus(GraphHelper.getStatus(entityVertex));
         entity.setVersion(GraphHelper.getVersion(entityVertex));
 
@@ -377,28 +410,52 @@ public final class EntityGraphRetriever {
         }
     }
 
-    public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException {
+    public boolean isPropagationEnabled(AtlasVertex classificationVertex) {
+        boolean ret = false;
 
+        if (classificationVertex != null) {
+            Boolean enabled = AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_PROPAGATE_KEY, Boolean.class);
+
+            ret = enabled == null ? true : enabled;
+        }
+
+        return ret;
+    }
+
+    public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException {
         AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
+
         if (instanceVertex == null) {
             throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
         }
 
-        return getClassifications(instanceVertex, null);
+        return getClassifications(instanceVertex);
     }
 
     public List<AtlasClassification> getClassifications(AtlasVertex instanceVertex) throws AtlasBaseException {
-        return getClassifications(instanceVertex, null);
+        final List<AtlasClassification> classifications           = getClassifications(instanceVertex, null);
+        final List<AtlasClassification> propagatedClassifications = getPropagatedClassifications(instanceVertex, null);
+
+        classifications.addAll(propagatedClassifications);
+
+        return classifications;
     }
 
     public AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException {
-
         AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
+
         if (instanceVertex == null) {
             throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
         }
 
-        List<AtlasClassification> classifications = getClassifications(instanceVertex, classificationName);
+        List<AtlasClassification> classifications = null;
+
+        try {
+            classifications = getClassifications(instanceVertex, classificationName);
+        } catch (AtlasBaseException excp) {
+            // ignore and look for propagated classifications
+            classifications = getPropagatedClassifications(instanceVertex, classificationName);
+        }
 
         if(CollectionUtils.isEmpty(classifications)) {
             throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
@@ -409,54 +466,141 @@ public final class EntityGraphRetriever {
 
 
     private List<AtlasClassification> getClassifications(AtlasVertex instanceVertex, String classificationNameFilter) throws AtlasBaseException {
-        List<AtlasClassification> classifications = new ArrayList<>();
-        List<String> classificationNames = GraphHelper.getTraitNames(instanceVertex);
+        List<AtlasClassification> ret                 = new ArrayList<>();
+        List<String>              classificationNames = getTraitNames(instanceVertex);
 
         if (CollectionUtils.isNotEmpty(classificationNames)) {
-            for (String classificationName : classificationNames) {
-                AtlasClassification classification;
-                if (StringUtils.isNotEmpty(classificationNameFilter)) {
-                    if (classificationName.equals(classificationNameFilter)) {
-                        classification = getClassification(instanceVertex, classificationName);
-                        classifications.add(classification);
-                        return classifications;
-                    }
-                } else {
-                    classification = getClassification(instanceVertex, classificationName);
-                    classifications.add(classification);
+            if (StringUtils.isNotEmpty(classificationNameFilter)) {
+                if (classificationNames.contains(classificationNameFilter)) {
+                    ret.add(getClassification(instanceVertex, classificationNameFilter));
+                }
+            } else {
+                for (String classificationName : classificationNames) {
+                    ret.add(getClassification(instanceVertex, classificationName));
                 }
             }
+        }
+
+        if (ret.isEmpty() && StringUtils.isNotEmpty(classificationNameFilter)) {
+            throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationNameFilter);
+        }
+
+        return ret;
+    }
 
+    private List<AtlasClassification> getPropagatedClassifications(AtlasVertex instanceVertex, String classificationNameFilter) throws AtlasBaseException {
+        List<AtlasClassification> ret                 = new ArrayList<>();
+        List<String>              classificationNames = getPropagatedTraitNames(instanceVertex);
 
+        if (CollectionUtils.isNotEmpty(classificationNames)) {
             if (StringUtils.isNotEmpty(classificationNameFilter)) {
-                //Should not reach here if classification present
-                throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationNameFilter);
+                if (classificationNames.contains(classificationNameFilter)) {
+                    ret.addAll(getAllPropagatedClassifications(instanceVertex, classificationNameFilter));
+                }
+            } else {
+                for (String classificationName : new HashSet<>(classificationNames)) {
+                    ret.addAll(getAllPropagatedClassifications(instanceVertex, classificationName));
+                }
             }
         }
-        return classifications;
+
+        if (ret.isEmpty() && StringUtils.isNotEmpty(classificationNameFilter)) {
+            throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationNameFilter);
+        }
+
+        return ret;
     }
 
-    private AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName) throws AtlasBaseException {
+    private List<AtlasClassification> getAllPropagatedClassifications(AtlasVertex vertex, String classificationName) throws AtlasBaseException {
+        List<AtlasClassification> ret       = new ArrayList<>();
+        String                    edgeLabel = getPropagatedEdgeLabel(classificationName);
+        Iterable<AtlasEdge>       edges     = vertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel);
 
-        AtlasClassification ret = null;
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("mapping classification {} to atlas entity", classificationName);
+        if (edges != null) {
+            for (Iterator<AtlasEdge> iterator = edges.iterator(); iterator.hasNext(); ) {
+                AtlasEdge edge = iterator.next();
+
+                if (edge != null) {
+                    AtlasClassification classification = toAtlasClassification(edge.getInVertex());
+
+                    ret.add(classification);
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    protected List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex instanceVertex) {
+        List<AtlasVertex> ret                              = new ArrayList<>();
+        List<AtlasVertex> classificationVertices           = getPropagationEnabledClassificationVertices(instanceVertex, false);
+        List<AtlasVertex> propagatedClassificationVertices = getPropagationEnabledClassificationVertices(instanceVertex, true);
+
+        if (CollectionUtils.isNotEmpty(classificationVertices)) {
+            ret.addAll(classificationVertices);
+        }
+
+        if (CollectionUtils.isNotEmpty(propagatedClassificationVertices)) {
+            ret.addAll(propagatedClassificationVertices);
+        }
+
+        return ret;
+    }
+
+    private List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex vertex, boolean propagated) {
+        List<AtlasVertex> ret                 = new ArrayList<>();
+        List<String>      classificationNames = (propagated) ? getPropagatedTraitNames(vertex) : getTraitNames(vertex);
+
+        if (CollectionUtils.isNotEmpty(classificationNames)) {
+            for (String classificationName : classificationNames) {
+                String              traitLabel   = (propagated) ? getPropagatedEdgeLabel(classificationName) : getTraitLabel(classificationName);
+                Iterable<AtlasEdge> edges        = vertex.getEdges(AtlasEdgeDirection.OUT, traitLabel);
+
+                if (edges != null) {
+                    for (Iterator<AtlasEdge> iterator = edges.iterator(); iterator.hasNext(); ) {
+                        AtlasEdge edge = iterator.next();
+
+                        if (edge != null) {
+                            AtlasVertex classificationVertex = edge.getInVertex();
+
+                            if (isPropagationEnabled(classificationVertex)) {
+                                ret.add(classificationVertex);
+                            }
+                        }
+                    }
+                }
+            }
         }
 
-        Iterable<AtlasEdge> edges = instanceVertex.getEdges(AtlasEdgeDirection.OUT, classificationName);
-        AtlasEdge           edge  = (edges != null && edges.iterator().hasNext()) ? edges.iterator().next() : null;
+        return ret;
+    }
 
-        if (edge != null) {
-            ret = new AtlasClassification(classificationName);
-            mapAttributes(edge.getInVertex(), ret, null);
+    public AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName) throws AtlasBaseException {
+        AtlasClassification ret = getClassification(instanceVertex, classificationName, false);
+
+        // if no classification with the given name was directly associated, look for a propagated classification
+        if (ret == null) {
+            ret = getClassification(instanceVertex, classificationName, true);
         }
 
         return ret;
     }
 
-    private void mapClassifications(AtlasVertex entityVertex, AtlasEntity entity, AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException {
-        final List<AtlasClassification> classifications = getClassifications(entityVertex, null);
+    private AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName, boolean propagated) throws AtlasBaseException {
+        String              traitLabel = (propagated) ? getPropagatedEdgeLabel(classificationName) : getTraitLabel(classificationName);
+        Iterable<AtlasEdge> edges      = instanceVertex.getEdges(AtlasEdgeDirection.OUT, traitLabel);
+        AtlasEdge           edge       = (edges != null && edges.iterator().hasNext()) ? edges.iterator().next() : null;
+        AtlasClassification ret        = edge != null ? toAtlasClassification(edge.getInVertex()) : null;
+
+        return ret;
+    }
+
+    private void mapClassifications(AtlasVertex entityVertex, AtlasEntity entity) throws AtlasBaseException {
+        final List<AtlasClassification> classifications           = getClassifications(entityVertex, null);
+        final List<AtlasClassification> propagatedClassifications = getPropagatedClassifications(entityVertex, null);
+
         entity.setClassifications(classifications);
+        entity.addClassifications(propagatedClassifications);
     }
 
     private Object mapVertexToAttribute(AtlasVertex entityVertex, AtlasAttribute attribute, AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException {
@@ -664,7 +808,7 @@ public final class EntityGraphRetriever {
                         ret = AtlasTypeUtil.getAtlasObjectId(entity);
                     }
                 } else {
-                    ret = new AtlasObjectId(GraphHelper.getGuid(referenceVertex), GraphHelper.getTypeName(referenceVertex));
+                    ret = new AtlasObjectId(getGuid(referenceVertex), getTypeName(referenceVertex));
                 }
             }
         }
@@ -681,7 +825,7 @@ public final class EntityGraphRetriever {
 
         if (GraphHelper.elementExists(edge)) {
             final AtlasVertex referenceVertex = edge.getInVertex();
-            ret = new AtlasStruct(GraphHelper.getTypeName(referenceVertex));
+            ret = new AtlasStruct(getTypeName(referenceVertex));
 
             mapAttributes(referenceVertex, ret, entityExtInfo);
         }
@@ -756,11 +900,11 @@ public final class EntityGraphRetriever {
         Iterator<AtlasEdge>        edges = null;
 
         if (attribute.getRelationshipEdgeDirection() == IN) {
-            edges = graphHelper.getIncomingEdgesByLabel(entityVertex, attribute.getRelationshipEdgeLabel());
+            edges = getIncomingEdgesByLabel(entityVertex, attribute.getRelationshipEdgeLabel());
         } else if (attribute.getRelationshipEdgeDirection() == OUT) {
-            edges = graphHelper.getOutGoingEdgesByLabel(entityVertex, attribute.getRelationshipEdgeLabel());
+            edges = getOutGoingEdgesByLabel(entityVertex, attribute.getRelationshipEdgeLabel());
         } else if (attribute.getRelationshipEdgeDirection() == BOTH) {
-            edges = graphHelper.getAdjacentEdgesByLabel(entityVertex, AtlasEdgeDirection.BOTH, attribute.getRelationshipEdgeLabel());
+            edges = getAdjacentEdgesByLabel(entityVertex, AtlasEdgeDirection.BOTH, attribute.getRelationshipEdgeLabel());
         }
 
         if (edges != null) {
@@ -787,8 +931,8 @@ public final class EntityGraphRetriever {
             }
 
             if (referenceVertex != null) {
-                String            entityTypeName = GraphHelper.getTypeName(referenceVertex);
-                String            entityGuid     = GraphHelper.getGuid(referenceVertex);
+                String            entityTypeName = getTypeName(referenceVertex);
+                String            entityGuid     = getGuid(referenceVertex);
                 AtlasRelationship relationship   = mapEdgeToAtlasRelationship(edge);
 
                 ret = new AtlasRelatedObjectId(entityGuid, entityTypeName, relationship.getGuid(),
@@ -835,8 +979,8 @@ public final class EntityGraphRetriever {
             LOG.debug("Mapping system attributes for relationship");
         }
 
-        relationship.setGuid(GraphHelper.getGuid(edge));
-        relationship.setTypeName(GraphHelper.getTypeName(edge));
+        relationship.setGuid(getRelationshipGuid(edge));
+        relationship.setTypeName(getTypeName(edge));
 
         relationship.setCreatedBy(GraphHelper.getCreatedByAsString(edge));
         relationship.setUpdatedBy(GraphHelper.getModifiedByAsString(edge));
@@ -856,11 +1000,11 @@ public final class EntityGraphRetriever {
         AtlasVertex end1Vertex = edge.getOutVertex();
         AtlasVertex end2Vertex = edge.getInVertex();
 
-        relationship.setEnd1(new AtlasObjectId(GraphHelper.getGuid(end1Vertex), GraphHelper.getTypeName(end1Vertex)));
-        relationship.setEnd2(new AtlasObjectId(GraphHelper.getGuid(end2Vertex), GraphHelper.getTypeName(end2Vertex)));
+        relationship.setEnd1(new AtlasObjectId(getGuid(end1Vertex), getTypeName(end1Vertex)));
+        relationship.setEnd2(new AtlasObjectId(getGuid(end2Vertex), getTypeName(end2Vertex)));
 
         relationship.setLabel(edge.getLabel());
-        relationship.setPropagateTags(GraphHelper.getPropagateTags(edge));
+        relationship.setPropagateTags(getPropagateTags(edge));
 
         return relationship;
     }
@@ -881,4 +1025,152 @@ public final class EntityGraphRetriever {
             relationship.setAttribute(attribute.getName(), attrValue);
         }
     }
-}
+
+    public void addTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException {
+        if (edge == null) {
+            return;
+        }
+
+        AtlasVertex outVertex = edge.getOutVertex();
+        AtlasVertex inVertex  = edge.getInVertex();
+
+        if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
+            addTagPropagation(outVertex, inVertex, edge);
+        }
+
+        if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) {
+            addTagPropagation(inVertex, outVertex, edge);
+        }
+    }
+
+    public void removeTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException {
+        if (edge == null) {
+            return;
+        }
+
+        AtlasVertex outVertex = edge.getOutVertex();
+        AtlasVertex inVertex  = edge.getInVertex();
+
+        if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
+            removeTagPropagation(outVertex, inVertex, edge);
+        }
+
+        if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) {
+            removeTagPropagation(inVertex, outVertex, edge);
+        }
+    }
+
+    private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
+        final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
+        final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null;
+
+        if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size());
+            }
+
+            for (AtlasVertex classificationVertex : classificationVertices) {
+                String                  classificationName     = getTypeName(classificationVertex);
+                String                  propagatedEdgeLabel    = getPropagatedEdgeLabel(classificationName);
+                AtlasVertex             associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
+                AtlasClassificationType classificationType     = typeRegistry.getClassificationTypeByName(classificationName);
+
+                for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
+                    if (edgeExists(impactedEntityVertex, classificationVertex, classificationName)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(" --> Classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]",
+                                    getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), classificationName);
+                        }
+
+                        continue;
+                    } else if (edgeExists(impactedEntityVertex, classificationVertex, propagatedEdgeLabel)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(" --> Propagated classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]",
+                                    getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
+                        }
+
+                        continue;
+                    }
+
+                    String          entityTypeName = getTypeName(impactedEntityVertex);
+                    AtlasEntityType entityType     = typeRegistry.getEntityTypeByName(entityTypeName);
+
+                    if (!classificationType.canApplyToEntityType(entityType)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(" --> Not creating propagated classification edge from [{}] --> [{}][{}], classification is not applicable for entity type",
+                                        getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex));
+                        }
+
+                        continue;
+                    }
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(" --> Creating propagated classification edge from [{}] --> [{}][{}] using edge label: [{}]",
+                                  getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
+                    }
+
+                    graphHelper.addEdge(impactedEntityVertex, classificationVertex, propagatedEdgeLabel);
+
+                    GraphHelper.addListProperty(impactedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
+                }
+            }
+        }
+    }
+
+    private void removeTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
+        final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
+        final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null;
+
+        if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Removing {} propagated tags: for {} from {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size());
+            }
+
+            for (AtlasVertex classificationVertex : classificationVertices) {
+                String            classificationName     = getTypeName(classificationVertex);
+                String            propagatedEdgeLabel    = getPropagatedEdgeLabel(classificationName);
+                AtlasVertex       associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
+                List<AtlasVertex> referrals = graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, getRelationshipGuid(edge));
+
+                for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
+                    if (referrals.contains(impactedEntityVertex)) {
+                        if (LOG.isDebugEnabled()) {
+                            if (StringUtils.equals(getGuid(impactedEntityVertex), getGuid(associatedEntityVertex))) {
+                                LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is associated with [{}]",
+                                          getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel, classificationName, getTypeName(associatedEntityVertex));
+                            } else {
+                                LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is propagated through other path",
+                                          getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel, classificationName);
+                            }
+                        }
+
+                        continue;
+                    }
+
+                    // remove propagated classification edge and classificationName from propagatedTraitNames vertex property
+                    if (edgeExists(impactedEntityVertex, classificationVertex, propagatedEdgeLabel)) {
+                        try {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(" --> Removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}]",
+                                          getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
+                            }
+
+                            AtlasEdge propagatedEdge = graphHelper.getOrCreateEdge(impactedEntityVertex, classificationVertex, propagatedEdgeLabel);
+
+                            graphHelper.removeEdge(propagatedEdge);
+
+                            removePropagatedTraitNameFromVertex(impactedEntityVertex, classificationName);
+                        } catch (RepositoryException e) {
+                            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+                        }
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge doesn't exist",
+                                      getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
index 1fda241..58e3492 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
@@ -63,6 +63,18 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider {
                 return "g.V().range(0,1).toList()";
             case GREMLIN_SEARCH_RETURNS_EDGE_ID:
                 return "g.E().range(0,1).toList()";
+
+            case TAG_PROPAGATION_IMPACTED_INSTANCES:
+                return "g.V().has('__guid', guid).aggregate('src')" +
+                            ".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).inV(), " +
+                                           "inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).outV())" +
+                            ".dedup().where(without('src')).simplePath()).emit().toList();";
+
+            case TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL:
+                return "g.V().has('__guid', guid).aggregate('src')" +
+                            ".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', neq(relationshipGuid)).inV(), " +
+                                           "inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).has('_r__guid', neq(relationshipGuid)).outV())" +
+                            ".dedup().where(without('src')).simplePath()).emit().toList();";
         }
         return super.getQuery(gremlinQuery);
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
index a79abaa..3e3ee11 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
@@ -81,6 +81,9 @@ public abstract class AtlasGremlinQueryProvider {
         COMPARE_ENDS_WITH,
         COMPARE_CONTAINS,
         COMPARE_IS_NULL,
-        COMPARE_NOT_NULL
+        COMPARE_NOT_NULL,
+
+        TAG_PROPAGATION_IMPACTED_INSTANCES,
+        TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
index 0e1e5b6..85f0d06 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.atlas.repository.graphdb.GraphDatabase;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
 import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +53,7 @@ public class AtlasRepositoryConfiguration {
     private static Integer typeUpdateLockMaxWaitTimeInSeconds = null;
 
     private static final String ENABLE_FULLTEXT_SEARCH_PROPERTY = "atlas.search.fulltext.enable";
+    private static final String ENTITY_NOTIFICATION_VERSION_PROPERTY = "atlas.notification.entity.version";
 
     /**
      * Configures whether the full text vertex property is populated.  Turning this off
@@ -62,6 +64,19 @@ public class AtlasRepositoryConfiguration {
         return ApplicationProperties.get().getBoolean(ENABLE_FULLTEXT_SEARCH_PROPERTY, true);
     }
 
+    public static boolean isV2EntityNotificationEnabled() {
+        boolean ret;
+        try {
+            String notificationVersion = ApplicationProperties.get().getString(ENTITY_NOTIFICATION_VERSION_PROPERTY, "v2");
+
+            return StringUtils.equalsIgnoreCase(notificationVersion, "v2");
+        } catch (AtlasException e) {
+            ret = true;
+        }
+
+        return ret;
+    }
+
     private static final String AUDIT_REPOSITORY_IMPLEMENTATION_PROPERTY = "atlas.EntityAuditRepository.impl";
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/test/java/org/apache/atlas/TestModules.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java
index 13bdcb0..c901e89 100644
--- a/repository/src/test/java/org/apache/atlas/TestModules.java
+++ b/repository/src/test/java/org/apache/atlas/TestModules.java
@@ -30,8 +30,10 @@ import org.apache.atlas.discovery.EntityDiscoveryService;
 import org.apache.atlas.discovery.EntityLineageService;
 import org.apache.atlas.graph.GraphSandboxUtil;
 import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.listener.EntityChangeListenerV2;
 import org.apache.atlas.listener.TypeDefChangeListener;
 import org.apache.atlas.repository.audit.EntityAuditListener;
+import org.apache.atlas.repository.audit.EntityAuditListenerV2;
 import org.apache.atlas.repository.audit.EntityAuditRepository;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -145,6 +147,10 @@ public class TestModules {
                     Multibinder.newSetBinder(binder(), EntityChangeListener.class);
             entityChangeListenerBinder.addBinding().to(EntityAuditListener.class);
 
+            Multibinder<EntityChangeListenerV2> entityChangeListenerV2Binder =
+                    Multibinder.newSetBinder(binder(), EntityChangeListenerV2.class);
+            entityChangeListenerV2Binder.addBinding().to(EntityAuditListenerV2.class);
+
             final GraphTransactionInterceptor graphTransactionInterceptor = new GraphTransactionInterceptor(new AtlasGraphProvider().get());
             requestInjection(graphTransactionInterceptor);
             bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), graphTransactionInterceptor);

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
index 47a61ee..4d70b7f 100644
--- a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
@@ -46,9 +46,9 @@ public class AuditRepositoryTestBase {
         EntityAuditEvent event = new EntityAuditEvent(rand(), System.currentTimeMillis(), "u1",
                 EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "d1", new Referenceable(rand()));
 
-        eventRepository.putEvents(event);
+        eventRepository.putEventsV1(event);
 
-        List<EntityAuditEvent> events = eventRepository.listEvents(event.getEntityId(), null, (short) 10);
+        List<EntityAuditEvent> events = eventRepository.listEventsV1(event.getEntityId(), null, (short) 10);
 
         assertEquals(events.size(), 1);
         assertEventEquals(events.get(0), event);
@@ -67,28 +67,28 @@ public class AuditRepositoryTestBase {
             //Add events for both ids
             EntityAuditEvent event = new EntityAuditEvent(id2, ts - i, "user" + i, EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE, "details" + i, entity);
 
-            eventRepository.putEvents(event);
+            eventRepository.putEventsV1(event);
             expectedEvents.add(event);
-            eventRepository.putEvents(new EntityAuditEvent(id1, ts - i, "user" + i, EntityAuditEvent.EntityAuditAction.TAG_DELETE, "details" + i, entity));
-            eventRepository.putEvents(new EntityAuditEvent(id3, ts - i, "user" + i, EntityAuditEvent.EntityAuditAction.TAG_ADD, "details" + i, entity));
+            eventRepository.putEventsV1(new EntityAuditEvent(id1, ts - i, "user" + i, EntityAuditEvent.EntityAuditAction.TAG_DELETE, "details" + i, entity));
+            eventRepository.putEventsV1(new EntityAuditEvent(id3, ts - i, "user" + i, EntityAuditEvent.EntityAuditAction.TAG_ADD, "details" + i, entity));
         }
 
         //Use ts for which there is no event - ts + 2
-        List<EntityAuditEvent> events = eventRepository.listEvents(id2, null, (short) 3);
+        List<EntityAuditEvent> events = eventRepository.listEventsV1(id2, null, (short) 3);
         assertEquals(events.size(), 3);
         assertEventEquals(events.get(0), expectedEvents.get(0));
         assertEventEquals(events.get(1), expectedEvents.get(1));
         assertEventEquals(events.get(2), expectedEvents.get(2));
 
         //Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
-        events = eventRepository.listEvents(id2, events.get(2).getEventKey(), (short) 3);
+        events = eventRepository.listEventsV1(id2, events.get(2).getEventKey(), (short) 3);
         assertEquals(events.size(), 1);
         assertEventEquals(events.get(0), expectedEvents.get(2));
     }
 
     @Test
     public void testInvalidEntityId() throws Exception {
-        List<EntityAuditEvent> events = eventRepository.listEvents(rand(), null, (short) 3);
+        List<EntityAuditEvent> events = eventRepository.listEventsV1(rand(), null, (short) 3);
 
         assertEquals(events.size(), 0);
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
new file mode 100644
index 0000000..01a95cf
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.listener.EntityChangeListenerV2;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.v1.model.notification.EntityNotificationV2;
+import org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.notification.NotificationEntityChangeListener.ATLAS_ENTITY_NOTIFICATION_PROPERTY;
+import static org.apache.atlas.notification.NotificationInterface.NotificationType.ENTITIES;
+import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_ADD;
+import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_DELETE;
+import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_UPDATE;
+import static org.apache.atlas.repository.graph.GraphHelper.isInternalType;
+import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.ENTITY_CREATE;
+import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.ENTITY_DELETE;
+import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.ENTITY_UPDATE;
+
+@Component
+public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
+    private final AtlasTypeRegistry         typeRegistry;
+    private final NotificationInterface     notificationInterface;
+    private final Map<String, List<String>> notificationAttributesCache = new HashMap<>();
+
+    private static Configuration APPLICATION_PROPERTIES = null;
+
+    @Inject
+    public EntityNotificationListenerV2(AtlasTypeRegistry typeRegistry, NotificationInterface notificationInterface) {
+        this.typeRegistry          = typeRegistry;
+        this.notificationInterface = notificationInterface;
+    }
+
+    @Override
+    public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        notifyEntityEvents(entities, ENTITY_CREATE);
+    }
+
+    @Override
+    public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        notifyEntityEvents(entities, ENTITY_UPDATE);
+    }
+
+    @Override
+    public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        notifyEntityEvents(entities, ENTITY_DELETE);
+    }
+
+    @Override
+    public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
+        notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_ADD);
+    }
+
+    @Override
+    public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
+        notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_UPDATE);
+    }
+
+    @Override
+    public void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException {
+        notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_DELETE);
+    }
+
+    private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException {
+        List<EntityNotificationV2> messages = new ArrayList<>();
+
+        for (AtlasEntity entity : entities) {
+            if (isInternalType(entity.getTypeName())) {
+                continue;
+            }
+
+            filterNotificationAttributes(entity);
+
+            messages.add(new EntityNotificationV2(entity, operationType, getAllClassifications(entity)));
+        }
+
+        if (!messages.isEmpty()) {
+            try {
+                notificationInterface.send(ENTITIES, messages);
+            } catch (NotificationException e) {
+                throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name());
+            }
+        }
+    }
+
+    private List<AtlasClassification> getAllClassifications(AtlasEntity entity) {
+        List<AtlasClassification> ret = getAllClassifications(entity.getClassifications(), typeRegistry);
+
+        return ret;
+    }
+
+    private static List<AtlasClassification> getAllClassifications(List<AtlasClassification> classifications, AtlasTypeRegistry typeRegistry) {
+        List<AtlasClassification> ret = new LinkedList<>();
+
+        if (CollectionUtils.isNotEmpty(classifications)) {
+            for (AtlasClassification classification : classifications) {
+                AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+                Set<String>             superTypeNames     = classificationType != null ? classificationType.getAllSuperTypes() : null;
+
+                ret.add(classification);
+
+                if (CollectionUtils.isNotEmpty(superTypeNames)) {
+                    for (String superTypeName : superTypeNames) {
+                        AtlasClassification superTypeClassification = new AtlasClassification(superTypeName);
+
+                        superTypeClassification.setEntityGuid(classification.getEntityGuid());
+                        superTypeClassification.setPropagate(classification.isPropagate());
+
+                        if (MapUtils.isNotEmpty(classification.getAttributes())) {
+                            AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName);
+
+                            if (superType != null && MapUtils.isNotEmpty(superType.getAllAttributes())) {
+                                Map<String, Object> superTypeClassificationAttributes = new HashMap<>();
+
+                                for (Map.Entry<String, Object> attrEntry : classification.getAttributes().entrySet()) {
+                                    String attrName = attrEntry.getKey();
+
+                                    if (superType.getAllAttributes().containsKey(attrName)) {
+                                        superTypeClassificationAttributes.put(attrName, attrEntry.getValue());
+                                    }
+                                }
+
+                                superTypeClassification.setAttributes(superTypeClassificationAttributes);
+                            }
+                        }
+
+                        ret.add(superTypeClassification);
+                    }
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private void filterNotificationAttributes(AtlasEntity entity) {
+        Map<String, Object> attributesMap     = entity.getAttributes();
+        List<String>        notificationAttrs = getNotificationAttributes(entity.getTypeName());
+
+        if (MapUtils.isNotEmpty(attributesMap) && CollectionUtils.isNotEmpty(notificationAttrs)) {
+            Collection<String> attributesToRemove = CollectionUtils.subtract(attributesMap.keySet(), notificationAttrs);
+
+            for (String attributeToRemove : attributesToRemove) {
+                attributesMap.remove(attributeToRemove);
+            }
+        }
+    }
+
+    private List<String> getNotificationAttributes(String entityType) {
+        List<String> ret = null;
+
+        initApplicationProperties();
+
+        if (notificationAttributesCache.containsKey(entityType)) {
+            ret = notificationAttributesCache.get(entityType);
+        } else if (APPLICATION_PROPERTIES != null) {
+            String[] notificationAttributes = APPLICATION_PROPERTIES.getStringArray(ATLAS_ENTITY_NOTIFICATION_PROPERTY + "." +
+                                                                                    entityType + "." + "attributes.include");
+
+            if (notificationAttributes != null) {
+                ret = Arrays.asList(notificationAttributes);
+            }
+
+            notificationAttributesCache.put(entityType, ret);
+        }
+
+        return ret;
+    }
+
+    private void initApplicationProperties() {
+        if (APPLICATION_PROPERTIES == null) {
+            try {
+                APPLICATION_PROPERTIES = ApplicationProperties.get();
+            } catch (AtlasException ex) {
+                // ignore
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
index 396a292..a3e5949 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
@@ -42,7 +42,7 @@ import java.util.*;
  */
 @Component
 public class NotificationEntityChangeListener implements EntityChangeListener {
-    private static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity";
+    protected static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity";
 
     private final NotificationInterface     notificationInterface;
     private final AtlasTypeRegistry         typeRegistry;

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
index 715a54d..4f4f091 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
@@ -18,7 +18,6 @@
 
 package org.apache.atlas.web.resources;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -28,7 +27,7 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.CreateUpdateEntitiesResult;
 import org.apache.atlas.EntityAuditEvent;
-import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
@@ -98,7 +97,7 @@ public class EntityResource {
     private final AtlasTypeRegistry      typeRegistry;
     private final EntityREST             entityREST;
     private final EntityAuditRepository  entityAuditRepository;
-    private final AtlasDiscoveryService  atlasDiscoveryService;
+    private final AtlasInstanceConverter instanceConverter;
 
     @Context
     UriInfo uriInfo;
@@ -109,13 +108,13 @@ public class EntityResource {
                           final AtlasTypeRegistry typeRegistry,
                           final EntityREST entityREST,
                           final EntityAuditRepository entityAuditRepository,
-                          final AtlasDiscoveryService atlasDiscoveryService) {
+                          final AtlasInstanceConverter instanceConverter) {
         this.restAdapters  = restAdapters;
         this.entitiesStore = entitiesStore;
         this.typeRegistry  = typeRegistry;
         this.entityREST    = entityREST;
         this.entityAuditRepository = entityAuditRepository;
-        this.atlasDiscoveryService = atlasDiscoveryService;
+        this.instanceConverter = instanceConverter;
     }
 
     /**
@@ -1133,20 +1132,27 @@ public class EntityResource {
 
         AtlasPerfTracer perf = null;
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Audit events request for entity {}, start key {}, number of results required {}", guid, startKey, count);
-        }
-
         try {
             if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                 perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityResource.getAuditEvents(" + guid + ", " + startKey + ", " + count + ")");
             }
 
-            List<EntityAuditEvent> events = entityAuditRepository.listEvents(guid, startKey, count);
+            List                   events   = entityAuditRepository.listEvents(guid, startKey, count);
+            List<EntityAuditEvent> v1Events = new ArrayList<>();
+
+            for (Object event : events) {
+                if (event instanceof EntityAuditEvent) {
+                    v1Events.add((EntityAuditEvent) event);
+                } else if (event instanceof EntityAuditEventV2) {
+                    v1Events.add(instanceConverter.toV1AuditEvent((EntityAuditEventV2) event));
+                } else {
+                    LOG.warn("unknown entity-audit event type {}. Ignored", event != null ? event.getClass().getCanonicalName() : "null");
+                }
+            }
 
             Map<String, Object> response = new HashMap<>();
             response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
-            response.put(AtlasClient.EVENTS, events);
+            response.put(AtlasClient.EVENTS, v1Events);
             return Response.ok(AtlasJson.toV1Json(response)).build();
         } catch (IllegalArgumentException e) {
             LOG.error("Unable to get audit events for entity guid={} startKey={}", guid, startKey, e);

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
index 21402e1..fdafa2c 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
@@ -18,6 +18,8 @@
 package org.apache.atlas.web.rest;
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.EntityAuditEvent;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasClassification;
@@ -26,6 +28,8 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.ClassificationAssociateRequest;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
 import org.apache.atlas.repository.store.graph.v1.EntityStream;
@@ -38,6 +42,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.inject.Inject;
@@ -45,6 +50,7 @@ import javax.inject.Singleton;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -67,17 +73,24 @@ import java.util.Map;
 @Singleton
 @Service
 public class EntityREST {
+    private static final Logger LOG      = LoggerFactory.getLogger(EntityREST.class);
     private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.EntityREST");
 
     public static final String PREFIX_ATTR = "attr:";
 
-    private final AtlasTypeRegistry         typeRegistry;
-    private final AtlasEntityStore          entitiesStore;
+    private final AtlasTypeRegistry      typeRegistry;
+    private final AtlasEntityStore       entitiesStore;
+    private final EntityAuditRepository  auditRepository;
+    private final AtlasInstanceConverter instanceConverter;
+
 
     @Inject
-    public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
-        this.typeRegistry    = typeRegistry;
-        this.entitiesStore   = entitiesStore;
+    public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore,
+                      EntityAuditRepository auditRepository, AtlasInstanceConverter instanceConverter) {
+        this.typeRegistry      = typeRegistry;
+        this.entitiesStore     = entitiesStore;
+        this.auditRepository   = auditRepository;
+        this.instanceConverter = instanceConverter;
     }
 
     /**
@@ -409,14 +422,14 @@ public class EntityREST {
     @PUT
     @Path("/guid/{guid}/classifications")
     @Produces(Servlets.JSON_MEDIA_TYPE)
-    public void updateClassification(@PathParam("guid") final String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+    public void updateClassifications(@PathParam("guid") final String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
         Servlets.validateQueryParamLength("guid", guid);
 
         AtlasPerfTracer perf = null;
 
         try {
             if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
-                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.updateClassification(" + guid + ")");
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.updateClassifications(" + guid + ")");
             }
 
             if (StringUtils.isEmpty(guid)) {
@@ -581,6 +594,38 @@ public class EntityREST {
         }
     }
 
+    @GET
+    @Path("{guid}/audit")
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public List<EntityAuditEventV2> getAuditEvents(@PathParam("guid") String guid, @QueryParam("startKey") String startKey,
+                                                   @QueryParam("count") @DefaultValue("100") short count) throws AtlasBaseException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.getAuditEvents(" + guid + ", " + startKey + ", " + count + ")");
+            }
+
+            List                     events = auditRepository.listEvents(guid, startKey, count);
+            List<EntityAuditEventV2> ret    = new ArrayList<>();
+
+            for (Object event : events) {
+                if (event instanceof EntityAuditEventV2) {
+                    ret.add((EntityAuditEventV2) event);
+                } else if (event instanceof EntityAuditEvent) {
+                    ret.add(instanceConverter.toV2AuditEvent((EntityAuditEvent) event));
+                } else {
+                    LOG.warn("unknown entity-audit event type {}. Ignored", event != null ? event.getClass().getCanonicalName() : "null");
+                }
+            }
+
+            return ret;
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
     private AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException {
         AtlasEntityType ret = typeRegistry.getEntityTypeByName(typeName);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java b/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java
index ea6fe31..f0bc962 100644
--- a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java
+++ b/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java
@@ -164,7 +164,7 @@ public class TestEntityREST {
             put("tag", "tagName_updated");
         }});
 
-        entityREST.updateClassification(dbEntity.getGuid(), new ArrayList<>(Arrays.asList(phiClassification, testClassification)));
+        entityREST.updateClassifications(dbEntity.getGuid(), new ArrayList<>(Arrays.asList(phiClassification, testClassification)));
 
         AtlasClassification updatedClassification = entityREST.getClassification(dbEntity.getGuid(), TestUtilsV2.PHI);
         Assert.assertNotNull(updatedClassification);

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
index f769431..e4887d2 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
@@ -808,7 +808,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
         } catch (AtlasServiceException e) {
             assertNotNull(e);
             assertNotNull(e.getStatus());
-            assertEquals(e.getStatus(), ClientResponse.Status.NOT_FOUND);
+            assertEquals(e.getStatus(), ClientResponse.Status.BAD_REQUEST);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
index 0f43d6f..dabb2ef 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
@@ -560,7 +560,7 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
             fail("Deletion should've failed for non-existent trait association");
         } catch (AtlasServiceException ex) {
             Assert.assertNotNull(ex.getStatus());
-            assertEquals(ex.getStatus(), ClientResponse.Status.NOT_FOUND);
+            assertEquals(ex.getStatus(), ClientResponse.Status.BAD_REQUEST);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/atlas-application.properties b/webapp/src/test/resources/atlas-application.properties
index d900916..62fa603 100644
--- a/webapp/src/test/resources/atlas-application.properties
+++ b/webapp/src/test/resources/atlas-application.properties
@@ -64,6 +64,7 @@ atlas.lineage.schema.query.hive_table_v1=hive_table_v1 where __guid='%s'\, colum
 
 #########  Notification Configs  #########
 atlas.notification.embedded=true
+atlas.notification.entity.version=v1
 
 atlas.kafka.zookeeper.connect=localhost:19026
 atlas.kafka.bootstrap.servers=localhost:19027


[3/3] atlas git commit: ATLAS-2456: Implement tag propagation using relationships

Posted by ma...@apache.org.
ATLAS-2456: Implement tag propagation using relationships

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/a3374c74
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/a3374c74
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/a3374c74

Branch: refs/heads/master
Commit: a3374c747fb900ed44358b8b2c643e439820d2e6
Parents: 9c58d30
Author: Sarath Subramanian <ss...@hortonworks.com>
Authored: Mon Feb 19 22:58:36 2018 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Feb 20 15:01:32 2018 -0800

----------------------------------------------------------------------
 addons/models/0000-Area0/0010-base_model.json   |   4 +-
 addons/models/1000-Hadoop/1030-hive_model.json  |   8 +-
 addons/models/1000-Hadoop/1060-hbase_model.json |   6 +-
 .../org/apache/atlas/repository/Constants.java  |   4 +
 .../atlas/repository/graphdb/AtlasElement.java  |   3 +-
 .../atlas/repository/graphdb/AtlasVertex.java   |   8 +
 .../graphdb/janus/AtlasJanusVertex.java         |   8 +
 .../repository/graphdb/titan0/Titan0Vertex.java |  14 +
 .../java/org/apache/atlas/AtlasErrorCode.java   |  12 +-
 .../atlas/listener/EntityChangeListenerV2.java  |  81 ++++
 .../atlas/model/audit/EntityAuditEventV2.java   | 175 ++++++++
 .../model/instance/AtlasClassification.java     |  45 +++
 .../atlas/model/instance/AtlasEntity.java       |  11 +
 .../model/notification/EntityNotification.java  |   2 +-
 .../notification/EntityNotificationV2.java      | 129 ++++++
 .../repository/audit/EntityAuditListener.java   |  14 +-
 .../repository/audit/EntityAuditListenerV2.java | 263 ++++++++++++
 .../repository/audit/EntityAuditRepository.java |  47 ++-
 .../audit/HBaseBasedAuditRepository.java        | 172 +++++++-
 .../audit/InMemoryEntityAuditRepository.java    |  63 ++-
 .../audit/NoopEntityAuditRepository.java        |  35 +-
 .../converters/AtlasInstanceConverter.java      | 102 ++++-
 .../graph/GraphBackedSearchIndexer.java         |   2 +
 .../atlas/repository/graph/GraphHelper.java     | 288 ++++++++++++-
 .../graph/v1/AtlasEntityChangeNotifier.java     | 170 +++++---
 .../store/graph/v1/AtlasEntityStoreV1.java      |  70 +---
 .../graph/v1/AtlasRelationshipStoreV1.java      |  87 +++-
 .../store/graph/v1/EntityGraphMapper.java       | 399 +++++++++++++++----
 .../store/graph/v1/EntityGraphRetriever.java    | 392 +++++++++++++++---
 .../atlas/util/AtlasGremlin3QueryProvider.java  |  12 +
 .../atlas/util/AtlasGremlinQueryProvider.java   |   5 +-
 .../util/AtlasRepositoryConfiguration.java      |  15 +
 .../test/java/org/apache/atlas/TestModules.java |   6 +
 .../audit/AuditRepositoryTestBase.java          |  16 +-
 .../EntityNotificationListenerV2.java           | 216 ++++++++++
 .../NotificationEntityChangeListener.java       |   2 +-
 .../atlas/web/resources/EntityResource.java     |  28 +-
 .../org/apache/atlas/web/rest/EntityREST.java   |  59 ++-
 .../atlas/web/adapters/TestEntityREST.java      |   2 +-
 .../web/integration/EntityJerseyResourceIT.java |   2 +-
 .../integration/EntityV2JerseyResourceIT.java   |   2 +-
 .../test/resources/atlas-application.properties |   1 +
 42 files changed, 2637 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/addons/models/0000-Area0/0010-base_model.json
----------------------------------------------------------------------
diff --git a/addons/models/0000-Area0/0010-base_model.json b/addons/models/0000-Area0/0010-base_model.json
index 0296e8f..aebe955 100644
--- a/addons/models/0000-Area0/0010-base_model.json
+++ b/addons/models/0000-Area0/0010-base_model.json
@@ -213,7 +213,7 @@
                 "cardinality": "SET",
                 "isLegacyAttribute": true
             },
-            "propagateTags": "NONE"
+            "propagateTags": "ONE_TO_TWO"
         },
         {
           "name": "process_dataset_outputs",
@@ -232,7 +232,7 @@
                 "isContainer": false,
                 "cardinality": "SET"
           },
-          "propagateTags": "NONE"
+          "propagateTags": "ONE_TO_TWO"
         }
     ]
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/addons/models/1000-Hadoop/1030-hive_model.json
----------------------------------------------------------------------
diff --git a/addons/models/1000-Hadoop/1030-hive_model.json b/addons/models/1000-Hadoop/1030-hive_model.json
index 32d9179..68a5c84 100644
--- a/addons/models/1000-Hadoop/1030-hive_model.json
+++ b/addons/models/1000-Hadoop/1030-hive_model.json
@@ -539,7 +539,7 @@
                 "cardinality": "SINGLE",
                 "isLegacyAttribute": true
             },
-            "propagateTags": "ONE_TO_TWO"
+            "propagateTags": "NONE"
         },
         {
             "name": "hive_table_columns",
@@ -559,7 +559,7 @@
                 "cardinality": "SINGLE",
                 "isLegacyAttribute": true
             },
-            "propagateTags": "ONE_TO_TWO"
+            "propagateTags": "NONE"
         },
         {
             "name": "hive_table_partitionkeys",
@@ -579,7 +579,7 @@
                 "cardinality": "SINGLE",
                 "isLegacyAttribute": true
             },
-            "propagateTags": "ONE_TO_TWO"
+            "propagateTags": "NONE"
         },
         {
             "name": "hive_table_storagedesc",
@@ -599,7 +599,7 @@
                 "cardinality": "SINGLE",
                 "isLegacyAttribute": true
             },
-            "propagateTags": "ONE_TO_TWO"
+            "propagateTags": "NONE"
         },
         {
             "name": "hive_process_column_lineage",

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/addons/models/1000-Hadoop/1060-hbase_model.json
----------------------------------------------------------------------
diff --git a/addons/models/1000-Hadoop/1060-hbase_model.json b/addons/models/1000-Hadoop/1060-hbase_model.json
index 9280f59..acf4ff5 100644
--- a/addons/models/1000-Hadoop/1060-hbase_model.json
+++ b/addons/models/1000-Hadoop/1060-hbase_model.json
@@ -157,7 +157,7 @@
                 "cardinality": "SINGLE",
                 "isLegacyAttribute": true
             },
-            "propagateTags": "ONE_TO_TWO"
+            "propagateTags": "NONE"
         },
         {
             "name": "hbase_column_family_columns",
@@ -177,7 +177,7 @@
                 "cardinality": "SINGLE",
                 "isLegacyAttribute": true
             },
-            "propagateTags": "ONE_TO_TWO"
+            "propagateTags": "NONE"
         }
     ]
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/common/src/main/java/org/apache/atlas/repository/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 265be78..ae52880 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -69,6 +69,7 @@ public final class Constants {
      * Trait names property key and index name.
      */
     public static final String TRAIT_NAMES_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "traitNames";
+    public static final String PROPAGATED_TRAIT_NAMES_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "propagatedTraitNames";
 
     public static final String VERSION_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "version";
     public static final String STATE_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "state";
@@ -115,6 +116,9 @@ public final class Constants {
     public static final String ATTRIBUTE_NAME_VERSION  = "version";
     public static final String TEMP_STRUCT_NAME_PREFIX = "__tempQueryResultStruct";
 
+    public static final String CLASSIFICATION_ENTITY_GUID = INTERNAL_PROPERTY_KEY_PREFIX + "entityGuid";
+    public static final String CLASSIFICATION_PROPAGATE_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "propagate";
+
     private Constants() {
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasElement.java
----------------------------------------------------------------------
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasElement.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasElement.java
index 42837f4..4af39ed 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasElement.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasElement.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.atlas.AtlasException;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
@@ -85,7 +84,7 @@ public interface AtlasElement {
      * is needed for this because special logic is required to handle this situation
      * in some implementations.
      */
-    void setListProperty(String propertyName, List<String> values) throws AtlasException;
+    void setListProperty(String propertyName, List<String> values);
 
 
     /**

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java
----------------------------------------------------------------------
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java
index a68d8eb..6de4dcf 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java
@@ -55,6 +55,14 @@ public interface AtlasVertex<V, E> extends AtlasElement {
      */
     <T> void addProperty(String propertyName, T value);
 
+    /**
+     * Adds a value to a multiplicity-many property.
+     * If the property is already present, the value is added to it; if not, the propery is set with the given value
+     *
+     * @param propertyName
+     * @param value
+     */
+    <T> void addListProperty(String propertyName, T value);
 
     /**
      * Creates a vertex query.

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java
index aef20f0..71b2857 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java
@@ -52,6 +52,14 @@ public class AtlasJanusVertex extends AtlasJanusElement<Vertex> implements Atlas
         }
     }
 
+    @Override
+    public <T> void addListProperty(String propertyName, T value) {
+        try {
+            getWrappedElement().property(VertexProperty.Cardinality.list, propertyName, value);
+        } catch(SchemaViolationException e) {
+            throw new AtlasSchemaViolationException(e);
+        }
+    }
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Vertex.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Vertex.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Vertex.java
index ca48e3d..e439ab9 100644
--- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Vertex.java
+++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Vertex.java
@@ -100,6 +100,20 @@ public class Titan0Vertex extends Titan0Element<Vertex> implements AtlasVertex<T
     }
 
     @Override
+    public <T> void addListProperty(String propertyName, T value) {
+        try {
+            getAsTitanVertex().addProperty(propertyName, value);
+        } catch (SchemaViolationException e) {
+            if (getPropertyValues(propertyName, value.getClass()).contains(value)) {
+                // follow java set semantics, don't throw an exception if
+                // value is already there.
+                return;
+            }
+            throw new AtlasSchemaViolationException(e);
+        }
+    }
+
+    @Override
     public <T> Collection<T> getPropertyValues(String key, Class<T> clazz) {
 
         TitanVertex tv = getAsTitanVertex();

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index ff09e6c..f1d4536 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -122,6 +122,10 @@ public enum AtlasErrorCode {
     INVALID_DSL_HAS_PROPERTY(400, "ATLAS-400-00-068", "DSL Semantic Error - Property needs to be a primitive type: {0}"),
     RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED(404, "ATLAS-400-00-069", "change of relationship end is not permitted. relationship-type={}, relationship-guid={}, end-guid={}, updated-end-guid={}"),
     RELATIONSHIP_UPDATE_TYPE_CHANGE_NOT_ALLOWED(404, "ATLAS-400-00-06A", "change of relationship type is not permitted. relationship-guid={}, current-type={}, new-type={}"),
+    CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY(400, "ATLAS-400-00-06B", "Update to classification {0} is not allowed from propagated entity"),
+    CLASSIFICATION_DELETE_FROM_PROPAGATED_ENTITY(400, "ATLAS-400-00-06C", "Delete of classification {0} is not allowed from propagated entity"),
+    CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY(400, "ATLAS-400-00-06D", "Classification {0} is not associated with entity"),
+
     // All Not found enums go here
     TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was invalid"),
     TYPE_GUID_NOT_FOUND(404, "ATLAS-404-00-002", "Given type guid {0} was invalid"),
@@ -137,6 +141,7 @@ public enum AtlasErrorCode {
     RELATIONSHIP_CRUD_INVALID_PARAMS(404, "ATLAS-404-00-00D", "Invalid relationship creation/updation parameters passed : {0}"),
     RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-00E", "RelationshipDef {0} endDef typename {0} cannot be found"),
     RELATIONSHIP_ALREADY_DELETED(404, "ATLAS-404-00-00F", "Attempting to delete a relationship which is already deleted : {0}"),
+    INVALID_ENTITY_GUID_FOR_CLASSIFICATION_UPDATE(404, "ATLAS-404-00-010", "Updating entityGuid of classification is not allowed."),
 
     // All data conflict errors go here
     TYPE_ALREADY_EXISTS(409, "ATLAS-409-00-001", "Given type {0} already exists"),
@@ -157,7 +162,6 @@ public enum AtlasErrorCode {
     FAILED_TO_OBTAIN_GREMLIN_SCRIPT_ENGINE(500, "ATLAS-500-00-008", "Failed to obtain gremlin script engine: {0}"),
     JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED(500, "ATLAS-500-00-009", "ObjectMapper.readValue returned NULL for class: {0}"),
     GREMLIN_SCRIPT_EXECUTION_FAILED(500, "ATLAS-500-00-00A", "Gremlin script execution failed: {0}"),
-
     CURATOR_FRAMEWORK_UPDATE(500, "ATLAS-500-00-00B", "ActiveInstanceState.update resulted in exception."),
     QUICK_START(500, "ATLAS-500-00-00C", "Failed to run QuickStart: {0}"),
     EMBEDDED_SERVER_START(500, "ATLAS-500-00-00D", "EmbeddedServer.Start: failed!"),
@@ -165,9 +169,9 @@ public enum AtlasErrorCode {
     SQOOP_HOOK(500, "ATLAS-500-00-00F", "SqoopHook: {0}"),
     HIVE_HOOK(500, "ATLAS-500-00-010", "HiveHook: {0}"),
     HIVE_HOOK_METASTORE_BRIDGE(500, "ATLAS-500-00-011", "HiveHookMetaStoreBridge: {0}"),
-
-    DATA_ACCESS_SAVE_FAILED(500, "ATLAS-500-00-00B", "Save failed: {0}"),
-    DATA_ACCESS_LOAD_FAILED(500, "ATLAS-500-00-00C", "Load failed: {0}");
+    DATA_ACCESS_SAVE_FAILED(500, "ATLAS-500-00-012", "Save failed: {0}"),
+    DATA_ACCESS_LOAD_FAILED(500, "ATLAS-500-00-013", "Load failed: {0}"),
+    ENTITY_NOTIFICATION_FAILED(500, "ATLAS-500-00-014", "Notification failed for operation: {} : {}");
 
     private String errorCode;
     private String errorMessage;

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
new file mode 100644
index 0000000..70877d2
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.listener;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+
+import java.util.List;
+
+/**
+ * Entity change notification listener V2.
+ */
+public interface EntityChangeListenerV2 {
+    /**
+     * This is upon adding new entities to the repository.
+     *
+     * @param entities the created entities
+     * @param isImport
+     */
+    void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException;
+
+    /**
+     * This is upon updating an entity.
+     *
+     * @param entities the updated entities
+     * @param isImport
+     */
+    void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException;
+
+    /**
+     * This is upon deleting entities from the repository.
+     *
+     * @param entities the deleted entities
+     * @param isImport
+     */
+    void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException;
+
+    /**
+     * This is upon adding new classifications to an entity.
+     *
+     * @param entity          the entity
+     * @param classifications classifications that needs to be added to an entity
+     * @throws AtlasBaseException if the listener notification fails
+     */
+    void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException;
+
+    /**
+     * This is upon updating classifications to an entity.
+     *
+     * @param entity          the entity
+     * @param classifications classifications that needs to be updated for an entity
+     * @throws AtlasBaseException if the listener notification fails
+     */
+    void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException;
+
+    /**
+     * This is upon deleting classifications from an entity.
+     *
+     * @param entity              the entity
+     * @param classificationNames classifications names for the instance that needs to be deleted from entity
+     * @throws AtlasBaseException if the listener notification fails
+     */
+    void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
new file mode 100644
index 0000000..741e371
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.audit;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.type.AtlasType;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.Objects;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Structure of v2 entity audit event
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class EntityAuditEventV2 implements Serializable {
+    public enum EntityAuditAction {
+        ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE,
+        ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
+        CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
+    }
+
+    private String            entityId;
+    private long              timestamp;
+    private String            user;
+    private EntityAuditAction action;
+    private String            details;
+    private String            eventKey;
+    private AtlasEntity       entity;
+
+    public EntityAuditEventV2() { }
+
+    public EntityAuditEventV2(String entityId, long timestamp, String user, EntityAuditAction action, String details,
+                              AtlasEntity entity) {
+        setEntityId(entityId);
+        setTimestamp(timestamp);
+        setUser(user);
+        setAction(action);
+        setDetails(details);
+        setEntity(entity);
+    }
+
+    public String getEntityId() {
+        return entityId;
+    }
+
+    public void setEntityId(String entityId) {
+        this.entityId = entityId;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public EntityAuditAction getAction() {
+        return action;
+    }
+
+    public void setAction(EntityAuditAction action) {
+        this.action = action;
+    }
+
+    public String getDetails() {
+        return details;
+    }
+
+    public void setDetails(String details) {
+        this.details = details;
+    }
+
+    public String getEventKey() {
+        return eventKey;
+    }
+
+    public void setEventKey(String eventKey) {
+        this.eventKey = eventKey;
+    }
+
+    public AtlasEntity getEntity() {
+        return entity;
+    }
+
+    public void setEntity(AtlasEntity entity) {
+        this.entity = entity;
+    }
+
+    @JsonIgnore
+    public String getEntityDefinitionString() {
+        if (entity != null) {
+            return AtlasType.toJson(entity);
+        }
+
+        return null;
+    }
+
+    @JsonIgnore
+    public void setEntityDefinition(String entityDefinition) {
+        this.entity = AtlasType.fromJson(entityDefinition, AtlasEntity.class);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) { return true; }
+        if (o == null || getClass() != o.getClass()) { return false; }
+        EntityAuditEventV2 that = (EntityAuditEventV2) o;
+
+        return timestamp == that.timestamp &&
+               Objects.equals(entityId, that.entityId) &&
+               Objects.equals(user, that.user) &&
+               action == that.action &&
+               Objects.equals(details, that.details) &&
+               Objects.equals(eventKey, that.eventKey) &&
+               Objects.equals(entity, that.entity);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entityId, timestamp, user, action, details, eventKey, entity);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("EntityAuditEventV2{");
+        sb.append("entityId='").append(entityId).append('\'');
+        sb.append(", timestamp=").append(timestamp);
+        sb.append(", user='").append(user).append('\'');
+        sb.append(", action=").append(action);
+        sb.append(", details='").append(details).append('\'');
+        sb.append(", eventKey='").append(eventKey).append('\'');
+        sb.append(", entity=").append(entity);
+        sb.append('}');
+
+        return sb.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java
index f594a81..a499f79 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -49,6 +50,9 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 public class AtlasClassification extends AtlasStruct implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    private String  entityGuid = null;
+    private boolean propagate  = true;
+
     public AtlasClassification() {
         this(null, null);
     }
@@ -76,6 +80,47 @@ public class AtlasClassification extends AtlasStruct implements Serializable {
         }
     }
 
+    public String getEntityGuid() {
+        return entityGuid;
+    }
+
+    public void setEntityGuid(String entityGuid) {
+        this.entityGuid = entityGuid;
+    }
+
+    public boolean isPropagate() {
+        return propagate;
+    }
+
+    public void setPropagate(boolean propagate) {
+        this.propagate = propagate;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) { return true; }
+        if (o == null || getClass() != o.getClass()) { return false; }
+        if (!super.equals(o)) { return false; }
+        AtlasClassification that = (AtlasClassification) o;
+        return propagate == that.propagate &&
+               Objects.equals(entityGuid, that.entityGuid);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), entityGuid, propagate);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("AtlasClassification{");
+        super.toString(sb);
+        sb.append("entityGuid='").append(entityGuid).append('\'');
+        sb.append(", propagate=").append(propagate);
+        sb.append('}');
+        return sb.toString();
+    }
+
     /**
      * REST serialization friendly list.
      */

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 08d1ce1..fce46da 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -259,6 +259,17 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
 
     public void setClassifications(List<AtlasClassification> classifications) { this.classifications = classifications; }
 
+    public void addClassifications(List<AtlasClassification> classifications) {
+        List<AtlasClassification> c = this.classifications;
+
+        if (c == null) {
+            c = new ArrayList<>(classifications);
+
+            this.classifications = c;
+        }
+
+        c.addAll(classifications);
+    }
 
     private void init() {
         setGuid(nextInternalId());

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java b/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
index b272b73..f70eb3f 100644
--- a/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
+++ b/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
@@ -44,7 +44,7 @@ public class EntityNotification implements Serializable {
      * Type of the hook message.
      */
     public enum EntityNotificationType {
-        ENTITY_NOTIFICATION_V1
+        ENTITY_NOTIFICATION_V1, ENTITY_NOTIFICATION_V2
     }
 
     protected EntityNotificationType type;

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV2.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV2.java b/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV2.java
new file mode 100644
index 0000000..a8dfd23
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV2.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.v1.model.notification;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+import static org.apache.atlas.model.notification.EntityNotification.EntityNotificationType.ENTITY_NOTIFICATION_V2;
+
+/**
+ * Entity v2 notification
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class EntityNotificationV2 extends EntityNotification implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public enum OperationType {
+        ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE,
+        CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE
+    }
+
+    private AtlasEntity               entity;
+    private OperationType             operationType;
+    private List<AtlasClassification> classifications;
+
+    public EntityNotificationV2() { }
+
+    public EntityNotificationV2(AtlasEntity entity, OperationType operationType, List<AtlasClassification> classifications) {
+        setEntity(entity);
+        setOperationType(operationType);
+        setClassifications(classifications);
+        setType(ENTITY_NOTIFICATION_V2);
+    }
+
+    public AtlasEntity getEntity() {
+        return entity;
+    }
+
+    public void setEntity(AtlasEntity entity) {
+        this.entity = entity;
+    }
+
+    public OperationType getOperationType() {
+        return operationType;
+    }
+
+    public void setOperationType(OperationType operationType) {
+        this.operationType = operationType;
+    }
+
+    public List<AtlasClassification> getClassifications() {
+        return classifications;
+    }
+
+    public void setClassifications(List<AtlasClassification> classifications) {
+        this.classifications = classifications;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) { return true; }
+        if (o == null || getClass() != o.getClass()) { return false; }
+        EntityNotificationV2 that = (EntityNotificationV2) o;
+        return Objects.equals(entity, that.entity) &&
+               operationType == that.operationType &&
+               Objects.equals(classifications, that.classifications);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entity, operationType, classifications);
+    }
+
+    @Override
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("EntityNotificationV1{");
+        super.toString(sb);
+        sb.append(", entity=");
+        if (entity != null) {
+            entity.toString(sb);
+        } else {
+            sb.append(entity);
+        }
+        sb.append(", operationType=").append(operationType);
+        sb.append(", classifications=[");
+        AtlasBaseTypeDef.dumpObjects(classifications, sb);
+        sb.append("]");
+        sb.append("}");
+
+        return sb;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
index 74d3b91..1c04eea 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
@@ -67,7 +67,7 @@ public class EntityAuditListener implements EntityChangeListener {
             events.add(event);
         }
 
-        auditRepository.putEvents(events);
+        auditRepository.putEventsV1(events);
     }
 
     @Override
@@ -78,7 +78,7 @@ public class EntityAuditListener implements EntityChangeListener {
             events.add(event);
         }
 
-        auditRepository.putEvents(events);
+        auditRepository.putEventsV1(events);
     }
 
     @Override
@@ -88,7 +88,7 @@ public class EntityAuditListener implements EntityChangeListener {
                 EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD,
                                                      "Added trait: " + AtlasType.toV1Json(trait));
 
-                auditRepository.putEvents(event);
+                auditRepository.putEventsV1(event);
             }
         }
     }
@@ -99,7 +99,7 @@ public class EntityAuditListener implements EntityChangeListener {
             for (String traitName : traitNames) {
                 EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
 
-                auditRepository.putEvents(event);
+                auditRepository.putEventsV1(event);
             }
         }
     }
@@ -111,7 +111,7 @@ public class EntityAuditListener implements EntityChangeListener {
                 EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_UPDATE,
                                                      "Updated trait: " + AtlasType.toV1Json(trait));
 
-                auditRepository.putEvents(event);
+                auditRepository.putEventsV1(event);
             }
         }
     }
@@ -124,11 +124,11 @@ public class EntityAuditListener implements EntityChangeListener {
             events.add(event);
         }
 
-        auditRepository.putEvents(events);
+        auditRepository.putEventsV1(events);
     }
 
     public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{
-        return auditRepository.listEvents(guid, null, (short) 10);
+        return auditRepository.listEventsV1(guid, null, (short) 10);
     }
 
     private EntityAuditEvent createEvent(Referenceable entity, EntityAuditAction action)

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
new file mode 100644
index 0000000..bb51014
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.RequestContextV1;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.listener.EntityChangeListenerV2;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_ADD;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_DELETE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_UPDATE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_CREATE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_DELETE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_CREATE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_DELETE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_UPDATE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_UPDATE;
+
+@Component
+public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityAuditListenerV2.class);
+
+    private final EntityAuditRepository auditRepository;
+    private final AtlasTypeRegistry     typeRegistry;
+
+    @Inject
+    public EntityAuditListenerV2(EntityAuditRepository auditRepository, AtlasTypeRegistry typeRegistry) {
+        this.auditRepository = auditRepository;
+        this.typeRegistry    = typeRegistry;
+    }
+
+    @Override
+    public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        List<EntityAuditEventV2> events = new ArrayList<>();
+
+        for (AtlasEntity entity : entities) {
+            EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_CREATE : ENTITY_CREATE);
+
+            events.add(event);
+        }
+
+        auditRepository.putEventsV2(events);
+    }
+
+    @Override
+    public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        List<EntityAuditEventV2> events = new ArrayList<>();
+
+        for (AtlasEntity entity : entities) {
+            EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_UPDATE : ENTITY_UPDATE);
+
+            events.add(event);
+        }
+
+        auditRepository.putEventsV2(events);
+    }
+
+    @Override
+    public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        List<EntityAuditEventV2> events = new ArrayList<>();
+
+        for (AtlasEntity entity : entities) {
+            EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_DELETE : ENTITY_DELETE, "Deleted entity");
+
+            events.add(event);
+        }
+
+        auditRepository.putEventsV2(events);
+    }
+
+    @Override
+    public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(classifications)) {
+            List<EntityAuditEventV2> events = new ArrayList<>();
+
+            for (AtlasClassification classification : classifications) {
+                events.add(createEvent(entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification)));
+            }
+
+            auditRepository.putEventsV2(events);
+        }
+    }
+
+    @Override
+    public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(classifications)) {
+            List<EntityAuditEventV2> events = new ArrayList<>();
+
+            for (AtlasClassification classification : classifications) {
+                events.add(createEvent(entity, CLASSIFICATION_UPDATE, "Updated classification: " + AtlasType.toJson(classification)));
+            }
+
+            auditRepository.putEventsV2(events);
+        }
+    }
+
+    @Override
+    public void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(classificationNames)) {
+            List<EntityAuditEventV2> events = new ArrayList<>();
+
+            for (String classificationName : classificationNames) {
+                events.add(createEvent(entity, CLASSIFICATION_DELETE, "Deleted classification: " + classificationName));
+            }
+
+            auditRepository.putEventsV2(events);
+        }
+    }
+
+    private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditAction action, String details) {
+        return new EntityAuditEventV2(entity.getGuid(), RequestContextV1.get().getRequestTime(),
+                                      RequestContextV1.get().getUser(), action, details, entity);
+    }
+
+    private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditAction action) {
+        String detail = getAuditEventDetail(entity, action);
+
+        return createEvent(entity, action, detail);
+    }
+
+    private String getAuditEventDetail(AtlasEntity entity, EntityAuditAction action) {
+        Map<String, Object> prunedAttributes = pruneEntityAttributesForAudit(entity);
+
+        String auditPrefix  = getAuditPrefix(action);
+        String auditString  = auditPrefix + AtlasType.toJson(entity);
+        byte[] auditBytes   = auditString.getBytes(StandardCharsets.UTF_8);
+        long   auditSize    = auditBytes != null ? auditBytes.length : 0;
+        long   auditMaxSize = auditRepository.repositoryMaxSize();
+
+        if (auditMaxSize >= 0 && auditSize > auditMaxSize) { // don't store attributes in audit
+            LOG.warn("audit record too long: entityType={}, guid={}, size={}; maxSize={}. entity attribute values not stored in audit",
+                    entity.getTypeName(), entity.getGuid(), auditSize, auditMaxSize);
+
+            Map<String, Object> attrValues = entity.getAttributes();
+
+            entity.setAttributes(null);
+
+            auditString = auditPrefix + AtlasType.toJson(entity);
+
+            entity.setAttributes(attrValues);
+        }
+
+        restoreEntityAttributes(entity, prunedAttributes);
+
+        return auditString;
+    }
+
+    private Map<String, Object> pruneEntityAttributesForAudit(AtlasEntity entity) {
+        Map<String, Object> ret               = null;
+        Map<String, Object> entityAttributes  = entity.getAttributes();
+        List<String>        excludeAttributes = auditRepository.getAuditExcludeAttributes(entity.getTypeName());
+        AtlasEntityType     entityType        = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        if (CollectionUtils.isNotEmpty(excludeAttributes) && MapUtils.isNotEmpty(entityAttributes) && entityType != null) {
+            for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
+                String attrName  = attribute.getName();
+                Object attrValue = entityAttributes.get(attrName);
+
+                if (excludeAttributes.contains(attrName)) {
+                    if (ret == null) {
+                        ret = new HashMap<>();
+                    }
+
+                    ret.put(attrName, attrValue);
+                    entityAttributes.remove(attrName);
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private void restoreEntityAttributes(AtlasEntity entity, Map<String, Object> prunedAttributes) {
+        if (MapUtils.isEmpty(prunedAttributes)) {
+            return;
+        }
+
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        if (entityType != null && MapUtils.isNotEmpty(entityType.getAllAttributes())) {
+            for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
+                String attrName = attribute.getName();
+
+                if (prunedAttributes.containsKey(attrName)) {
+                    entity.setAttribute(attrName, prunedAttributes.get(attrName));
+                }
+            }
+        }
+    }
+
+    private String getAuditPrefix(EntityAuditAction action) {
+        final String ret;
+
+        switch (action) {
+            case ENTITY_CREATE:
+                ret = "Created: ";
+                break;
+            case ENTITY_UPDATE:
+                ret = "Updated: ";
+                break;
+            case ENTITY_DELETE:
+                ret = "Deleted: ";
+                break;
+            case CLASSIFICATION_ADD:
+                ret = "Added classification: ";
+                break;
+            case CLASSIFICATION_DELETE:
+                ret = "Deleted classification: ";
+                break;
+            case CLASSIFICATION_UPDATE:
+                ret = "Updated classification: ";
+                break;
+            case ENTITY_IMPORT_CREATE:
+                ret = "Created by import: ";
+                break;
+            case ENTITY_IMPORT_UPDATE:
+                ret = "Updated by import: ";
+                break;
+            case ENTITY_IMPORT_DELETE:
+                ret = "Deleted by import: ";
+                break;
+            default:
+                ret = "Unknown: ";
+        }
+
+        return ret;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
index 9dc7835..aab2d5b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
@@ -20,6 +20,8 @@ package org.apache.atlas.repository.audit;
 
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.EntityAuditEvent;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.exception.AtlasBaseException;
 
 import java.util.List;
 
@@ -32,14 +34,14 @@ public interface EntityAuditRepository {
      * @param events events to be added
      * @throws AtlasException
      */
-    void putEvents(EntityAuditEvent... events) throws AtlasException;
+    void putEventsV1(EntityAuditEvent... events) throws AtlasException;
 
     /**
      * Add events to the event repository
      * @param events events to be added
      * @throws AtlasException
      */
-    void putEvents(List<EntityAuditEvent> events) throws AtlasException;
+    void putEventsV1(List<EntityAuditEvent> events) throws AtlasException;
 
     /**
      * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
@@ -49,13 +51,48 @@ public interface EntityAuditRepository {
      * @return list of events
      * @throws AtlasException
      */
-    List<EntityAuditEvent> listEvents(String entityId, String startKey, short n) throws AtlasException;
+    List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short n) throws AtlasException;
+
+    /**
+     * Add v2 events to the event repository
+     * @param events events to be added
+     * @throws AtlasBaseException
+     */
+    void putEventsV2(EntityAuditEventV2... events) throws AtlasBaseException;
+
+    /**
+     * Add v2 events to the event repository
+     * @param events events to be added
+     * @throws AtlasBaseException
+     */
+    void putEventsV2(List<EntityAuditEventV2> events) throws AtlasBaseException;
+
+    /**
+     * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
+     * @param entityId entity id
+     * @param startKey key for the first event to be returned, used for pagination
+     * @param n        number of events to be returned
+     * @return list of events
+     * @throws AtlasBaseException
+     */
+    List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException;
+
+
+    /**
+     * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
+     * @param entityId entity id
+     * @param startKey key for the first event to be returned, used for pagination
+     * @param n        number of events to be returned
+     * @return list of events
+     * @throws AtlasBaseException
+     */
+    List<Object> listEvents(String entityId, String startKey, short n) throws AtlasBaseException;
 
     /**
      * Returns maximum allowed repository size per EntityAuditEvent
      * @throws AtlasException
      */
-    long repositoryMaxSize() throws AtlasException;
+    long repositoryMaxSize();
 
     /**
      * list of attributes to be excluded when storing in audit repo.
@@ -63,5 +100,5 @@ public interface EntityAuditRepository {
      * @return list of attribute names to be excluded
      * @throws AtlasException
      */
-    List<String> getAuditExcludeAttributes(String entityType) throws AtlasException;
+    List<String> getAuditExcludeAttributes(String entityType);
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 774934c..a22f421 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -23,9 +23,13 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction;
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.service.Service;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -116,17 +120,17 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
      * @throws AtlasException
      */
     @Override
-    public void putEvents(EntityAuditEvent... events) throws AtlasException {
-        putEvents(Arrays.asList(events));
+    public void putEventsV1(EntityAuditEvent... events) throws AtlasException {
+        putEventsV1(Arrays.asList(events));
     }
 
-    @Override
     /**
      * Add events to the event repository
      * @param events events to be added
      * @throws AtlasException
      */
-    public void putEvents(List<EntityAuditEvent> events) throws AtlasException {
+    @Override
+    public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Putting {} events", events.size());
         }
@@ -154,6 +158,146 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
         }
     }
 
+    @Override
+    public void putEventsV2(EntityAuditEventV2... events) throws AtlasBaseException {
+        putEventsV2(Arrays.asList(events));
+    }
+
+    @Override
+    public void putEventsV2(List<EntityAuditEventV2> events) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Putting {} events", events.size());
+        }
+
+        Table table = null;
+
+        try {
+            table          = connection.getTable(tableName);
+            List<Put> puts = new ArrayList<>(events.size());
+
+            for (EntityAuditEventV2 event : events) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Adding entity audit event {}", event);
+                }
+
+                Put put = new Put(getKey(event.getEntityId(), event.getTimestamp()));
+
+                addColumn(put, COLUMN_ACTION, event.getAction());
+                addColumn(put, COLUMN_USER, event.getUser());
+                addColumn(put, COLUMN_DETAIL, event.getDetails());
+
+                if (persistEntityDefinition) {
+                    addColumn(put, COLUMN_DEFINITION, event.getEntity());
+                }
+
+                puts.add(put);
+            }
+
+            table.put(puts);
+        } catch (IOException e) {
+            throw new AtlasBaseException(e);
+        } finally {
+            try {
+                close(table);
+            } catch (AtlasException e) {
+                throw new AtlasBaseException(e);
+            }
+        }
+    }
+
+    @Override
+    public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, n);
+        }
+
+        Table         table   = null;
+        ResultScanner scanner = null;
+
+        try {
+            table = connection.getTable(tableName);
+
+            /**
+             * Scan Details:
+             * In hbase, the events are stored in increasing order of timestamp. So, doing reverse scan to get the latest event first
+             * Page filter is set to limit the number of results returned.
+             * Stop row is set to the entity id to avoid going past the current entity while scanning
+             * small is set to true to optimise RPC calls as the scanner is created per request
+             */
+            Scan scan = new Scan().setReversed(true).setFilter(new PageFilter(n))
+                                  .setStopRow(Bytes.toBytes(entityId))
+                                  .setCaching(n)
+                                  .setSmall(true);
+
+            if (StringUtils.isEmpty(startKey)) {
+                //Set start row to entity id + max long value
+                byte[] entityBytes = getKey(entityId, Long.MAX_VALUE);
+                scan = scan.setStartRow(entityBytes);
+            } else {
+                scan = scan.setStartRow(Bytes.toBytes(startKey));
+            }
+
+            scanner = table.getScanner(scan);
+            List<EntityAuditEventV2> events = new ArrayList<>();
+
+            Result result;
+
+            //PageFilter doesn't ensure n results are returned. The filter is per region server.
+            //So, adding extra check on n here
+            while ((result = scanner.next()) != null && events.size() < n) {
+                EntityAuditEventV2 event = fromKeyV2(result.getRow());
+
+                //In case the user sets random start key, guarding against random events
+                if (!event.getEntityId().equals(entityId)) {
+                    continue;
+                }
+                event.setUser(getResultString(result, COLUMN_USER));
+                event.setAction(EntityAuditAction.valueOf(getResultString(result, COLUMN_ACTION)));
+                event.setDetails(getResultString(result, COLUMN_DETAIL));
+
+                if (persistEntityDefinition) {
+                    String colDef = getResultString(result, COLUMN_DEFINITION);
+
+                    if (colDef != null) {
+                        event.setEntityDefinition(colDef);
+                    }
+                }
+
+                events.add(event);
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size());
+            }
+
+            return events;
+        } catch (IOException e) {
+            throw new AtlasBaseException(e);
+        } finally {
+            try {
+                close(scanner);
+                close(table);
+            } catch (AtlasException e) {
+                throw new AtlasBaseException(e);
+            }
+        }
+    }
+
+    @Override
+    public List<Object> listEvents(String entityId, String startKey, short maxResults) throws AtlasBaseException {
+        List ret = listEventsV2(entityId, startKey, maxResults);
+
+        try {
+            if (CollectionUtils.isEmpty(ret)) {
+                ret = listEventsV1(entityId, startKey, maxResults);
+            }
+        } catch (AtlasException e) {
+            throw new AtlasBaseException(e);
+        }
+
+        return ret;
+    }
+
     private <T> void addColumn(Put put, byte[] columnName, T columnValue) {
         if (columnValue != null && !columnValue.toString().isEmpty()) {
             put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue.toString()));
@@ -175,7 +319,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
      * @return list of events
      * @throws AtlasException
      */
-    public List<EntityAuditEvent> listEvents(String entityId, String startKey, short n)
+    public List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short n)
             throws AtlasException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, n);
@@ -243,7 +387,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
     }
 
     @Override
-    public long repositoryMaxSize() throws AtlasException {
+    public long repositoryMaxSize() {
         long ret;
         initApplicationProperties();
 
@@ -257,7 +401,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
     }
 
     @Override
-    public List<String> getAuditExcludeAttributes(String entityType) throws AtlasException {
+    public List<String> getAuditExcludeAttributes(String entityType) {
         List<String> ret = null;
 
         initApplicationProperties();
@@ -308,6 +452,20 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
         return event;
     }
 
+    private EntityAuditEventV2 fromKeyV2(byte[] keyBytes) {
+        String             key   = Bytes.toString(keyBytes);
+        EntityAuditEventV2 event = new EntityAuditEventV2();
+
+        if (StringUtils.isNotEmpty(key)) {
+            String[] parts = key.split(FIELD_SEPARATOR);
+            event.setEntityId(parts[0]);
+            event.setTimestamp(Long.valueOf(parts[1]));
+            event.setEventKey(key);
+        }
+
+        return event;
+    }
+
     private void close(Closeable closeable) throws AtlasException {
         if (closeable != null) {
             try {

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
index 22d2a81..dca3b85 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
@@ -21,6 +21,8 @@ package org.apache.atlas.repository.audit;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.commons.collections.CollectionUtils;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Singleton;
@@ -37,15 +39,16 @@ import java.util.TreeMap;
 @Component
 @ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl")
 public class InMemoryEntityAuditRepository implements EntityAuditRepository {
-    private TreeMap<String, EntityAuditEvent> auditEvents = new TreeMap<>();
+    private TreeMap<String, EntityAuditEvent>   auditEvents   = new TreeMap<>();
+    private TreeMap<String, EntityAuditEventV2> auditEventsV2 = new TreeMap<>();
 
     @Override
-    public void putEvents(EntityAuditEvent... events) throws AtlasException {
-        putEvents(Arrays.asList(events));
+    public void putEventsV1(EntityAuditEvent... events) throws AtlasException {
+        putEventsV1(Arrays.asList(events));
     }
 
     @Override
-    public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException {
+    public synchronized void putEventsV1(List<EntityAuditEvent> events) throws AtlasException {
         for (EntityAuditEvent event : events) {
             String rowKey = event.getEntityId() + (Long.MAX_VALUE - event.getTimestamp());
             event.setEventKey(rowKey);
@@ -56,8 +59,7 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
     //synchronized to avoid concurrent modification exception that occurs if events are added
     //while we are iterating through the map
     @Override
-    public synchronized List<EntityAuditEvent> listEvents(String entityId, String startKey, short maxResults)
-            throws AtlasException {
+    public synchronized List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short maxResults) {
         List<EntityAuditEvent> events = new ArrayList<>();
         String myStartKey = startKey;
         if (myStartKey == null) {
@@ -73,12 +75,57 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
     }
 
     @Override
-    public long repositoryMaxSize() throws AtlasException {
+    public long repositoryMaxSize() {
         return -1;
     }
 
     @Override
-    public List<String> getAuditExcludeAttributes(String entityType) throws AtlasException {
+    public List<String> getAuditExcludeAttributes(String entityType) {
         return null;
     }
+
+    @Override
+    public void putEventsV2(EntityAuditEventV2... events) {
+        putEventsV2(Arrays.asList(events));
+    }
+
+    @Override
+    public void putEventsV2(List<EntityAuditEventV2> events) {
+        for (EntityAuditEventV2 event : events) {
+            String rowKey = event.getEntityId() + (Long.MAX_VALUE - event.getTimestamp());
+            event.setEventKey(rowKey);
+            auditEventsV2.put(rowKey, event);
+        }
+    }
+
+    @Override
+    public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short maxResults) {
+        List<EntityAuditEventV2> events     = new ArrayList<>();
+        String                   myStartKey = startKey;
+
+        if (myStartKey == null) {
+            myStartKey = entityId;
+        }
+
+        SortedMap<String, EntityAuditEventV2> subMap = auditEventsV2.tailMap(myStartKey);
+
+        for (EntityAuditEventV2 event : subMap.values()) {
+            if (events.size() < maxResults && event.getEntityId().equals(entityId)) {
+                events.add(event);
+            }
+        }
+
+        return events;
+    }
+
+    @Override
+    public List<Object> listEvents(String entityId, String startKey, short maxResults) {
+        List events = listEventsV2(entityId, startKey, maxResults);
+
+        if (CollectionUtils.isEmpty(events)) {
+            events = listEventsV1(entityId, startKey, maxResults);
+        }
+
+        return events;
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
index c382601..e3a6078 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
@@ -18,9 +18,9 @@
 
 package org.apache.atlas.repository.audit;
 
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Singleton;
@@ -36,28 +36,47 @@ import java.util.List;
 public class NoopEntityAuditRepository implements EntityAuditRepository {
 
     @Override
-    public void putEvents(EntityAuditEvent... events) throws AtlasException {
+    public void putEventsV1(EntityAuditEvent... events) {
         //do nothing
     }
 
     @Override
-    public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException {
+    public synchronized void putEventsV1(List<EntityAuditEvent> events) {
         //do nothing
     }
 
     @Override
-    public List<EntityAuditEvent> listEvents(String entityId, String startKey, short maxResults)
-            throws AtlasException {
+    public List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short maxResults) {
         return Collections.emptyList();
     }
 
     @Override
-    public long repositoryMaxSize() throws AtlasException {
+    public void putEventsV2(EntityAuditEventV2... events) {
+        //do nothing
+    }
+
+    @Override
+    public void putEventsV2(List<EntityAuditEventV2> events) {
+        //do nothing
+    }
+
+    @Override
+    public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<Object> listEvents(String entityId, String startKey, short n) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public long repositoryMaxSize() {
         return -1;
     }
 
     @Override
-    public List<String> getAuditExcludeAttributes(String entityType) throws AtlasException {
+    public List<String> getAuditExcludeAttributes(String entityType) {
         return null;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
index 2884f8f..f9598eb 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
@@ -20,7 +20,9 @@ package org.apache.atlas.repository.converters;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.CreateUpdateEntitiesResult;
+import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.RequestContextV1;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasClassification;
@@ -54,6 +56,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD;
+import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE;
+import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_UPDATE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_DELETE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_UPDATE;
+import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_ADD;
+
 @Singleton
 @Component
 public class AtlasInstanceConverter {
@@ -290,7 +299,7 @@ public class AtlasInstanceConverter {
     }
 
 
-    private AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntity(String guid) throws AtlasBaseException {
+    public AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntity(String guid) throws AtlasBaseException {
         RequestContextV1                   context           = RequestContextV1.get();
         AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = context.getInstanceV2(guid);
 
@@ -308,4 +317,93 @@ public class AtlasInstanceConverter {
 
         return entityWithExtInfo;
     }
-}
+
+    public EntityAuditEvent toV1AuditEvent(EntityAuditEventV2 v2Event) throws AtlasBaseException {
+        EntityAuditEvent ret = new EntityAuditEvent();
+
+        ret.setEntityId(v2Event.getEntityId());
+        ret.setTimestamp(v2Event.getTimestamp());
+        ret.setUser(v2Event.getUser());
+        ret.setDetails(v2Event.getDetails());
+        ret.setEventKey(v2Event.getEventKey());
+
+        ret.setAction(getV1AuditAction(v2Event.getAction()));
+        ret.setEntityDefinition(getReferenceable(v2Event.getEntityId()));
+
+        return ret;
+    }
+
+    public EntityAuditEventV2 toV2AuditEvent(EntityAuditEvent v1Event) throws AtlasBaseException {
+        EntityAuditEventV2 ret = new EntityAuditEventV2();
+
+        ret.setEntityId(v1Event.getEntityId());
+        ret.setTimestamp(v1Event.getTimestamp());
+        ret.setUser(v1Event.getUser());
+        ret.setDetails(v1Event.getDetails());
+        ret.setEventKey(v1Event.getEventKey());
+        ret.setAction(getV2AuditAction(v1Event.getAction()));
+
+        AtlasEntitiesWithExtInfo entitiesWithExtInfo = toAtlasEntity(v1Event.getEntityDefinition());
+
+        if (entitiesWithExtInfo != null && CollectionUtils.isNotEmpty(entitiesWithExtInfo.getEntities())) {
+            // there will only one source entity
+            AtlasEntity entity = entitiesWithExtInfo.getEntities().get(0);
+
+            ret.setEntity(entity);
+        }
+
+        return ret;
+    }
+
+    private EntityAuditEvent.EntityAuditAction getV1AuditAction(EntityAuditEventV2.EntityAuditAction v2AuditAction) {
+        EntityAuditEvent.EntityAuditAction ret = null;
+
+        switch (v2AuditAction) {
+            case ENTITY_CREATE:
+            case ENTITY_UPDATE:
+            case ENTITY_DELETE:
+            case ENTITY_IMPORT_CREATE:
+            case ENTITY_IMPORT_UPDATE:
+            case ENTITY_IMPORT_DELETE:
+                ret = EntityAuditEvent.EntityAuditAction.valueOf(v2AuditAction.name());
+                break;
+            case CLASSIFICATION_ADD:
+                ret = EntityAuditEvent.EntityAuditAction.valueOf(TAG_ADD.name());
+                break;
+            case CLASSIFICATION_DELETE:
+                ret = EntityAuditEvent.EntityAuditAction.valueOf(TAG_DELETE.name());
+                break;
+            case CLASSIFICATION_UPDATE:
+                ret = EntityAuditEvent.EntityAuditAction.valueOf(TAG_UPDATE.name());
+                break;
+        }
+
+        return ret;
+    }
+
+    private EntityAuditEventV2.EntityAuditAction getV2AuditAction(EntityAuditEvent.EntityAuditAction v1AuditAction) {
+        EntityAuditEventV2.EntityAuditAction ret = null;
+
+        switch (v1AuditAction) {
+            case ENTITY_CREATE:
+            case ENTITY_UPDATE:
+            case ENTITY_DELETE:
+            case ENTITY_IMPORT_CREATE:
+            case ENTITY_IMPORT_UPDATE:
+            case ENTITY_IMPORT_DELETE:
+                ret = EntityAuditEventV2.EntityAuditAction.valueOf(v1AuditAction.name());
+                break;
+            case TAG_ADD:
+                ret = EntityAuditEventV2.EntityAuditAction.valueOf(CLASSIFICATION_ADD.name());
+                break;
+            case TAG_DELETE:
+                ret = EntityAuditEventV2.EntityAuditAction.valueOf(CLASSIFICATION_DELETE.name());
+                break;
+            case TAG_UPDATE:
+                ret = EntityAuditEventV2.EntityAuditAction.valueOf(CLASSIFICATION_UPDATE.name());
+                break;
+        }
+
+        return ret;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 31620b1..e609366 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -74,6 +74,7 @@ import static org.apache.atlas.repository.Constants.FULLTEXT_INDEX;
 import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
+import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
@@ -275,6 +276,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
             createVertexIndex(management, ENTITY_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, true);
             createVertexIndex(management, SUPER_TYPES_PROPERTY_KEY, String.class, false, SET, true, true);
             createVertexIndex(management, TRAIT_NAMES_PROPERTY_KEY, String.class, false, SET, true, true);
+            createVertexIndex(management, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class, false, LIST, true, true);
             createVertexIndex(management, TYPENAME_PROPERTY_KEY, String.class, true, SINGLE, true, true);
             createVertexIndex(management, VERTEX_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, true);
 


[2/3] atlas git commit: ATLAS-2456: Implement tag propagation using relationships

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index d61bff2..9d56fa9 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -26,10 +26,12 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity.Status;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.atlas.v1.model.instance.Id;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
@@ -55,6 +57,9 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.script.Bindings;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -68,6 +73,14 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
+import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES;
+import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL;
 
 /**
  * Utility class for graph operations.
@@ -80,6 +93,8 @@ public final class GraphHelper {
     public static final String RETRY_COUNT = "atlas.graph.storage.num.retries";
     public static final String RETRY_DELAY = "atlas.graph.storage.retry.sleeptime.ms";
 
+    private final AtlasGremlinQueryProvider queryProvider = AtlasGremlinQueryProvider.INSTANCE;
+
     private static volatile GraphHelper INSTANCE;
 
     private AtlasGraph graph;
@@ -166,7 +181,7 @@ public final class GraphHelper {
         return vertexWithoutIdentity;
     }
 
-    private AtlasEdge addEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String edgeLabel) {
+    public AtlasEdge addEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String edgeLabel) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Adding edge for {} -> label {} -> {}", string(fromVertex), edgeLabel, string(toVertex));
         }
@@ -266,6 +281,26 @@ public final class GraphHelper {
         return (AtlasEdge) findElement(false, args);
     }
 
+    public static boolean edgeExists(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) {
+        boolean             ret   = false;
+        Iterator<AtlasEdge> edges = getAdjacentEdgesByLabel(inVertex, AtlasEdgeDirection.IN, edgeLabel);
+
+        while (edges != null && edges.hasNext()) {
+            AtlasEdge edge = edges.next();
+
+            if (edge.getOutVertex().equals(outVertex)) {
+                Status edgeState = getStatus(edge);
+
+                if (edgeState == null || edgeState == ACTIVE) {
+                    ret = true;
+                    break;
+                }
+            }
+        }
+
+        return ret;
+    }
+
     private AtlasElement findElement(boolean isVertexSearch, Object... args) throws EntityNotFoundException {
         AtlasGraphQuery query = graph.query();
 
@@ -522,7 +557,7 @@ public final class GraphHelper {
     }
 
     /**
-     * Adds an additional value to a multi-property.
+     * Adds an additional value to a multi-property (SET).
      *
      * @param vertex
      * @param propertyName
@@ -539,6 +574,23 @@ public final class GraphHelper {
     }
 
     /**
+     * Adds an additional value to a multi-property (LIST).
+     *
+     * @param vertex
+     * @param propertyName
+     * @param value
+     */
+    public static void addListProperty(AtlasVertex vertex, String propertyName, Object value) {
+        String actualPropertyName = GraphHelper.encodePropertyKey(propertyName);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding property {} = \"{}\" to vertex {}", actualPropertyName, value, string(vertex));
+        }
+
+        vertex.addListProperty(actualPropertyName, value);
+    }
+
+    /**
      * Remove the specified edge from the graph.
      *
      * @param edge
@@ -635,6 +687,73 @@ public final class GraphHelper {
         return result;
     }
 
+    public List<AtlasVertex> getIncludedImpactedVerticesWithReferences(AtlasVertex entityVertex, String relationshipGuid) throws AtlasBaseException {
+        List<AtlasVertex> ret              = new ArrayList<>();
+        List<AtlasVertex> impactedVertices = getImpactedVerticesWithReferences(getGuid(entityVertex), relationshipGuid);
+
+        ret.add(entityVertex);
+
+        if (CollectionUtils.isNotEmpty(impactedVertices)) {
+            ret.addAll(impactedVertices);
+        }
+
+        return ret;
+    }
+
+    public List<AtlasVertex> getImpactedVertices(String guid) throws AtlasBaseException {
+        ScriptEngine      scriptEngine = graph.getGremlinScriptEngine();
+        Bindings          bindings     = scriptEngine.createBindings();
+        String            query        = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES);
+        List<AtlasVertex> ret          = new ArrayList<>();
+
+        bindings.put("g", graph);
+        bindings.put("guid", guid);
+
+        try {
+            Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false);
+
+            if (resultObj instanceof List && CollectionUtils.isNotEmpty((List) resultObj)) {
+                List<?> results = (List) resultObj;
+                Object firstElement = results.get(0);
+
+                if (firstElement instanceof AtlasVertex) {
+                    ret = (List<AtlasVertex>) results;
+                }
+            }
+        } catch (ScriptException e) {
+            throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e);
+        }
+
+        return ret;
+    }
+
+    public List<AtlasVertex> getImpactedVerticesWithReferences(String guid, String relationshipGuid) throws AtlasBaseException {
+        ScriptEngine      scriptEngine = graph.getGremlinScriptEngine();
+        Bindings          bindings     = scriptEngine.createBindings();
+        String            query        = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL);
+        List<AtlasVertex> ret          = new ArrayList<>();
+
+        bindings.put("g", graph);
+        bindings.put("guid", guid);
+        bindings.put("relationshipGuid", relationshipGuid);
+
+        try {
+            Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false);
+
+            if (resultObj instanceof List && CollectionUtils.isNotEmpty((List) resultObj)) {
+                List<?> results = (List) resultObj;
+                Object firstElement = results.get(0);
+
+                if (firstElement instanceof AtlasVertex) {
+                    ret = (List<AtlasVertex>) results;
+                }
+            }
+        } catch (ScriptException e) {
+            throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e);
+        }
+
+        return ret;
+    }
 
     /**
      * Finds the Vertices that correspond to the given GUIDs.  GUIDs
@@ -655,13 +774,60 @@ public final class GraphHelper {
         return attrName;
     }
 
+    public static String getTraitLabel(String traitName) {
+        return traitName;
+    }
+
+    public static String getPropagatedEdgeLabel(String classificationName) {
+        return "propagated:" + classificationName;
+    }
+
+    public static List<String> getAllTraitNames(AtlasVertex<?, ?> entityVertex) {
+        ArrayList<String> ret = new ArrayList<>();
+
+        if (entityVertex != null) {
+            Collection<String> traitNames = entityVertex.getPropertyValues(TRAIT_NAMES_PROPERTY_KEY, String.class);
+
+            if (CollectionUtils.isNotEmpty(traitNames)) {
+                ret.addAll(traitNames);
+            }
+
+            traitNames = entityVertex.getPropertyValues(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class);
+
+            if (CollectionUtils.isNotEmpty(traitNames)) {
+                ret.addAll(traitNames);
+            }
+        }
+
+        return ret;
+    }
+
     public static List<String> getTraitNames(AtlasVertex<?,?> entityVertex) {
-        ArrayList<String> traits = new ArrayList<>();
-        Collection<String> propertyValues = entityVertex.getPropertyValues(Constants.TRAIT_NAMES_PROPERTY_KEY, String.class);
-        for(String value : propertyValues) {
-            traits.add(value);
+        ArrayList<String> ret = new ArrayList<>();
+
+        if (entityVertex != null) {
+            Collection<String> traitNames = entityVertex.getPropertyValues(TRAIT_NAMES_PROPERTY_KEY, String.class);
+
+            if (CollectionUtils.isNotEmpty(traitNames)) {
+                ret.addAll(traitNames);
+            }
         }
-        return traits;
+
+        return ret;
+    }
+
+    public static List<String> getPropagatedTraitNames(AtlasVertex<?,?> entityVertex) {
+        ArrayList<String> ret = new ArrayList<>();
+
+        if (entityVertex != null) {
+            Collection<String> traitNames = entityVertex.getPropertyValues(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class);
+
+            if (CollectionUtils.isNotEmpty(traitNames)) {
+                ret.addAll(traitNames);
+            }
+        }
+
+        return ret;
     }
 
     public static List<String> getSuperTypeNames(AtlasVertex<?,?> entityVertex) {
@@ -691,6 +857,10 @@ public final class GraphHelper {
         return getIdFromVertex(getTypeName(vertex), vertex);
     }
 
+    public static String getRelationshipGuid(AtlasElement element) {
+        return element.getProperty(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
+    }
+
     public static String getGuid(AtlasElement element) {
         return element.<String>getProperty(Constants.GUID_PROPERTY_KEY, String.class);
     }
@@ -743,6 +913,25 @@ public final class GraphHelper {
         return element.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);
     }
 
+    public List<AtlasVertex> getPropagatedEntityVerticesFromClassification(AtlasVertex classificationVertex) {
+        List<AtlasVertex> ret = new ArrayList<>();
+
+        if (classificationVertex != null) {
+            String              classificationName = getTypeName(classificationVertex);
+            Iterator<AtlasEdge> iterator           = getIncomingEdgesByLabel(classificationVertex, getPropagatedEdgeLabel(classificationName));
+
+            while (iterator != null && iterator.hasNext()) {
+                AtlasEdge propagatedEdge = iterator.next();
+
+                if (propagatedEdge != null) {
+                    ret.add(propagatedEdge.getOutVertex());
+                }
+            }
+        }
+
+        return ret;
+    }
+
     /**
      * For the given type, finds an unique attribute and checks if there is an existing instance with the same
      * unique value
@@ -881,6 +1070,41 @@ public final class GraphHelper {
         return Collections.emptyList();
     }
 
+    public static List<String> getTypeNames(List<AtlasVertex> vertices) {
+        List<String> ret = new ArrayList<>();
+
+        if (CollectionUtils.isNotEmpty(vertices)) {
+            for (AtlasVertex vertex : vertices) {
+                String entityTypeProperty = vertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
+
+                if (entityTypeProperty != null) {
+                    ret.add(getTypeName(vertex));
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    public static AtlasVertex getAssociatedEntityVertex(AtlasVertex classificationVertex) {
+        AtlasVertex ret = null;
+
+        if (classificationVertex != null) {
+            Iterator<AtlasEdge> iterator = getIncomingEdgesByLabel(classificationVertex, getTypeName(classificationVertex));
+
+            while (iterator != null && iterator.hasNext()) {
+                AtlasEdge edge = iterator.next();
+
+                if (edge != null) {
+                    ret = edge.getOutVertex();
+                    break;
+                }
+            }
+        }
+
+        return ret;
+    }
+
     /**
      * Guid and AtlasVertex combo
      */
@@ -919,7 +1143,8 @@ public final class GraphHelper {
         }
     }
 
-    /**
+    /*
+     /**
      * Get the GUIDs and vertices for all composite entities owned/contained by the specified root entity AtlasVertex.
      * The graph is traversed from the root entity through to the leaf nodes of the containment graph.
      *
@@ -1299,4 +1524,51 @@ public final class GraphHelper {
 
         return StringUtils.isNotEmpty(edge.getLabel()) ? edgeLabel.startsWith("r:") : false;
     }
+
+    public static AtlasObjectId getReferenceObjectId(AtlasEdge edge, AtlasRelationshipEdgeDirection relationshipDirection,
+                                                     AtlasVertex parentVertex) {
+        AtlasObjectId ret = null;
+
+        if (relationshipDirection == OUT) {
+            ret = getAtlasObjectIdForInVertex(edge);
+        } else if (relationshipDirection == IN) {
+            ret = getAtlasObjectIdForOutVertex(edge);
+        } else if (relationshipDirection == BOTH){
+            // since relationship direction is BOTH, edge direction can be inward or outward
+            // compare with parent entity vertex and pick the right reference vertex
+            if (verticesEquals(parentVertex, edge.getOutVertex())) {
+                ret = getAtlasObjectIdForInVertex(edge);
+            } else {
+                ret = getAtlasObjectIdForOutVertex(edge);
+            }
+        }
+
+        return ret;
+    }
+
+    public static AtlasObjectId getAtlasObjectIdForOutVertex(AtlasEdge edge) {
+        return new AtlasObjectId(getGuid(edge.getOutVertex()), getTypeName(edge.getOutVertex()));
+    }
+
+    public static AtlasObjectId getAtlasObjectIdForInVertex(AtlasEdge edge) {
+        return new AtlasObjectId(getGuid(edge.getInVertex()), getTypeName(edge.getInVertex()));
+    }
+
+    private static boolean verticesEquals(AtlasVertex vertexA, AtlasVertex vertexB) {
+        return StringUtils.equals(getGuid(vertexB), getGuid(vertexA));
+    }
+
+    public static void removePropagatedTraitNameFromVertex(AtlasVertex entityVertex, String propagatedTraitName) {
+        List<String> propagatedTraitNames = getPropagatedTraitNames(entityVertex);
+
+        if (CollectionUtils.isNotEmpty(propagatedTraitNames) && propagatedTraitNames.contains(propagatedTraitName)) {
+            propagatedTraitNames.remove(propagatedTraitName);
+
+            entityVertex.removeProperty(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY);
+
+            for (String pTraitName : propagatedTraitNames) {
+                GraphHelper.addListProperty(entityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, pTraitName);
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
index 2b6bead..b9945d4 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -20,13 +20,14 @@ package org.apache.atlas.repository.store.graph.v1;
 
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.listener.EntityChangeListenerV2;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
+
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
 import org.apache.atlas.v1.model.instance.Referenceable;
@@ -49,22 +50,26 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled;
+
 
 @Component
 public class AtlasEntityChangeNotifier {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
 
-    private final Set<EntityChangeListener> entityChangeListeners;
-    private final AtlasInstanceConverter    instanceConverter;
+    private final Set<EntityChangeListener>   entityChangeListeners;
+    private final Set<EntityChangeListenerV2> entityChangeListenersV2;
+    private final AtlasInstanceConverter      instanceConverter;
 
     @Inject
     private FullTextMapperV2 fullTextMapperV2;
 
     @Inject
-    public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners,
-                                     AtlasInstanceConverter    instanceConverter) {
-        this.entityChangeListeners = entityChangeListeners;
-        this.instanceConverter     = instanceConverter;
+    public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners, Set<EntityChangeListenerV2> entityChangeListenersV2,
+                                     AtlasInstanceConverter instanceConverter) {
+        this.entityChangeListeners   = entityChangeListeners;
+        this.entityChangeListenersV2 = entityChangeListenersV2;
+        this.instanceConverter       = instanceConverter;
     }
 
     public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
@@ -89,63 +94,84 @@ public class AtlasEntityChangeNotifier {
         notifyListeners(deletedEntities, EntityOperation.DELETE, isImport);
     }
 
-    public void onClassificationAddedToEntity(String entityId, List<AtlasClassification> classifications) throws AtlasBaseException {
-        // Only new classifications need to be used for a partial full text string which can be
-        // appended to the existing fullText
-        updateFullTextMapping(entityId, classifications);
+    public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+        if (isV2EntityNotificationEnabled()) {
+            doFullTextMapping(entity.getGuid());
 
-        Referenceable entity = toReferenceable(entityId);
-        List<Struct>  traits = toStruct(classifications);
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onClassificationsAdded(entity, addedClassifications);
+            }
+        } else {
+            updateFullTextMapping(entity.getGuid(), addedClassifications);
 
-        if (entity == null || CollectionUtils.isEmpty(traits)) {
-            return;
-        }
+            Referenceable entityRef = toReferenceable(entity.getGuid());
+            List<Struct>  traits    = toStruct(addedClassifications);
 
-        for (EntityChangeListener listener : entityChangeListeners) {
-            try {
-                listener.onTraitsAdded(entity, traits);
-            } catch (AtlasException e) {
-                throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd");
+            if (entity == null || CollectionUtils.isEmpty(traits)) {
+                return;
+            }
+
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTraitsAdded(entityRef, traits);
+                } catch (AtlasException e) {
+                    throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd");
+                }
             }
         }
     }
 
-    public void onClassificationDeletedFromEntity(String entityId, List<String> traitNames) throws AtlasBaseException {
-        // Since the entity has already been modified in the graph, we need to recursively remap the entity
-        doFullTextMapping(entityId);
+    public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
+        if (isV2EntityNotificationEnabled()) {
+            doFullTextMapping(entity.getGuid());
 
-        Referenceable entity = toReferenceable(entityId);
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onClassificationsUpdated(entity, updatedClassifications);
+            }
+        } else {
+            doFullTextMapping(entity.getGuid());
 
-        if (entity == null || CollectionUtils.isEmpty(traitNames)) {
-            return;
-        }
+            Referenceable entityRef = toReferenceable(entity.getGuid());
+            List<Struct>  traits    = toStruct(updatedClassifications);
 
-        for (EntityChangeListener listener : entityChangeListeners) {
-            try {
-                listener.onTraitsDeleted(entity, traitNames);
-            } catch (AtlasException e) {
-                throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete");
+            if (entityRef == null || CollectionUtils.isEmpty(traits)) {
+                return;
+            }
+
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTraitsUpdated(entityRef, traits);
+                } catch (AtlasException e) {
+                    throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate");
+                }
             }
         }
     }
 
-    public void onClassificationUpdatedToEntity(String entityId, List<AtlasClassification> classifications) throws AtlasBaseException {
-        // Since the classification attributes are updated in the graph, we need to recursively remap the entityText
-        doFullTextMapping(entityId);
+    public void onClassificationDeletedFromEntity(AtlasEntity entity, List<String> deletedClassificationNames) throws AtlasBaseException {
+        if (isV2EntityNotificationEnabled()) {
+            doFullTextMapping(entity.getGuid());
+
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onClassificationsDeleted(entity, deletedClassificationNames);
+            }
+        } else {
+            doFullTextMapping(entity.getGuid());
 
-        Referenceable entity = toReferenceable(entityId);
-        List<Struct>  traits = toStruct(classifications);
+            Referenceable entityRef = toReferenceable(entity.getGuid());
 
-        if (entity == null || CollectionUtils.isEmpty(traits)) {
-            return;
-        }
+            if (entityRef == null || CollectionUtils.isEmpty(deletedClassificationNames)) {
+                return;
+            }
 
-        for (EntityChangeListener listener : entityChangeListeners) {
-            try {
-                listener.onTraitsUpdated(entity, traits);
-            } catch (AtlasException e) {
-                throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate");
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTraitsDeleted(entityRef, deletedClassificationNames);
+                } catch (AtlasException e) {
+                    throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete");
+                }
             }
+
         }
     }
 
@@ -158,6 +184,14 @@ public class AtlasEntityChangeNotifier {
             return;
         }
 
+        if (isV2EntityNotificationEnabled()) {
+            notifyV2Listeners(entityHeaders, operation, isImport);
+        } else {
+            notifyV1Listeners(entityHeaders, operation, isImport);
+        }
+    }
+
+    private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
         List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation);
 
         for (EntityChangeListener listener : entityChangeListeners) {
@@ -180,7 +214,26 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
-    private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException {
+    private void notifyV2Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
+        List<AtlasEntity> entities = toAtlasEntities(entityHeaders);
+
+        for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+            switch (operation) {
+                case CREATE:
+                    listener.onEntitiesAdded(entities, isImport);
+                    break;
+                case UPDATE:
+                case PARTIAL_UPDATE:
+                    listener.onEntitiesUpdated(entities, isImport);
+                    break;
+                case DELETE:
+                    listener.onEntitiesDeleted(entities, isImport);
+                    break;
+            }
+        }
+    }
+
+        private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException {
         List<Referenceable> ret = new ArrayList<>(entityHeaders.size());
 
         // delete notifications don't need all attributes. Hence the special handling for delete operation
@@ -207,7 +260,7 @@ public class AtlasEntityChangeNotifier {
         return ret;
     }
 
-    private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException {
+        private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException {
         List<Struct> ret = null;
 
         if (classifications != null) {
@@ -223,6 +276,23 @@ public class AtlasEntityChangeNotifier {
         return ret;
     }
 
+    private List<AtlasEntity> toAtlasEntities(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException {
+        List<AtlasEntity> ret = new ArrayList<>();
+
+        if (CollectionUtils.isNotEmpty(entityHeaders)) {
+            for (AtlasEntityHeader entityHeader : entityHeaders) {
+                String                 entityGuid        = entityHeader.getGuid();
+                AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
+
+                if (entityWithExtInfo != null) {
+                    ret.add(entityWithExtInfo.getEntity());
+                }
+            }
+        }
+
+        return ret;
+    }
+
     private void doFullTextMapping(List<AtlasEntityHeader> entityHeaders) {
         if (CollectionUtils.isEmpty(entityHeaders)) {
             return;
@@ -293,4 +363,4 @@ public class AtlasEntityChangeNotifier {
 
         doFullTextMapping(Collections.singletonList(entityHeader));
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index ca0eeeb..bf417dd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -462,54 +462,26 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         validateEntityAssociations(guid, classifications);
 
         entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
-
-        // notify listeners on classification addition
-        entityChangeNotifier.onClassificationAddedToEntity(guid, classifications);
     }
 
     @Override
     @GraphTransaction
-    public void updateClassifications(String guid, List<AtlasClassification> newClassifications) throws AtlasBaseException {
+    public void updateClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Updating classifications={} for entity={}", newClassifications, guid);
+            LOG.debug("Updating classifications={} for entity={}", classifications, guid);
         }
 
         if (StringUtils.isEmpty(guid)) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid not specified");
         }
 
-        if (CollectionUtils.isEmpty(newClassifications)) {
+        if (CollectionUtils.isEmpty(classifications)) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
         }
 
         GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
-        List<AtlasClassification> updatedClassifications = new ArrayList<>();
-
-        for (AtlasClassification newClassification : newClassifications) {
-            String              classificationName = newClassification.getTypeName();
-            AtlasClassification oldClassification  = getClassification(guid, classificationName);
-
-            if (oldClassification == null) {
-                throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
-            }
 
-            validateAndNormalizeForUpdate(newClassification);
-
-            Map<String, Object> newAttrs = newClassification.getAttributes();
-
-            if (MapUtils.isNotEmpty(newAttrs)) {
-                for (String attrName : newAttrs.keySet()) {
-                    oldClassification.setAttribute(attrName, newAttrs.get(attrName));
-                }
-            }
-
-            entityGraphMapper.updateClassification(new EntityMutationContext(), guid, oldClassification);
-
-            updatedClassifications.add(oldClassification);
-        }
-
-        // notify listeners on update to classifications
-        entityChangeNotifier.onClassificationUpdatedToEntity(guid, updatedClassifications);
+        entityGraphMapper.updateClassifications(new EntityMutationContext(), guid, classifications);
     }
 
     @Override
@@ -533,15 +505,10 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         List<AtlasClassification> classifications = Collections.singletonList(classification);
 
         for (String guid : guids) {
-            // validate if entity, not already associated with classifications
             validateEntityAssociations(guid, classifications);
 
             entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
-
-            // notify listeners on classification addition
-            entityChangeNotifier.onClassificationAddedToEntity(guid, classifications);
         }
-
     }
 
     @Override
@@ -561,16 +528,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
 
         entityGraphMapper.deleteClassifications(guid, classificationNames);
-
-        // notify listeners on classification deletion
-        entityChangeNotifier.onClassificationDeletedFromEntity(guid, classificationNames);
     }
 
     @Override
     @GraphTransaction
     public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Getting classifications for entities={}", guid);
+            LOG.debug("Getting classifications for entity={}", guid);
         }
 
         EntityGraphRetriever graphRetriever = new EntityGraphRetriever(typeRegistry);
@@ -680,24 +644,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         type.getNormalizedValue(classification);
     }
 
-    private void validateAndNormalizeForUpdate(AtlasClassification classification) throws AtlasBaseException {
-        AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
-
-        if (type == null) {
-            throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName());
-        }
-
-        List<String> messages = new ArrayList<>();
-
-        type.validateValueForUpdate(classification, classification.getTypeName(), messages);
-
-        if (!messages.isEmpty()) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages);
-        }
-
-        type.getNormalizedValueForUpdate(classification);
-    }
-
     /**
      * Validate if classification is not already associated with the entities
      *
@@ -734,7 +680,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             ret = new ArrayList<>();
 
             for (AtlasClassification classification : classifications) {
-                ret.add(classification.getTypeName());
+                String entityGuid = classification.getEntityGuid();
+
+                if (StringUtils.isEmpty(entityGuid) || StringUtils.equalsIgnoreCase(guid, entityGuid)) {
+                    ret.add(classification.getTypeName());
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
index 7389f49..28636d8 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
@@ -21,6 +21,7 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.instance.AtlasEntity.Status;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
@@ -30,6 +31,7 @@ import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.type.AtlasEntityType;
@@ -54,10 +56,16 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH;
 import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
 import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
-import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.BOTH;
+import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
+import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
+import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
 import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getState;
 import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getTypeName;
 
@@ -117,8 +125,8 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
 
         AtlasEdge   edge       = graphHelper.getEdgeForGUID(guid);
         String      edgeType   = AtlasGraphUtilsV1.getTypeName(edge);
-        AtlasVertex end1Vertex = edge.getInVertex();
-        AtlasVertex end2Vertex = edge.getOutVertex();
+        AtlasVertex end1Vertex = edge.getOutVertex();
+        AtlasVertex end2Vertex = edge.getInVertex();
 
         // update shouldn't change endType
         if (StringUtils.isNotEmpty(relationship.getTypeName()) && !StringUtils.equalsIgnoreCase(edgeType, relationship.getTypeName())) {
@@ -302,6 +310,8 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
     private AtlasRelationship updateRelationship(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException {
         AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
 
+        updateTagPropagations(relationshipEdge, relationship.getPropagateTags());
+
         AtlasGraphUtilsV1.setProperty(relationshipEdge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, relationship.getPropagateTags().name());
 
         if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
@@ -318,6 +328,46 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
         return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
     }
 
+    private void updateTagPropagations(AtlasEdge relationshipEdge, PropagateTags tagPropagation) throws AtlasBaseException {
+        PropagateTags oldTagPropagation = getPropagateTags(relationshipEdge);
+        PropagateTags newTagPropagation = tagPropagation;
+
+        if (newTagPropagation != oldTagPropagation) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Updating tagPropagation property: [ {} -> {} ] for relationship: [{} --> {}]", oldTagPropagation.name(),
+                          newTagPropagation.name(), getTypeName(relationshipEdge.getOutVertex()), getTypeName(relationshipEdge.getInVertex()));
+            }
+
+            if (oldTagPropagation == NONE) {
+                entityRetriever.addTagPropagation(relationshipEdge, newTagPropagation);
+            } else if (oldTagPropagation == ONE_TO_TWO) {
+                if (newTagPropagation == NONE || newTagPropagation == TWO_TO_ONE) {
+                    entityRetriever.removeTagPropagation(relationshipEdge, oldTagPropagation);
+                }
+
+                if (newTagPropagation != NONE) {
+                    entityRetriever.addTagPropagation(relationshipEdge, newTagPropagation);
+                }
+            } else if (oldTagPropagation == TWO_TO_ONE) {
+                if (newTagPropagation == NONE || newTagPropagation == ONE_TO_TWO) {
+                    entityRetriever.removeTagPropagation(relationshipEdge, oldTagPropagation);
+                }
+
+                if (newTagPropagation != NONE) {
+                    entityRetriever.addTagPropagation(relationshipEdge, newTagPropagation);
+                }
+            } else if (oldTagPropagation == BOTH) {
+                if (newTagPropagation == ONE_TO_TWO || newTagPropagation == NONE) {
+                    entityRetriever.removeTagPropagation(relationshipEdge, TWO_TO_ONE);
+                }
+
+                if (newTagPropagation == TWO_TO_ONE || newTagPropagation == NONE) {
+                    entityRetriever.removeTagPropagation(relationshipEdge, ONE_TO_TWO);
+                }
+            }
+        }
+    }
+
     private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException {
         if (relationship == null) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null");
@@ -462,16 +512,20 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
     }
 
     public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipType) {
-        String    relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType);
-        AtlasEdge ret               = graphHelper.getEdgeForLabel(fromVertex, relationshipLabel);
+        String              relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType);
+        Iterator<AtlasEdge> edgesIterator     = getOutGoingEdgesByLabel(fromVertex, relationshipLabel);
+        AtlasEdge           ret               = null;
 
-        if (ret != null) {
-            AtlasVertex inVertex = ret.getInVertex();
+        while (edgesIterator != null && edgesIterator.hasNext()) {
+            AtlasEdge edge = edgesIterator.next();
 
-            if (inVertex != null) {
-                if (!StringUtils.equals(AtlasGraphUtilsV1.getIdFromVertex(inVertex),
-                                        AtlasGraphUtilsV1.getIdFromVertex(toVertex))) {
-                    ret = null;
+            if (edge != null) {
+                Status status = graphHelper.getStatus(edge);
+
+                if ((status == null || status == ACTIVE) &&
+                        StringUtils.equals(getIdFromVertex(edge.getInVertex()), getIdFromVertex(toVertex))) {
+                    ret = edge;
+                    break;
                 }
             }
         }
@@ -499,11 +553,15 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
         return ret;
     }
 
-    private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) throws RepositoryException {
+    private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) throws RepositoryException, AtlasBaseException {
         String        relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship.getTypeName());
         PropagateTags tagPropagation    = getRelationshipTagPropagation(fromVertex, toVertex, relationship);
         AtlasEdge     ret               = graphHelper.getOrCreateEdge(fromVertex, toVertex, relationshipLabel);
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created relationship edge from [{}] --> [{}] using edge label: [{}]", getTypeName(fromVertex), getTypeName(toVertex), relationshipLabel);
+        }
+
         // map additional properties to relationship edge
         if (ret != null) {
             final String guid = UUID.randomUUID().toString();
@@ -512,6 +570,9 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
             AtlasGraphUtilsV1.setProperty(ret, Constants.RELATIONSHIP_GUID_PROPERTY_KEY, guid);
             AtlasGraphUtilsV1.setProperty(ret, Constants.VERSION_PROPERTY_KEY, getRelationshipVersion(relationship));
             AtlasGraphUtilsV1.setProperty(ret, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name());
+
+            // propagate tags
+            entityRetriever.addTagPropagation(ret, tagPropagation);
         }
 
         return ret;
@@ -596,7 +657,7 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
      */
     private boolean vertexHasRelationshipWithType(AtlasVertex vertex, String relationshipTypeName) {
         String relationshipEdgeLabel = getRelationshipEdgeLabel(getTypeName(vertex), relationshipTypeName);
-        Iterator<AtlasEdge> iter     = graphHelper.getAdjacentEdgesByLabel(vertex, BOTH, relationshipEdgeLabel);
+        Iterator<AtlasEdge> iter     = graphHelper.getAdjacentEdgesByLabel(vertex, AtlasEdgeDirection.BOTH, relationshipEdgeLabel);
 
         return (iter != null) ? iter.hasNext() : false;
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index 779bc38..0224bf0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -25,6 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
@@ -36,6 +37,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -51,6 +53,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdg
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.collections.MapUtils;
@@ -63,14 +66,24 @@ import javax.inject.Inject;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static org.apache.atlas.model.TypeCategory.CLASSIFICATION;
 import static org.apache.atlas.model.instance.AtlasRelatedObjectId.KEY_RELATIONSHIP_ATTRIBUTES;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
 import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
+import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
+import static org.apache.atlas.repository.graph.GraphHelper.addListProperty;
+import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
 import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames;
 import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
 import static org.apache.atlas.repository.graph.GraphHelper.string;
 import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
@@ -81,21 +94,26 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
 public class EntityGraphMapper {
     private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class);
 
-    private final GraphHelper            graphHelper = GraphHelper.getInstance();
-    private final AtlasGraph             graph;
-    private final DeleteHandlerV1        deleteHandler;
-    private final AtlasTypeRegistry      typeRegistry;
-    private final EntityGraphRetriever   entityRetriever;
-    private final AtlasRelationshipStore relationshipStore;
+    private final GraphHelper               graphHelper = GraphHelper.getInstance();
+    private final AtlasGraph                graph;
+    private final DeleteHandlerV1           deleteHandler;
+    private final AtlasTypeRegistry         typeRegistry;
+    private final AtlasRelationshipStore    relationshipStore;
+    private final AtlasEntityChangeNotifier entityChangeNotifier;
+    private final AtlasInstanceConverter    instanceConverter;
+    private final EntityGraphRetriever      entityRetriever;
 
     @Inject
     public EntityGraphMapper(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
-                             AtlasRelationshipStore relationshipStore) {
-        this.deleteHandler     = deleteHandler;
-        this.typeRegistry      = typeRegistry;
-        this.entityRetriever   = new EntityGraphRetriever(typeRegistry);
-        this.graph             = atlasGraph;
-        this.relationshipStore = relationshipStore;
+                             AtlasRelationshipStore relationshipStore, AtlasEntityChangeNotifier entityChangeNotifier,
+                             AtlasInstanceConverter instanceConverter) {
+        this.deleteHandler        = deleteHandler;
+        this.typeRegistry         = typeRegistry;
+        this.graph                = atlasGraph;
+        this.relationshipStore    = relationshipStore;
+        this.entityChangeNotifier = entityChangeNotifier;
+        this.instanceConverter    = instanceConverter;
+        this.entityRetriever      = new EntityGraphRetriever(typeRegistry);
     }
 
     public AtlasVertex createVertex(AtlasEntity entity) {
@@ -241,6 +259,8 @@ public class EntityGraphMapper {
         AtlasVertex ret = createStructVertex(classification);
 
         AtlasGraphUtilsV1.addProperty(ret, Constants.SUPER_TYPES_PROPERTY_KEY, classificationType.getAllSuperTypes());
+        AtlasGraphUtilsV1.setProperty(ret, Constants.CLASSIFICATION_ENTITY_GUID, classification.getEntityGuid());
+        AtlasGraphUtilsV1.setProperty(ret, Constants.CLASSIFICATION_PROPAGATE_KEY, classification.isPropagate());
 
         return ret;
     }
@@ -903,7 +923,7 @@ public class EntityGraphMapper {
 
     private void updateModificationMetadata(AtlasVertex vertex) {
         AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
-        GraphHelper.setProperty(vertex, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
+        AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
     }
 
     private Long getEntityVersion(AtlasEntity entity) {
@@ -1285,80 +1305,299 @@ public class EntityGraphMapper {
         }
     }
 
-    public void addClassifications(final EntityMutationContext context, String guid, List<AtlasClassification> classifications)
-        throws AtlasBaseException {
+    public void addClassifications(final EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(classifications)) {
+            AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
 
-        if ( CollectionUtils.isNotEmpty(classifications)) {
-
-            AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
-            if (instanceVertex == null) {
+            if (entityVertex == null) {
                 throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
             }
 
-            String entityTypeName = AtlasGraphUtilsV1.getTypeName(instanceVertex);
-
-            final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
+            String                    entityTypeName            = AtlasGraphUtilsV1.getTypeName(entityVertex);
+            final AtlasEntityType     entityType                = typeRegistry.getEntityTypeByName(entityTypeName);
+            List<AtlasVertex>         propagatedEntityVertices  = null;
+            List<AtlasClassification> propagagedClassifications = null;
 
             for (AtlasClassification classification : classifications) {
+                String  classificationName = classification.getTypeName();
+                boolean propagateTags      = classification.isPropagate();
+
+                // set associated entity id to classification
+                classification.setEntityGuid(guid);
+
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("mapping classification {}", classification);
+                    LOG.debug("Adding classification [{}] to [{}] using edge label: [{}]", classificationName, entityTypeName, getTraitLabel(classificationName));
                 }
 
-                GraphHelper.addProperty(instanceVertex, Constants.TRAIT_NAMES_PROPERTY_KEY, classification.getTypeName());
+                GraphHelper.addProperty(entityVertex, TRAIT_NAMES_PROPERTY_KEY, classificationName);
+
                 // add a new AtlasVertex for the struct or trait instance
                 AtlasVertex classificationVertex = createClassificationVertex(classification);
+
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("created vertex {} for trait {}", string(classificationVertex), classification.getTypeName());
+                    LOG.debug("created vertex {} for trait {}", string(classificationVertex), classificationName);
                 }
 
                 // add the attributes for the trait instance
-                mapClassification(EntityOperation.CREATE, context, classification, entityType, instanceVertex, classificationVertex);
+                mapClassification(EntityOperation.CREATE, context, classification, entityType, entityVertex, classificationVertex);
+
+                if (propagateTags) {
+                    // compute propagatedEntityVertices only once
+                    if (propagatedEntityVertices == null) {
+                        propagatedEntityVertices = graphHelper.getImpactedVertices(guid);
+                    }
+
+                    if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(propagatedEntityVertices));
+                        }
+
+                        if (propagagedClassifications == null) {
+                            propagagedClassifications = new ArrayList<>();
+                        }
+
+                        propagagedClassifications.add(classification);
+
+                        addTagPropagation(classificationVertex, propagatedEntityVertices);
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(" --> Not propagating classification: [{}][{}] - no entities found to propagate to.", getTypeName(classificationVertex), entityTypeName);
+                        }
+                    }
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(" --> Not propagating classification: [{}][{}] - propagation is disabled.", getTypeName(classificationVertex), entityTypeName);
+                    }
+                }
+            }
+
+            // notify listeners on classification addition
+            List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }};
+
+            if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
+                notificationVertices.addAll(propagatedEntityVertices);
+            }
+
+            for (AtlasVertex vertex : notificationVertices) {
+                String                    entityGuid           = GraphHelper.getGuid(vertex);
+                AtlasEntityWithExtInfo    entityWithExtInfo    = instanceConverter.getAndCacheEntity(entityGuid);
+                AtlasEntity               entity               = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+                List<AtlasClassification> addedClassifications = StringUtils.equals(entityGuid, guid) ? classifications : Collections.emptyList();
+
+                entityChangeNotifier.onClassificationAddedToEntity(entity, addedClassifications);
             }
         }
     }
 
-    public void updateClassification(final EntityMutationContext context, String guid, AtlasClassification classification)
-                                     throws AtlasBaseException {
+    public void deleteClassifications(String guid, List<String> classificationNames) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(classificationNames)) {
+            AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
 
-        AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
+            if (entityVertex == null) {
+                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+            }
 
-        if (instanceVertex == null) {
-            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
-        }
+            List<String> traitNames = getTraitNames(entityVertex);
 
-        String entityTypeName = AtlasGraphUtilsV1.getTypeName(instanceVertex);
+            validateClassificationExists(traitNames, classificationNames);
 
-        final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
+            Set<String> impactedEntities = new HashSet<String>() {{ add(guid); }};
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Updating classification {} for entity {}", classification, guid);
+            for (String classificationName : classificationNames) {
+                AtlasEdge   classificationEdge   = graphHelper.getEdgeForLabel(entityVertex, getTraitLabel(classificationName));
+                AtlasVertex classificationVertex = classificationEdge.getInVertex();
+
+                // remove classification from propagated entity vertices
+                boolean propagationEnabled = entityRetriever.isPropagationEnabled(classificationVertex);
+
+                if (propagationEnabled) {
+                    List<AtlasVertex> impactedEntityVertices = graphHelper.getPropagatedEntityVerticesFromClassification(classificationVertex);
+
+                    if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
+                        removeTagPropagation(classificationVertex);
+
+                        for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
+                            impactedEntities.add(GraphHelper.getGuid(impactedEntityVertex));
+                        }
+                    }
+                }
+
+                // remove classification from associated entity vertex
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName, getTypeName(entityVertex), guid, getTraitLabel(classificationName));
+                }
+
+                deleteHandler.deleteEdgeReference(classificationEdge, CLASSIFICATION, false, true, entityVertex);
+
+                traitNames.remove(classificationName);
+            }
+
+            updateTraitNamesProperty(entityVertex, traitNames);
+
+            updateModificationMetadata(entityVertex);
+
+            for (String entityGuid : impactedEntities) {
+                AtlasEntityWithExtInfo    entityWithExtInfo          = instanceConverter.getAndCacheEntity(entityGuid);
+                AtlasEntity               entity                     = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+                List<String>              deletedClassificationNames = StringUtils.equals(entityGuid, guid) ? classificationNames : Collections.emptyList();
+
+                entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
+            }
         }
+    }
+
+    public void updateClassifications(EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(classifications)) {
+            AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
+
+            if (entityVertex == null) {
+                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+            }
+
+            String                    entityTypeName           = AtlasGraphUtilsV1.getTypeName(entityVertex);
+            AtlasEntityType           entityType               = typeRegistry.getEntityTypeByName(entityTypeName);
+            List<AtlasClassification> updatedClassifications   = new ArrayList<>();
+            List<AtlasVertex>         propagatedEntityVertices = new ArrayList<>();
+
+            for (AtlasClassification classification : classifications) {
+                String classificationName       = classification.getTypeName();
+                String classificationEntityGuid = classification.getEntityGuid();
+
+                if (StringUtils.isNotEmpty(classificationEntityGuid) && StringUtils.equalsIgnoreCase(guid, classificationEntityGuid)) {
+                    throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName);
+                }
+
+                String    relationshipLabel  = getTraitLabel(entityTypeName, classificationName);
+                AtlasEdge classificationEdge = graphHelper.getEdgeForLabel(entityVertex, relationshipLabel);
+
+                if (classificationEdge == null) {
+                    throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Updating classification {} for entity {}", classification, guid);
+                }
+
+                AtlasVertex         classificationVertex  = classificationEdge.getInVertex();
+                AtlasClassification currentClassification = entityRetriever.toAtlasClassification(classificationVertex);
+
+                validateAndNormalizeForUpdate(classification);
+
+                Map<String, Object> classificationAttributes = classification.getAttributes();
+
+                if (MapUtils.isNotEmpty(classificationAttributes)) {
+                    for (String attributeName : classificationAttributes.keySet()) {
+                        currentClassification.setAttribute(attributeName, classificationAttributes.get(attributeName));
+                    }
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classificationName);
+                }
+
+                mapClassification(EntityOperation.UPDATE, context, classification, entityType, entityVertex, classificationVertex);
+
+                // handle update of 'propagate' flag
+                boolean currentTagPropagation = currentClassification.isPropagate();
+                boolean updatedTagPropagation = classification.isPropagate();
+
+                // compute propagatedEntityVertices once and use it for subsequent iterations and notifications
+                if (CollectionUtils.isEmpty(propagatedEntityVertices)) {
+                    propagatedEntityVertices = (currentTagPropagation) ? graphHelper.getPropagatedEntityVerticesFromClassification(classificationVertex) :
+                            graphHelper.getImpactedVertices(guid);
+                }
+
+                if (currentTagPropagation != updatedTagPropagation) {
+                    if (updatedTagPropagation) {
+                        addTagPropagation(classificationVertex, propagatedEntityVertices);
+                    } else {
+                        removeTagPropagation(classificationVertex);
+                    }
 
-        // get the classification vertex from entity
-        String      relationshipLabel    = GraphHelper.getTraitLabel(entityTypeName, classification.getTypeName());
-        AtlasEdge   classificationEdge   = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
+                    AtlasGraphUtilsV1.setProperty(classificationVertex, Constants.CLASSIFICATION_PROPAGATE_KEY, updatedTagPropagation);
+                }
+
+                updatedClassifications.add(currentClassification);
+            }
 
-        if (classificationEdge == null) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "classificationEdge is null for label: " + relationshipLabel);
+            // notify listeners on classification update
+            List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }};
+
+            if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
+                notificationVertices.addAll(propagatedEntityVertices);
+            }
+
+            for (AtlasVertex vertex : notificationVertices) {
+                String                    entityGuid                = GraphHelper.getGuid(vertex);
+                AtlasEntityWithExtInfo    entityWithExtInfo         = instanceConverter.getAndCacheEntity(entityGuid);
+                AtlasEntity               entity                    = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+                List<AtlasClassification> updatedClassificationList = StringUtils.equals(entityGuid, guid) ? updatedClassifications : Collections.emptyList();
+
+                entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassificationList);
+            }
         }
+    }
 
-        AtlasVertex classificationVertex = classificationEdge.getInVertex();
+    private void addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) {
+        if (CollectionUtils.isNotEmpty(propagatedEntityVertices) && classificationVertex != null) {
+            String                  classificationName = getTypeName(classificationVertex);
+            AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName);
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classification.getTypeName());
+            for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) {
+                String          entityTypeName = getTypeName(propagatedEntityVertex);
+                AtlasEntityType entityType     = typeRegistry.getEntityTypeByName(entityTypeName);
+
+                if (classificationType.canApplyToEntityType(entityType)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, getTypeName(propagatedEntityVertex),
+                                GraphHelper.getGuid(propagatedEntityVertex), getPropagatedEdgeLabel(classificationName));
+                    }
+
+                    graphHelper.addEdge(propagatedEntityVertex, classificationVertex, getPropagatedEdgeLabel(classificationName));
+
+                    addListProperty(propagatedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
+                }
+            }
         }
+    }
 
-        mapClassification(EntityOperation.UPDATE, context, classification, entityType, instanceVertex, classificationVertex);
+    private void removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
+        if (classificationVertex != null) {
+            String              classificationName = getTypeName(classificationVertex);
+            Iterator<AtlasEdge> iterator           = getIncomingEdgesByLabel(classificationVertex, getPropagatedEdgeLabel(classificationName));
+
+            // remove classification from propagated entity vertices
+            while (iterator != null && iterator.hasNext()) {
+                AtlasEdge propagatedEdge = iterator.next();
+
+                if (propagatedEdge != null) {
+                    AtlasVertex propagatedEntityVertex = propagatedEdge.getOutVertex();
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Removing propagated classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
+                                  getTypeName(propagatedEntityVertex), GraphHelper.getGuid(propagatedEntityVertex), getPropagatedEdgeLabel(classificationName));
+                    }
+
+                    removePropagatedTraitName(propagatedEntityVertex, classificationName);
+
+                    deleteHandler.deleteEdge(propagatedEdge, true);
+
+                    updateModificationMetadata(propagatedEntityVertex);
+                }
+            }
+        }
     }
 
-    private AtlasEdge mapClassification(EntityOperation operation,  final EntityMutationContext context, AtlasClassification classification, AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
-        throws AtlasBaseException {
+    private AtlasEdge mapClassification(EntityOperation operation,  final EntityMutationContext context, AtlasClassification classification,
+                                        AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
+                                        throws AtlasBaseException {
 
         // map all the attributes to this newly created AtlasVertex
         mapAttributes(classification, traitInstanceVertex, operation, context);
 
         // add an edge to the newly created AtlasVertex from the parent
-        String relationshipLabel = GraphHelper.getTraitLabel(entityType.getTypeName(), classification.getTypeName());
+        String relationshipLabel = getTraitLabel(entityType.getTypeName(), classification.getTypeName());
+
         try {
            return graphHelper.getOrCreateEdge(parentInstanceVertex, traitInstanceVertex, relationshipLabel);
         } catch (RepositoryException e) {
@@ -1373,54 +1612,40 @@ public class EntityGraphMapper {
             throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
         }
 
-        List<String> traitNames = GraphHelper.getTraitNames(instanceVertex);
+        List<String> traitNames = getTraitNames(instanceVertex);
 
         deleteClassifications(guid, traitNames);
     }
 
-    public void deleteClassifications(String guid, List<String> classificationNames) throws AtlasBaseException {
-
-        AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
-        if (instanceVertex == null) {
-            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
-        }
+    private void removePropagatedTraitName(AtlasVertex entityVertex, String classificationName) {
+        if (entityVertex != null && StringUtils.isNotEmpty(classificationName)) {
+            List<String> propagatedTraitNames = getPropagatedTraitNames(entityVertex);
 
-        List<String> traitNames = GraphHelper.getTraitNames(instanceVertex);
+            propagatedTraitNames.remove(classificationName);
 
-        validateClassificationExists(traitNames, classificationNames);
+            entityVertex.removeProperty(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY);
 
-        for (String classificationName : classificationNames) {
-            try {
-                final String entityTypeName = getTypeName(instanceVertex);
-                String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, classificationName);
-                AtlasEdge edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
-                if (edge != null) {
-                    deleteHandler.deleteEdgeReference(edge, TypeCategory.CLASSIFICATION, false, true, instanceVertex);
-
-                    // update the traits in entity once trait removal is successful
-                    traitNames.remove(classificationName);
-
-                }
-            } catch (Exception e) {
-                throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+            for (String propagatedTraitName : propagatedTraitNames) {
+                addListProperty(entityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, propagatedTraitName);
             }
         }
+    }
 
-        // remove the key
-        instanceVertex.removeProperty(Constants.TRAIT_NAMES_PROPERTY_KEY);
+    private void updateTraitNamesProperty(AtlasVertex entityVertex, List<String> traitNames) {
+        if (entityVertex != null) {
+            entityVertex.removeProperty(TRAIT_NAMES_PROPERTY_KEY);
 
-        // add it back again
-        for (String traitName : traitNames) {
-            GraphHelper.addProperty(instanceVertex, Constants.TRAIT_NAMES_PROPERTY_KEY, traitName);
+            for (String traitName : traitNames) {
+                GraphHelper.addProperty(entityVertex, TRAIT_NAMES_PROPERTY_KEY, traitName);
+            }
         }
-        updateModificationMetadata(instanceVertex);
     }
 
     private void validateClassificationExists(List<String> existingClassifications, List<String> suppliedClassifications) throws AtlasBaseException {
         Set<String> existingNames = new HashSet<>(existingClassifications);
         for (String classificationName : suppliedClassifications) {
             if (!existingNames.contains(classificationName)) {
-                throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
+                throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
             }
         }
     }
@@ -1489,4 +1714,22 @@ public class EntityGraphMapper {
 
         return currentEntityId;
     }
+
+    public void validateAndNormalizeForUpdate(AtlasClassification classification) throws AtlasBaseException {
+        AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+
+        if (type == null) {
+            throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName());
+        }
+
+        List<String> messages = new ArrayList<>();
+
+        type.validateValueForUpdate(classification, classification.getTypeName(), messages);
+
+        if (!messages.isEmpty()) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages);
+        }
+
+        type.getNormalizedValueForUpdate(classification);
+    }
 }