You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/11/14 23:30:42 UTC

[atlas] branch branch-2.0 updated: ATLAS-3477: Add entity purge API in Admin Resource

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7677035  ATLAS-3477: Add entity purge API in Admin Resource
7677035 is described below

commit 7677035451d61d9ba4a0daef56af85000c8b115c
Author: Sidharth <si...@gmail.com>
AuthorDate: Tue Nov 12 12:39:08 2019 -0800

    ATLAS-3477: Add entity purge API in Admin Resource
    
    (cherry picked from commit 09118ca750208afc8f01638531555aada33e9648)
---
 .../org/apache/atlas/authorize/AtlasPrivilege.java |  3 +-
 .../main/java/org/apache/atlas/AtlasClientV2.java  | 11 ++++
 .../atlas/listener/EntityChangeListenerV2.java     | 15 +++++
 .../atlas/model/audit/EntityAuditEventV2.java      |  4 +-
 .../model/instance/EntityMutationResponse.java     |  8 +++
 .../atlas/model/instance/EntityMutations.java      |  3 +-
 .../repository/audit/EntityAuditListenerV2.java    | 28 ++++++++
 .../repository/store/graph/AtlasEntityStore.java   |  6 ++
 .../repository/store/graph/v1/DeleteHandlerV1.java | 43 +++++++++----
 .../store/graph/v2/AtlasEntityChangeNotifier.java  | 24 ++++++-
 .../store/graph/v2/AtlasEntityStoreV2.java         | 56 +++++++++++++++-
 .../store/graph/v2/AtlasGraphUtilsV2.java          | 20 ++++++
 .../main/java/org/apache/atlas/RequestContext.java | 22 ++++++-
 .../notification/EntityNotificationListenerV2.java | 10 +++
 .../apache/atlas/web/resources/AdminResource.java  | 26 ++++++++
 .../web/integration/EntityV2JerseyResourceIT.java  | 74 ++++++++++++++++++----
 16 files changed, 319 insertions(+), 34 deletions(-)

diff --git a/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java b/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
index 59c596d..38b68fa 100644
--- a/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
+++ b/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
@@ -36,7 +36,8 @@ public enum AtlasPrivilege {
 
      RELATIONSHIP_ADD("add-relationship"),
      RELATIONSHIP_UPDATE("update-relationship"),
-     RELATIONSHIP_REMOVE("remove-relationship");
+     RELATIONSHIP_REMOVE("remove-relationship"),
+     ADMIN_PURGE("admin-purge");
 
      private final String type;
 
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 199d6bf..8c0a640 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -49,6 +49,7 @@ import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class AtlasClientV2 extends AtlasBaseClient {
     // Type APIs
@@ -63,6 +64,11 @@ public class AtlasClientV2 extends AtlasBaseClient {
     private static final String GET_BY_NAME_TEMPLATE = TYPES_API + "%s/name/%s";
     private static final String GET_BY_GUID_TEMPLATE = TYPES_API + "%s/guid/%s";
     private static final String ENTITY_BULK_API      = ENTITY_API + "bulk/";
+
+    //Admin Entity Purge
+    private static final String ADMIN_API            = BASE_URI + "admin/";
+    private static final String ENTITY_PURGE_API     = ADMIN_API + "purge/";
+
     // Lineage APIs
     private static final String LINEAGE_URI  = BASE_URI + "v2/lineage/";
 
@@ -362,6 +368,10 @@ public class AtlasClientV2 extends AtlasBaseClient {
         return callAPI(API_V2.DELETE_ENTITIES_BY_GUIDS, EntityMutationResponse.class, "guid", guids);
     }
 
+    public EntityMutationResponse purgeEntitiesByGuids(Set<String> guids) throws AtlasServiceException {
+        return callAPI(API_V2.PURGE_ENTITIES_BY_GUIDS, EntityMutationResponse.class, guids);
+    }
+
     public AtlasClassifications getClassifications(String guid) throws AtlasServiceException {
         return callAPI(formatPathParameters(API_V2.GET_CLASSIFICATIONS, guid), AtlasClassifications.class, null);
     }
@@ -555,6 +565,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
         public static final API_V2 CREATE_ENTITIES             = new API_V2(ENTITY_BULK_API, HttpMethod.POST, Response.Status.OK);
         public static final API_V2 UPDATE_ENTITIES             = new API_V2(ENTITY_BULK_API, HttpMethod.POST, Response.Status.OK);
         public static final API_V2 DELETE_ENTITIES_BY_GUIDS    = new API_V2(ENTITY_BULK_API, HttpMethod.DELETE, Response.Status.OK);
+        public static final API_V2 PURGE_ENTITIES_BY_GUIDS     = new API_V2(ENTITY_PURGE_API, HttpMethod.DELETE, Response.Status.OK);
         public static final API_V2 GET_CLASSIFICATIONS         = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.GET, Response.Status.OK);
         public static final API_V2 ADD_CLASSIFICATIONS         = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.POST, Response.Status.NO_CONTENT);
         public static final API_V2 UPDATE_CLASSIFICATIONS      = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.PUT, Response.Status.NO_CONTENT);
diff --git a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
index 444167e..d36582b 100644
--- a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
+++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
@@ -56,6 +56,14 @@ public interface EntityChangeListenerV2 {
      */
     void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException;
 
+
+    /**
+     * This is upon purging entities from the repository.
+     *
+     * @param entities the purged entities
+     */
+    void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException;
+
     /**
      * This is upon adding new classifications to an entity.
      *
@@ -124,6 +132,13 @@ public interface EntityChangeListenerV2 {
     void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException;
 
     /**
+     * This is upon purging relationships from the repository.
+     *
+     * @param relationships the purged relationships
+     */
+    void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException;
+
+    /**
      * This is upon add new labels to an entity.
      *
      * @param entity the entity
diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
index e9cc7cd..9301e21 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
@@ -50,7 +50,7 @@ public class EntityAuditEventV2 implements Serializable {
         ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
         CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
         PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE,
-        TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE;
+        TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE;
 
         public static EntityAuditActionV2 fromString(String strValue) {
             switch (strValue) {
@@ -60,6 +60,8 @@ public class EntityAuditEventV2 implements Serializable {
                     return ENTITY_UPDATE;
                 case "ENTITY_DELETE":
                     return ENTITY_DELETE;
+                case "ENTITY_PURGE":
+                    return ENTITY_PURGE;
                 case "ENTITY_IMPORT_CREATE":
                     return ENTITY_IMPORT_CREATE;
                 case "ENTITY_IMPORT_UPDATE":
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
index 7ace00d..b448d51 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
@@ -115,6 +115,14 @@ public class EntityMutationResponse {
     }
 
     @JsonIgnore
+    public List<AtlasEntityHeader> getPurgedEntities() {
+        if ( mutatedEntities != null) {
+            return mutatedEntities.get(EntityOperation.PURGE);
+        }
+        return null;
+    }
+
+    @JsonIgnore
     public AtlasEntityHeader getFirstEntityCreated() {
         final List<AtlasEntityHeader> entitiesByOperation = getEntitiesByOperation(EntityOperation.CREATE);
         if ( entitiesByOperation != null && entitiesByOperation.size() > 0) {
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java
index daf4ca8..1dd5aa0 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java
@@ -48,7 +48,8 @@ public class EntityMutations implements Serializable {
         CREATE,
         UPDATE,
         PARTIAL_UPDATE,
-        DELETE
+        DELETE,
+        PURGE
     }
 
     public static final class EntityMutation implements Serializable {
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index 43a9b84..706da16 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -54,6 +54,7 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_PURGE;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_DELETE;
 import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_UPDATE;
@@ -133,6 +134,23 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
     }
 
     @Override
+    public void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
+        List<EntityAuditEventV2> events = new ArrayList<>();
+
+        for (AtlasEntity entity : entities) {
+            EntityAuditEventV2 event = createEvent(entity, ENTITY_PURGE, "Purged entity");
+
+            events.add(event);
+        }
+
+        auditRepository.putEventsV2(events);
+
+        RequestContext.get().endMetricRecord(metric);
+    }
+
+    @Override
     public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(classifications)) {
             MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
@@ -470,6 +488,9 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
             case ENTITY_DELETE:
                 ret = "Deleted: ";
                 break;
+            case ENTITY_PURGE:
+                ret = "Purged: ";
+                break;
             case CLASSIFICATION_ADD:
                 ret = "Added classification: ";
                 break;
@@ -521,4 +542,11 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
             LOG.debug("Relationship(s) deleted from repository(" + relationships.size() + ")");
         }
     }
+
+    @Override
+    public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Relationship(s) purged from repository(" + relationships.size() + ")");
+        }
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index b94590b..49dd5c5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -203,11 +203,17 @@ public interface AtlasEntityStore {
      */
 
     String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException;
+
     /*
      * Return list of deleted entity guids
      */
     EntityMutationResponse deleteByIds(List<String> guid) throws AtlasBaseException;
 
+    /*
+     * Return list of purged entity guids
+     */
+    EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException;
+
     /**
      * Add classification(s)
      */
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index d2544df..6214584 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -106,9 +106,16 @@ public abstract class DeleteHandlerV1 {
             String              guid = AtlasGraphUtilsV2.getIdFromVertex(instanceVertex);
             AtlasEntity.Status state = getState(instanceVertex);
 
-            if (state == DELETED || requestContext.isDeletedEntity(guid)) {
+            boolean needToSkip = requestContext.isPurgeRequested() ? (state == ACTIVE || requestContext.isPurgedEntity(guid)) :
+                    (state == DELETED || requestContext.isDeletedEntity(guid));
+
+            if (needToSkip) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Skipping deletion of {} as it is already deleted", guid);
+                    if(RequestContext.get().isPurgeRequested()) {
+                        LOG.debug("Skipping purging of {} as it is active or already purged", guid);
+                    } else {
+                        LOG.debug("Skipping deletion of {} as it is already deleted", guid);
+                    }
                 }
 
                 continue;
@@ -149,10 +156,15 @@ public abstract class DeleteHandlerV1 {
     public void deleteRelationships(Collection<AtlasEdge> edges, final boolean forceDelete) throws AtlasBaseException {
         for (AtlasEdge edge : edges) {
             boolean isInternal = isInternalType(edge.getInVertex()) && isInternalType(edge.getOutVertex());
+            boolean needToSkip = !isInternal && (RequestContext.get().isPurgeRequested() ? getState(edge) == ACTIVE : getState(edge) == DELETED);
 
-            if (!isInternal && getState(edge) == DELETED) {
+            if (needToSkip) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Skipping deletion of {} as it is already deleted", getIdFromEdge(edge));
+                    if(RequestContext.get().isPurgeRequested()) {
+                        LOG.debug("Skipping purging of {} as it is active or already purged", getIdFromEdge(edge));
+                    } else{
+                        LOG.debug("Skipping deletion of {} as it is already deleted", getIdFromEdge(edge));
+                    }
                 }
 
                 continue;
@@ -183,8 +195,10 @@ public abstract class DeleteHandlerV1 {
             AtlasVertex        vertex = vertices.pop();
             AtlasEntity.Status state  = getState(vertex);
 
-            if (state == DELETED) {
-                //If the reference vertex is marked for deletion, skip it
+            // For a purge request, skip reference vertices that are still ACTIVE;
+            // otherwise (regular delete), skip vertices that are already marked DELETED.
+            boolean needToSkip = RequestContext.get().isPurgeRequested() ? (state == ACTIVE) : (state == DELETED);
+            if (needToSkip) {
                 continue;
             }
 
@@ -221,7 +235,9 @@ public abstract class DeleteHandlerV1 {
                     } else {
                         AtlasEdge edge = graphHelper.getEdgeForLabel(vertex, edgeLabel);
 
-                        if (edge == null || getState(edge) == DELETED) {
+                        // '||' binds tighter than '?:' - parenthesize the ternary so a null edge short-circuits before getState(edge).
+                        needToSkip = (edge == null || (RequestContext.get().isPurgeRequested() ? getState(edge) == ACTIVE : getState(edge) == DELETED));
+                        if (needToSkip) {
                             continue;
                         }
 
@@ -274,7 +290,9 @@ public abstract class DeleteHandlerV1 {
 
                         if (CollectionUtils.isNotEmpty(edges)) {
                             for (AtlasEdge edge : edges) {
-                                if (edge == null || getState(edge) == DELETED) {
+                                // '||' binds tighter than '?:' - parenthesize the ternary so a null edge short-circuits before getState(edge).
+                                needToSkip = (edge == null || (RequestContext.get().isPurgeRequested() ? getState(edge) == ACTIVE : getState(edge) == DELETED));
+                                if (needToSkip) {
                                     continue;
                                 }
 
@@ -838,8 +856,10 @@ public abstract class DeleteHandlerV1 {
         final String outId    = GraphHelper.getGuid(outVertex);
         final Status state    = getState(outVertex);
 
-        if (state == DELETED || (outId != null && RequestContext.get().isDeletedEntity(outId))) {
-            //If the reference vertex is marked for deletion, skip updating the reference
+        boolean needToSkip = RequestContext.get().isPurgeRequested() ? state == ACTIVE || (outId != null && RequestContext.get().isPurgedEntity(outId)) :
+                state == DELETED || (outId != null && RequestContext.get().isDeletedEntity(outId));
+
+        if (needToSkip) {
             return;
         }
 
@@ -954,7 +974,8 @@ public abstract class DeleteHandlerV1 {
         for (AtlasEdge edge : incomingEdges) {
             Status edgeState = getState(edge);
 
-            if (edgeState == ACTIVE) {
+            boolean isProceed = RequestContext.get().isPurgeRequested()? edgeState == DELETED : edgeState == ACTIVE;
+            if (isProceed) {
                 if (isRelationshipEdge(edge)) {
                     deleteRelationship(edge);
                 } else {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index 3389d24..bd1ba58 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -102,6 +102,7 @@ public class AtlasEntityChangeNotifier {
         List<AtlasEntityHeader> updatedEntities          = entityMutationResponse.getUpdatedEntities();
         List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities();
         List<AtlasEntityHeader> deletedEntities          = entityMutationResponse.getDeletedEntities();
+        List<AtlasEntityHeader> purgedEntities           = entityMutationResponse.getPurgedEntities();
 
         // complete full text mapping before calling toReferenceables(), from notifyListners(), to
         // include all vertex updates in the current graph-transaction
@@ -113,6 +114,7 @@ public class AtlasEntityChangeNotifier {
         notifyListeners(updatedEntities, EntityOperation.UPDATE, isImport);
         notifyListeners(partiallyUpdatedEntities, EntityOperation.PARTIAL_UPDATE, isImport);
         notifyListeners(deletedEntities, EntityOperation.DELETE, isImport);
+        notifyListeners(purgedEntities, EntityOperation.PURGE, isImport);
 
         notifyPropagatedEntities();
     }
@@ -340,7 +342,7 @@ public class AtlasEntityChangeNotifier {
 
 
     private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
-        if (instanceConverter != null) {
+        if (operation != EntityOperation.PURGE && instanceConverter != null) {
             List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation);
 
             for (EntityChangeListener listener : entityChangeListeners) {
@@ -381,6 +383,10 @@ public class AtlasEntityChangeNotifier {
                 case DELETE:
                     listener.onEntitiesDeleted(entities, isImport);
                     break;
+
+                case PURGE:
+                    listener.onEntitiesPurged(entities);
+                    break;
             }
         }
     }
@@ -399,6 +405,9 @@ public class AtlasEntityChangeNotifier {
                 case DELETE:
                     listener.onRelationshipsDeleted(relationships, isImport);
                     break;
+                case PURGE:
+                    listener.onRelationshipsPurged(relationships);
+                    break;
             }
         }
     }
@@ -485,7 +494,7 @@ public class AtlasEntityChangeNotifier {
                 final AtlasEntity entity;
 
                 // delete notifications don't need all attributes. Hence the special handling for delete operation
-                if (operation == EntityOperation.DELETE) {
+                if (operation == EntityOperation.DELETE || operation == EntityOperation.PURGE) {
                     entity = new AtlasEntity(entityHeader);
                 } else {
                     String entityGuid = entityHeader.getGuid();
@@ -586,12 +595,23 @@ public class AtlasEntityChangeNotifier {
         List<AtlasEntityHeader> updatedEntities        = resp.getUpdatedEntities();
         List<AtlasEntityHeader> partialUpdatedEntities = resp.getPartialUpdatedEntities();
         List<AtlasEntityHeader> deletedEntities        = resp.getDeletedEntities();
+        List<AtlasEntityHeader> purgedEntities        = resp.getPurgedEntities();
 
         // remove entities with DELETED status from created & updated lists
         purgeDeletedEntities(createdEntities);
         purgeDeletedEntities(updatedEntities);
         purgeDeletedEntities(partialUpdatedEntities);
 
+        // remove entities purged in this mutation from deleted, created, updated & partially-updated lists
+        if (purgedEntities != null) {
+            for (AtlasEntityHeader entity : purgedEntities) {
+                purgeEntity(entity.getGuid(), deletedEntities);
+                purgeEntity(entity.getGuid(), createdEntities);
+                purgeEntity(entity.getGuid(), updatedEntities);
+                purgeEntity(entity.getGuid(), partialUpdatedEntities);
+            }
+        }
+
         // remove entities deleted in this mutation from created & updated lists
         if (deletedEntities != null) {
             for (AtlasEntityHeader entity : deletedEntities) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 17a9649..c8e65ef 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.GraphTransactionInterceptor;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.authorize.AtlasAdminAccessRequest;
 import org.apache.atlas.authorize.AtlasAuthorizationUtils;
 import org.apache.atlas.authorize.AtlasEntityAccessRequest;
 import org.apache.atlas.authorize.AtlasPrivilege;
@@ -43,6 +44,7 @@ import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.store.DeleteType;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
@@ -69,8 +71,7 @@ import java.util.Objects;
 import java.util.Set;
 
 import static java.lang.Boolean.FALSE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
 import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
 import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes;
 import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete;
@@ -475,6 +476,42 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
 
     @Override
     @GraphTransaction
+    public EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(guids)) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
+        }
+
+        // Purge is guarded by its own admin privilege (AtlasPrivilege.ADMIN_PURGE), not by ADMIN_IMPORT.
+        AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_PURGE), "purge entity: guids=", guids);
+        Collection<AtlasVertex> purgeCandidates = new ArrayList<>();
+
+        for (String guid : guids) {
+            AtlasVertex vertex = AtlasGraphUtilsV2.findDeletedByGuid(guid);
+
+            if (vertex == null) {
+                // findDeletedByGuid() returned nothing for this guid - treat as a
+                // non-error and skip it instead of failing the whole purge request.
+                LOG.warn("Purge request ignored for non-existent/active entity with guid " + guid);
+
+                continue;
+            }
+
+            purgeCandidates.add(vertex);
+        }
+
+        if (purgeCandidates.isEmpty()) {
+            LOG.info("No purge candidate entities were found for guids: " + guids + " which is already deleted");
+        }
+
+        // Hard-delete the candidates; the response lists them under the PURGE operation.
+        EntityMutationResponse ret = purgeVertices(purgeCandidates);
+        // Notify the change listeners
+        entityChangeNotifier.onEntitiesMutated(ret, false);
+
+        return ret;
+    }
+
+    @Override
+    @GraphTransaction
     public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException {
         if (MapUtils.isEmpty(uniqAttributes)) {
             throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, uniqAttributes.toString());
@@ -1124,6 +1161,21 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
         return response;
     }
 
+    private EntityMutationResponse purgeVertices(Collection<AtlasVertex> purgeCandidates) throws AtlasBaseException {
+        EntityMutationResponse response = new EntityMutationResponse();
+        RequestContext         req      = RequestContext.get();
+
+        req.setDeleteType(DeleteType.HARD);
+        req.setPurgeRequested(true);
+        deleteDelegate.getHandler().deleteEntities(purgeCandidates); // this will update req with list of purged entities
+
+        for (AtlasEntityHeader entity : req.getDeletedEntities()) {
+            response.addEntity(PURGE, entity);
+        }
+
+        return response;
+    }
+
     private void validateAndNormalize(AtlasClassification classification) throws AtlasBaseException {
         AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 4d57d8b..bf13338 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -353,6 +353,26 @@ public class AtlasGraphUtilsV2 {
         return ret;
     }
 
+    public static AtlasVertex findDeletedByGuid(String guid) {
+        AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid);
+
+        if (ret == null) {
+            AtlasGraphQuery query = getGraphInstance().query()
+                    .has(Constants.GUID_PROPERTY_KEY, guid)
+                    .has(STATE_PROPERTY_KEY, Status.DELETED.name());
+
+            Iterator<AtlasVertex> results = query.vertices().iterator();
+
+            ret = results.hasNext() ? results.next() : null;
+
+            if (ret != null) {
+                GraphTransactionInterceptor.addToVertexCache(guid, ret);
+            }
+        }
+
+        return ret;
+    }
+
     public static String getTypeNameFromGuid(String guid) {
         String ret = null;
 
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 3d2a18f..f9ca7a2 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -51,6 +51,7 @@ public class RequestContext {
     private final long                                   requestTime          = System.currentTimeMillis();
     private final Map<String, AtlasEntityHeader>         updatedEntities      = new HashMap<>();
     private final Map<String, AtlasEntityHeader>         deletedEntities      = new HashMap<>();
+    private final Map<String, AtlasEntityHeader>         purgedEntities      = new HashMap<>();
     private final Map<String, AtlasEntity>               entityCache          = new HashMap<>();
     private final Map<String, AtlasEntityWithExtInfo>    entityExtInfoCache   = new HashMap<>();
     private final Map<String, List<AtlasClassification>> addedPropagations    = new HashMap<>();
@@ -64,6 +65,7 @@ public class RequestContext {
     private String       clientIPAddress;
     private List<String> forwardedAddresses;
     private DeleteType   deleteType   = DeleteType.DEFAULT;
+    private boolean     isPurgeRequested = false;
     private int         maxAttempts  = 1;
     private int         attemptCount = 1;
     private boolean     isImportInProgress = false;
@@ -108,6 +110,7 @@ public class RequestContext {
     public void clearCache() {
         this.updatedEntities.clear();
         this.deletedEntities.clear();
+        this.purgedEntities.clear();
         this.entityCache.clear();
         this.entityExtInfoCache.clear();
         this.addedPropagations.clear();
@@ -179,6 +182,10 @@ public class RequestContext {
         isImportInProgress = importInProgress;
     }
 
+    public boolean isPurgeRequested() { return isPurgeRequested; }
+
+    public void setPurgeRequested(boolean isPurgeRequested) { this.isPurgeRequested = isPurgeRequested; }
+
     public boolean isInNotificationProcessing() {
         return isInNotificationProcessing;
     }
@@ -215,13 +222,18 @@ public class RequestContext {
         }
     }
 
-
     public void recordEntityDelete(AtlasEntityHeader entity) {
         if (entity != null && entity.getGuid() != null) {
             deletedEntities.put(entity.getGuid(), entity);
         }
     }
 
+    public void recordEntityPurge(AtlasEntityHeader entity) { // records a purged entity header in this context, keyed by its guid
+        if (entity != null && entity.getGuid() != null) { // a null header, or one without a guid, is silently ignored
+            purgedEntities.put(entity.getGuid(), entity); // a header recorded again under the same guid replaces the earlier one
+        }
+    }
+
     public void recordAddedPropagation(String guid, AtlasClassification classification) {
         if (StringUtils.isNotEmpty(guid) && classification != null) {
             List<AtlasClassification> classifications = addedPropagations.get(guid);
@@ -302,6 +314,10 @@ public class RequestContext {
         return deletedEntities.values();
     }
 
+    public Collection<AtlasEntityHeader> getPurgedEntities() { // headers recorded so far through recordEntityPurge()
+        return purgedEntities.values(); // live view of the backing HashMap, not a copy; clearCache() empties it
+    }
+
     /**
      * Checks if an instance with the given guid is in the cache for this request.  Either returns the instance
      * or null if it is not in the cache.
@@ -329,7 +345,9 @@ public class RequestContext {
         return deletedEntities.containsKey(guid);
     }
 
-
+    public boolean isPurgedEntity(String guid) { // true when a header with this guid was recorded through recordEntityPurge()
+        return purgedEntities.containsKey(guid);
+    }
 
     public MetricRecorder startMetricRecord(String name) { return metrics != null ? metrics.getMetricRecorder(name) : null; }
 
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index 48f0cd3..8893380 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -97,6 +97,11 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
     }
 
     @Override
+    public void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException {
+        // do nothing -> no notification is sent out for purged entities, as it has already been sent when they were deleted
+    }
+
+    @Override
     public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
         notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_ADD);
     }
@@ -296,4 +301,9 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
     public void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
         notifyRelationshipEvents(relationships, RELATIONSHIP_DELETE);
     }
+
+    @Override
+    public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
+        // do nothing -> no notification is sent out for purged relationships, as it has already been sent when they were deleted
+    }
 }
\ No newline at end of file
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 464d46f..40fd6ef 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -37,6 +37,7 @@ import org.apache.atlas.model.impexp.ExportImportAuditEntry;
 import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.model.instance.AtlasCheckStateRequest;
 import org.apache.atlas.model.instance.AtlasCheckStateResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.metrics.AtlasMetrics;
 import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
 import org.apache.atlas.repository.impexp.AtlasServerService;
@@ -56,6 +57,7 @@ import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.atlas.web.util.Servlets;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -431,6 +433,30 @@ public class AdminResource {
         return result;
     }
 
+    @DELETE
+    @Path("/purge")
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException { // DELETE .../purge: validates the guids from the JSON body, then delegates to entityStore.purgeByIds()
+        if (CollectionUtils.isNotEmpty(guids)) { // a null or empty set skips validation and is handed to the store unchanged
+            for (String guid : guids) {
+                Servlets.validateQueryParamLength("guid", guid); // NOTE(review): presumably rejects over-long values -- confirm in Servlets
+            }
+        }
+
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AdminResource.purgeByGuids(" + guids  + ")"); // NOTE(review): trace label says purgeByGuids, method is purgeByIds
+            }
+
+            return entityStore.purgeByIds(guids);
+        } finally {
+            AtlasPerfTracer.log(perf); // runs on normal return and on exception; perf stays null when tracing is disabled
+        }
+    }
+
     @POST
     @Path("/importfile")
     @Produces(Servlets.JSON_MEDIA_TYPE)
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
index 5d506bb..a4b7b0d 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
@@ -21,6 +21,7 @@ package org.apache.atlas.web.integration;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.collect.Lists;
 import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.EntityAuditEvent;
@@ -49,6 +50,8 @@ import org.testng.annotations.Test;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.testng.Assert.*;
 
@@ -60,6 +63,8 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
 
     private static final Logger LOG = LoggerFactory.getLogger(EntityV2JerseyResourceIT.class);
 
+    private static final String ENTITY_NOTIFICATION_VERSION_PROPERTY = "atlas.notification.entity.version";
+
     private final String DATABASE_NAME = "db" + randomString();
     private final String TABLE_NAME = "table" + randomString();
     private String traitName;
@@ -755,20 +760,8 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
     @Test
     public void testDeleteEntities() throws Exception {
         // Create 2 database entities
-        AtlasEntity db1 = new AtlasEntity(DATABASE_TYPE_V2);
-        String dbName1 = randomString();
-        db1.setAttribute("name", dbName1);
-        db1.setAttribute(NAME, dbName1);
-        db1.setAttribute("clusterName", randomString());
-        db1.setAttribute("description", randomString());
-        AtlasEntityHeader entity1Header = createEntity(db1);
-        AtlasEntity db2 = new AtlasEntity(DATABASE_TYPE_V2);
-        String dbName2 = randomString();
-        db2.setAttribute("name", dbName2);
-        db2.setAttribute(NAME, dbName2);
-        db2.setAttribute("clusterName", randomString());
-        db2.setAttribute("description", randomString());
-        AtlasEntityHeader entity2Header = createEntity(db2);
+        AtlasEntityHeader entity1Header = createRandomDatabaseEntity();
+        AtlasEntityHeader entity2Header = createRandomDatabaseEntity();
 
         // Delete the database entities
         EntityMutationResponse deleteResponse = atlasClientV2.deleteEntitiesByGuids(Arrays.asList(entity1Header.getGuid(), entity2Header.getGuid()));
@@ -782,6 +775,49 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
     }
 
     @Test
+    public void testPurgeEntities() throws Exception { // delete two entities, then purge them and expect both reported under PURGE
+        // Create 2 database entities
+        AtlasEntityHeader entity1Header = createRandomDatabaseEntity();
+        AtlasEntityHeader entity2Header = createRandomDatabaseEntity();
+
+        ApplicationProperties.get().setProperty(ENTITY_NOTIFICATION_VERSION_PROPERTY, "v2"); // sets atlas.notification.entity.version to "v2" on the shared properties object
+
+        // Delete the database entities first; the purge below is issued for these same guids
+        EntityMutationResponse deleteResponse = atlasClientV2.deleteEntitiesByGuids(Arrays.asList(entity1Header.getGuid(), entity2Header.getGuid()));
+
+        // Verify that the delete response reports exactly 2 entities under the DELETE operation
+        assertNotNull(deleteResponse);
+        assertNotNull(deleteResponse.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE));
+        assertEquals(deleteResponse.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE).size(), 2);
+
+        Thread.sleep(1000); // NOTE(review): fixed 1s pause before purging; the reason is not stated here -- confirm it is needed
+
+        // Purge the database entities
+        Set<String> guids = Stream.of(entity1Header.getGuid(), entity2Header.getGuid()).collect(Collectors.toSet());
+        EntityMutationResponse purgeResponse = atlasClientV2.purgeEntitiesByGuids(guids);
+
+        // Verify that the purge response reports exactly 2 entities under the PURGE operation
+        assertNotNull(purgeResponse);
+        assertNotNull(purgeResponse.getEntitiesByOperation(EntityMutations.EntityOperation.PURGE));
+        assertEquals(purgeResponse.getEntitiesByOperation(EntityMutations.EntityOperation.PURGE).size(), 2);
+    }
+
+    @Test
+    public void testPurgeEntitiesWithoutDelete() throws Exception { // purging entities that were never deleted must report nothing under PURGE
+        // Create 2 database entities
+        AtlasEntityHeader entity1Header = createRandomDatabaseEntity();
+        AtlasEntityHeader entity2Header = createRandomDatabaseEntity();
+
+        // Purge the database entities without deleting them first
+        Set<String> guids = Stream.of(entity1Header.getGuid(), entity2Header.getGuid()).collect(Collectors.toSet());
+        EntityMutationResponse purgeResponse = atlasClientV2.purgeEntitiesByGuids(guids);
+
+        // Verify that the purge response is returned but lists no entities under the PURGE operation
+        assertNotNull(purgeResponse);
+        assertNull(purgeResponse.getEntitiesByOperation(EntityMutations.EntityOperation.PURGE));
+    }
+
+    @Test
     public void testDeleteEntityByUniqAttribute() throws Exception {
         // Create database entity
         AtlasEntity hiveDB = createHiveDB(DATABASE_NAME + randomUTF8());
@@ -802,4 +838,14 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
             put(name, value);
         }};
     }
+
+    private AtlasEntityHeader createRandomDatabaseEntity() { // builds a DATABASE_TYPE_V2 entity with random attribute values and creates it
+        AtlasEntity db = new AtlasEntity(DATABASE_TYPE_V2);
+        String dbName = randomString();
+        db.setAttribute("name", dbName);
+        db.setAttribute(NAME, dbName); // same value stored under the NAME constant as under the literal "name" key
+        db.setAttribute("clusterName", randomString());
+        db.setAttribute("description", randomString());
+        return createEntity(db); // createEntity() is defined outside this hunk; its result is the created entity's header
+    }
 }