You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/11/14 23:30:42 UTC
[atlas] branch branch-2.0 updated: ATLAS-3477: Add entity purge API in Admin Resource
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7677035 ATLAS-3477: Add entity purge API in Admin Resource
7677035 is described below
commit 7677035451d61d9ba4a0daef56af85000c8b115c
Author: Sidharth <si...@gmail.com>
AuthorDate: Tue Nov 12 12:39:08 2019 -0800
ATLAS-3477: Add entity purge API in Admin Resource
(cherry picked from commit 09118ca750208afc8f01638531555aada33e9648)
---
.../org/apache/atlas/authorize/AtlasPrivilege.java | 3 +-
.../main/java/org/apache/atlas/AtlasClientV2.java | 11 ++++
.../atlas/listener/EntityChangeListenerV2.java | 15 +++++
.../atlas/model/audit/EntityAuditEventV2.java | 4 +-
.../model/instance/EntityMutationResponse.java | 8 +++
.../atlas/model/instance/EntityMutations.java | 3 +-
.../repository/audit/EntityAuditListenerV2.java | 28 ++++++++
.../repository/store/graph/AtlasEntityStore.java | 6 ++
.../repository/store/graph/v1/DeleteHandlerV1.java | 43 +++++++++----
.../store/graph/v2/AtlasEntityChangeNotifier.java | 24 ++++++-
.../store/graph/v2/AtlasEntityStoreV2.java | 56 +++++++++++++++-
.../store/graph/v2/AtlasGraphUtilsV2.java | 20 ++++++
.../main/java/org/apache/atlas/RequestContext.java | 22 ++++++-
.../notification/EntityNotificationListenerV2.java | 10 +++
.../apache/atlas/web/resources/AdminResource.java | 26 ++++++++
.../web/integration/EntityV2JerseyResourceIT.java | 74 ++++++++++++++++++----
16 files changed, 319 insertions(+), 34 deletions(-)
diff --git a/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java b/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
index 59c596d..38b68fa 100644
--- a/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
+++ b/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
@@ -36,7 +36,8 @@ public enum AtlasPrivilege {
RELATIONSHIP_ADD("add-relationship"),
RELATIONSHIP_UPDATE("update-relationship"),
- RELATIONSHIP_REMOVE("remove-relationship");
+ RELATIONSHIP_REMOVE("remove-relationship"),
+ ADMIN_PURGE("admin-purge");
private final String type;
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 199d6bf..8c0a640 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -49,6 +49,7 @@ import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class AtlasClientV2 extends AtlasBaseClient {
// Type APIs
@@ -63,6 +64,11 @@ public class AtlasClientV2 extends AtlasBaseClient {
private static final String GET_BY_NAME_TEMPLATE = TYPES_API + "%s/name/%s";
private static final String GET_BY_GUID_TEMPLATE = TYPES_API + "%s/guid/%s";
private static final String ENTITY_BULK_API = ENTITY_API + "bulk/";
+
+ //Admin Entity Purge
+ private static final String ADMIN_API = BASE_URI + "admin/";
+ private static final String ENTITY_PURGE_API = ADMIN_API + "purge/";
+
// Lineage APIs
private static final String LINEAGE_URI = BASE_URI + "v2/lineage/";
@@ -362,6 +368,10 @@ public class AtlasClientV2 extends AtlasBaseClient {
return callAPI(API_V2.DELETE_ENTITIES_BY_GUIDS, EntityMutationResponse.class, "guid", guids);
}
+ public EntityMutationResponse purgeEntitiesByGuids(Set<String> guids) throws AtlasServiceException { // client call for API_V2.PURGE_ENTITIES_BY_GUIDS (HTTP DELETE on the admin purge URI)
+ return callAPI(API_V2.PURGE_ENTITIES_BY_GUIDS, EntityMutationResponse.class, guids); // NOTE(review): guids go in as callAPI's third argument - presumably the request body rather than query parameters; verify against callAPI
+ }
+
public AtlasClassifications getClassifications(String guid) throws AtlasServiceException {
return callAPI(formatPathParameters(API_V2.GET_CLASSIFICATIONS, guid), AtlasClassifications.class, null);
}
@@ -555,6 +565,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
public static final API_V2 CREATE_ENTITIES = new API_V2(ENTITY_BULK_API, HttpMethod.POST, Response.Status.OK);
public static final API_V2 UPDATE_ENTITIES = new API_V2(ENTITY_BULK_API, HttpMethod.POST, Response.Status.OK);
public static final API_V2 DELETE_ENTITIES_BY_GUIDS = new API_V2(ENTITY_BULK_API, HttpMethod.DELETE, Response.Status.OK);
+ public static final API_V2 PURGE_ENTITIES_BY_GUIDS = new API_V2(ENTITY_PURGE_API, HttpMethod.DELETE, Response.Status.OK);
public static final API_V2 GET_CLASSIFICATIONS = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.GET, Response.Status.OK);
public static final API_V2 ADD_CLASSIFICATIONS = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.POST, Response.Status.NO_CONTENT);
public static final API_V2 UPDATE_CLASSIFICATIONS = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.PUT, Response.Status.NO_CONTENT);
diff --git a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
index 444167e..d36582b 100644
--- a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
+++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
@@ -56,6 +56,14 @@ public interface EntityChangeListenerV2 {
*/
void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException;
+
+ /**
+ * This is upon purging entities from the repository.
+ *
+ * @param entities the purged entities
+ */
+ void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException;
+
/**
* This is upon adding new classifications to an entity.
*
@@ -124,6 +132,13 @@ public interface EntityChangeListenerV2 {
void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException;
/**
+ * This is upon purging relationships from the repository.
+ *
+ * @param relationships the purged relationships
+ */
+ void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException;
+
+ /**
* This is upon add new labels to an entity.
*
* @param entity the entity
diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
index e9cc7cd..9301e21 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
@@ -50,7 +50,7 @@ public class EntityAuditEventV2 implements Serializable {
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE,
- TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE;
+ TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE;
public static EntityAuditActionV2 fromString(String strValue) {
switch (strValue) {
@@ -60,6 +60,8 @@ public class EntityAuditEventV2 implements Serializable {
return ENTITY_UPDATE;
case "ENTITY_DELETE":
return ENTITY_DELETE;
+ case "ENTITY_PURGE":
+ return ENTITY_PURGE;
case "ENTITY_IMPORT_CREATE":
return ENTITY_IMPORT_CREATE;
case "ENTITY_IMPORT_UPDATE":
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
index 7ace00d..b448d51 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
@@ -115,6 +115,14 @@ public class EntityMutationResponse {
}
@JsonIgnore
+ public List<AtlasEntityHeader> getPurgedEntities() { // returns whatever mutatedEntities holds for EntityOperation.PURGE
+ if ( mutatedEntities != null) {
+ return mutatedEntities.get(EntityOperation.PURGE);
+ }
+ return null; // mutatedEntities itself is null, so there is nothing to look up
+ }
+
+ @JsonIgnore
public AtlasEntityHeader getFirstEntityCreated() {
final List<AtlasEntityHeader> entitiesByOperation = getEntitiesByOperation(EntityOperation.CREATE);
if ( entitiesByOperation != null && entitiesByOperation.size() > 0) {
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java
index daf4ca8..1dd5aa0 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java
@@ -48,7 +48,8 @@ public class EntityMutations implements Serializable {
CREATE,
UPDATE,
PARTIAL_UPDATE,
- DELETE
+ DELETE,
+ PURGE
}
public static final class EntityMutation implements Serializable {
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index 43a9b84..706da16 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -54,6 +54,7 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_PURGE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_UPDATE;
@@ -133,6 +134,23 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
@Override
+ public void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException { // records one ENTITY_PURGE audit event per entity in the given list
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
+ List<EntityAuditEventV2> events = new ArrayList<>();
+
+ for (AtlasEntity entity : entities) {
+ EntityAuditEventV2 event = createEvent(entity, ENTITY_PURGE, "Purged entity");
+
+ events.add(event);
+ }
+
+ auditRepository.putEventsV2(events); // all events are written in a single call
+
+ RequestContext.get().endMetricRecord(metric); // NOTE(review): not reached if putEventsV2 throws - confirm whether the metric should be closed in a finally block
+ }
+
+ @Override
public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
@@ -470,6 +488,9 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
case ENTITY_DELETE:
ret = "Deleted: ";
break;
+ case ENTITY_PURGE:
+ ret = "Purged: ";
+ break;
case CLASSIFICATION_ADD:
ret = "Added classification: ";
break;
@@ -521,4 +542,11 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
LOG.debug("Relationship(s) deleted from repository(" + relationships.size() + ")");
}
}
+
+ @Override
+ public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException { // only logs the number of purged relationships at debug level; no audit event is written here
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Relationship(s) purged from repository(" + relationships.size() + ")");
+ }
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index b94590b..49dd5c5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -203,11 +203,17 @@ public interface AtlasEntityStore {
*/
String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException;
+
/*
* Return list of deleted entity guids
*/
EntityMutationResponse deleteByIds(List<String> guid) throws AtlasBaseException;
+ /*
+ * Return list of purged entity guids
+ */
+ EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException;
+
/**
* Add classification(s)
*/
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index d2544df..6214584 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -106,9 +106,16 @@ public abstract class DeleteHandlerV1 {
String guid = AtlasGraphUtilsV2.getIdFromVertex(instanceVertex);
AtlasEntity.Status state = getState(instanceVertex);
- if (state == DELETED || requestContext.isDeletedEntity(guid)) {
+ boolean needToSkip = requestContext.isPurgeRequested() ? (state == ACTIVE || requestContext.isPurgedEntity(guid)) :
+ (state == DELETED || requestContext.isDeletedEntity(guid));
+
+ if (needToSkip) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping deletion of {} as it is already deleted", guid);
+ if(RequestContext.get().isPurgeRequested()) {
+ LOG.debug("Skipping purging of {} as it is active or already purged", guid);
+ } else {
+ LOG.debug("Skipping deletion of {} as it is already deleted", guid);
+ }
}
continue;
@@ -149,10 +156,15 @@ public abstract class DeleteHandlerV1 {
public void deleteRelationships(Collection<AtlasEdge> edges, final boolean forceDelete) throws AtlasBaseException {
for (AtlasEdge edge : edges) {
boolean isInternal = isInternalType(edge.getInVertex()) && isInternalType(edge.getOutVertex());
+ boolean needToSkip = !isInternal && (RequestContext.get().isPurgeRequested() ? getState(edge) == ACTIVE : getState(edge) == DELETED);
- if (!isInternal && getState(edge) == DELETED) {
+ if (needToSkip) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping deletion of {} as it is already deleted", getIdFromEdge(edge));
+ if(RequestContext.get().isPurgeRequested()) {
+ LOG.debug("Skipping purging of {} as it is active or already purged", getIdFromEdge(edge));
+ } else{
+ LOG.debug("Skipping deletion of {} as it is already deleted", getIdFromEdge(edge));
+ }
}
continue;
@@ -183,8 +195,10 @@ public abstract class DeleteHandlerV1 {
AtlasVertex vertex = vertices.pop();
AtlasEntity.Status state = getState(vertex);
- if (state == DELETED) {
- //If the reference vertex is marked for deletion, skip it
+ //In case of purge If the reference vertex is active then skip it or else
+ //If the vertex marked for deletion, skip it
+ boolean needToSkip = RequestContext.get().isPurgeRequested() ? (state == ACTIVE) : (state == DELETED);
+ if (needToSkip) {
continue;
}
@@ -221,7 +235,9 @@ public abstract class DeleteHandlerV1 {
} else {
AtlasEdge edge = graphHelper.getEdgeForLabel(vertex, edgeLabel);
- if (edge == null || getState(edge) == DELETED) {
+ // '||' binds tighter than '?:', so the ternary is parenthesised explicitly: a null edge must
+ // short-circuit to "skip" instead of being passed to getState()
+ needToSkip = edge == null || (RequestContext.get().isPurgeRequested() ?
+ getState(edge) == ACTIVE : getState(edge) == DELETED);
+ if (needToSkip) {
continue;
}
@@ -274,7 +290,9 @@ public abstract class DeleteHandlerV1 {
if (CollectionUtils.isNotEmpty(edges)) {
for (AtlasEdge edge : edges) {
- if (edge == null || getState(edge) == DELETED) {
+ // '||' binds tighter than '?:', so the ternary is parenthesised explicitly: a null edge must
+ // short-circuit to "skip" instead of being passed to getState()
+ needToSkip = edge == null || (RequestContext.get().isPurgeRequested() ?
+ getState(edge) == ACTIVE : getState(edge) == DELETED);
+ if (needToSkip) {
continue;
}
@@ -838,8 +856,10 @@ public abstract class DeleteHandlerV1 {
final String outId = GraphHelper.getGuid(outVertex);
final Status state = getState(outVertex);
- if (state == DELETED || (outId != null && RequestContext.get().isDeletedEntity(outId))) {
- //If the reference vertex is marked for deletion, skip updating the reference
+ boolean needToSkip = RequestContext.get().isPurgeRequested() ? state == ACTIVE || (outId != null && RequestContext.get().isPurgedEntity(outId)) :
+ state == DELETED || (outId != null && RequestContext.get().isDeletedEntity(outId));
+
+ if (needToSkip) {
return;
}
@@ -954,7 +974,8 @@ public abstract class DeleteHandlerV1 {
for (AtlasEdge edge : incomingEdges) {
Status edgeState = getState(edge);
- if (edgeState == ACTIVE) {
+ boolean isProceed = RequestContext.get().isPurgeRequested()? edgeState == DELETED : edgeState == ACTIVE;
+ if (isProceed) {
if (isRelationshipEdge(edge)) {
deleteRelationship(edge);
} else {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index 3389d24..bd1ba58 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -102,6 +102,7 @@ public class AtlasEntityChangeNotifier {
List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities();
List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities();
List<AtlasEntityHeader> deletedEntities = entityMutationResponse.getDeletedEntities();
+ List<AtlasEntityHeader> purgedEntities = entityMutationResponse.getPurgedEntities();
// complete full text mapping before calling toReferenceables(), from notifyListners(), to
// include all vertex updates in the current graph-transaction
@@ -113,6 +114,7 @@ public class AtlasEntityChangeNotifier {
notifyListeners(updatedEntities, EntityOperation.UPDATE, isImport);
notifyListeners(partiallyUpdatedEntities, EntityOperation.PARTIAL_UPDATE, isImport);
notifyListeners(deletedEntities, EntityOperation.DELETE, isImport);
+ notifyListeners(purgedEntities, EntityOperation.PURGE, isImport);
notifyPropagatedEntities();
}
@@ -340,7 +342,7 @@ public class AtlasEntityChangeNotifier {
private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
- if (instanceConverter != null) {
+ if (operation != EntityOperation.PURGE && instanceConverter != null) {
List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation);
for (EntityChangeListener listener : entityChangeListeners) {
@@ -381,6 +383,10 @@ public class AtlasEntityChangeNotifier {
case DELETE:
listener.onEntitiesDeleted(entities, isImport);
break;
+
+ case PURGE:
+ listener.onEntitiesPurged(entities);
+ break;
}
}
}
@@ -399,6 +405,9 @@ public class AtlasEntityChangeNotifier {
case DELETE:
listener.onRelationshipsDeleted(relationships, isImport);
break;
+ case PURGE:
+ listener.onRelationshipsPurged(relationships);
+ break;
}
}
}
@@ -485,7 +494,7 @@ public class AtlasEntityChangeNotifier {
final AtlasEntity entity;
// delete notifications don't need all attributes. Hence the special handling for delete operation
- if (operation == EntityOperation.DELETE) {
+ if (operation == EntityOperation.DELETE || operation == EntityOperation.PURGE) {
entity = new AtlasEntity(entityHeader);
} else {
String entityGuid = entityHeader.getGuid();
@@ -586,12 +595,23 @@ public class AtlasEntityChangeNotifier {
List<AtlasEntityHeader> updatedEntities = resp.getUpdatedEntities();
List<AtlasEntityHeader> partialUpdatedEntities = resp.getPartialUpdatedEntities();
List<AtlasEntityHeader> deletedEntities = resp.getDeletedEntities();
+ List<AtlasEntityHeader> purgedEntities = resp.getPurgedEntities();
// remove entities with DELETED status from created & updated lists
purgeDeletedEntities(createdEntities);
purgeDeletedEntities(updatedEntities);
purgeDeletedEntities(partialUpdatedEntities);
+ // remove entities purged in this mutation from created & updated lists
+ if (purgedEntities != null) {
+ for (AtlasEntityHeader entity : purgedEntities) {
+ purgeEntity(entity.getGuid(), deletedEntities);
+ purgeEntity(entity.getGuid(), createdEntities);
+ purgeEntity(entity.getGuid(), updatedEntities);
+ purgeEntity(entity.getGuid(), partialUpdatedEntities);
+ }
+ }
+
// remove entities deleted in this mutation from created & updated lists
if (deletedEntities != null) {
for (AtlasEntityHeader entity : deletedEntities) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 17a9649..c8e65ef 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.authorize.AtlasAdminAccessRequest;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
@@ -43,6 +44,7 @@ import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.store.DeleteType;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
@@ -69,8 +71,7 @@ import java.util.Objects;
import java.util.Set;
import static java.lang.Boolean.FALSE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes;
import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete;
@@ -475,6 +476,42 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
@Override
@GraphTransaction
+ public EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException {
+ if (CollectionUtils.isEmpty(guids)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
+ }
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "purge entity: guids=", guids);
+ Collection<AtlasVertex> purgeCandidates = new ArrayList<>();
+
+ for (String guid : guids) {
+ AtlasVertex vertex = AtlasGraphUtilsV2.findDeletedByGuid(guid);
+
+ if (vertex == null) {
+ // Entity does not exist - treat as non-error, since the caller
+ // wanted to delete the entity and it's already gone.
+ LOG.warn("Purge request ignored for non-existent/active entity with guid " + guid);
+
+ continue;
+ }
+
+ purgeCandidates.add(vertex);
+ }
+
+ if (purgeCandidates.isEmpty()) {
+ LOG.info("No purge candidate entities were found for guids: " + guids + " which is already deleted");
+ }
+
+ EntityMutationResponse ret = purgeVertices(purgeCandidates);
+
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret, false);
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException {
if (MapUtils.isEmpty(uniqAttributes)) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, uniqAttributes.toString());
@@ -1124,6 +1161,21 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
return response;
}
+ private EntityMutationResponse purgeVertices(Collection<AtlasVertex> purgeCandidates) throws AtlasBaseException { // runs the delete handler in purge mode and reports the affected entities under PURGE
+ EntityMutationResponse response = new EntityMutationResponse();
+ RequestContext req = RequestContext.get();
+
+ req.setDeleteType(DeleteType.HARD); // purge forces a hard delete for this request
+ req.setPurgeRequested(true); // read by the handler through RequestContext.isPurgeRequested()
+ deleteDelegate.getHandler().deleteEntities(purgeCandidates); // NOTE(review): results are read back from req.getDeletedEntities() below, not req.getPurgedEntities() - presumably the handler records them as deletes; confirm
+
+ for (AtlasEntityHeader entity : req.getDeletedEntities()) {
+ response.addEntity(PURGE, entity);
+ }
+
+ return response;
+ }
+
private void validateAndNormalize(AtlasClassification classification) throws AtlasBaseException {
AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 4d57d8b..bf13338 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -353,6 +353,26 @@ public class AtlasGraphUtilsV2 {
return ret;
}
+ public static AtlasVertex findDeletedByGuid(String guid) { // resolves guid to a vertex, querying for state DELETED on a cache miss; returns null when nothing is found
+ AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid); // NOTE(review): a cache hit is returned without a state check - confirm the cache cannot hold a non-DELETED vertex for this guid
+
+ if (ret == null) {
+ AtlasGraphQuery query = getGraphInstance().query()
+ .has(Constants.GUID_PROPERTY_KEY, guid)
+ .has(STATE_PROPERTY_KEY, Status.DELETED.name());
+
+ Iterator<AtlasVertex> results = query.vertices().iterator();
+
+ ret = results.hasNext() ? results.next() : null; // only the first match is used
+
+ if (ret != null) {
+ GraphTransactionInterceptor.addToVertexCache(guid, ret); // store the hit in the vertex cache under this guid
+ }
+ }
+
+ return ret;
+ }
+
public static String getTypeNameFromGuid(String guid) {
String ret = null;
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 3d2a18f..f9ca7a2 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -51,6 +51,7 @@ public class RequestContext {
private final long requestTime = System.currentTimeMillis();
private final Map<String, AtlasEntityHeader> updatedEntities = new HashMap<>();
private final Map<String, AtlasEntityHeader> deletedEntities = new HashMap<>();
+ private final Map<String, AtlasEntityHeader> purgedEntities = new HashMap<>();
private final Map<String, AtlasEntity> entityCache = new HashMap<>();
private final Map<String, AtlasEntityWithExtInfo> entityExtInfoCache = new HashMap<>();
private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>();
@@ -64,6 +65,7 @@ public class RequestContext {
private String clientIPAddress;
private List<String> forwardedAddresses;
private DeleteType deleteType = DeleteType.DEFAULT;
+ private boolean isPurgeRequested = false;
private int maxAttempts = 1;
private int attemptCount = 1;
private boolean isImportInProgress = false;
@@ -108,6 +110,7 @@ public class RequestContext {
public void clearCache() {
this.updatedEntities.clear();
this.deletedEntities.clear();
+ this.purgedEntities.clear();
this.entityCache.clear();
this.entityExtInfoCache.clear();
this.addedPropagations.clear();
@@ -179,6 +182,10 @@ public class RequestContext {
isImportInProgress = importInProgress;
}
+ public boolean isPurgeRequested() { return isPurgeRequested; } // current value of the purge flag; the field is initialised to false
+
+ public void setPurgeRequested(boolean isPurgeRequested) { this.isPurgeRequested = isPurgeRequested; } // sets the purge flag on this request context
+
public boolean isInNotificationProcessing() {
return isInNotificationProcessing;
}
@@ -215,13 +222,18 @@ public class RequestContext {
}
}
-
public void recordEntityDelete(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null) {
deletedEntities.put(entity.getGuid(), entity);
}
}
+ public void recordEntityPurge(AtlasEntityHeader entity) { // NOTE(review): no caller of this method is visible in the hunks shown here - confirm purgedEntities is actually populated
+ if (entity != null && entity.getGuid() != null) { // null headers and headers without a guid are ignored
+ purgedEntities.put(entity.getGuid(), entity); // keyed by guid, so a repeated guid overwrites the earlier header
+ }
+ }
+
public void recordAddedPropagation(String guid, AtlasClassification classification) {
if (StringUtils.isNotEmpty(guid) && classification != null) {
List<AtlasClassification> classifications = addedPropagations.get(guid);
@@ -302,6 +314,10 @@ public class RequestContext {
return deletedEntities.values();
}
+ public Collection<AtlasEntityHeader> getPurgedEntities() { // headers recorded through recordEntityPurge()
+ return purgedEntities.values(); // values() view of the backing map, not a copy
+ }
+
/**
* Checks if an instance with the given guid is in the cache for this request. Either returns the instance
* or null if it is not in the cache.
@@ -329,7 +345,9 @@ public class RequestContext {
return deletedEntities.containsKey(guid);
}
-
+ public boolean isPurgedEntity(String guid) { // true when a header with this guid was recorded through recordEntityPurge()
+ return purgedEntities.containsKey(guid);
+ }
public MetricRecorder startMetricRecord(String name) { return metrics != null ? metrics.getMetricRecorder(name) : null; }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index 48f0cd3..8893380 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -97,6 +97,11 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
}
@Override
+ public void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException {
+ // intentionally empty: no notification is sent for purged entities
+ // NOTE(review): presumably because one was already sent when the entity was deleted - confirm
+ }
+
+ @Override
public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_ADD);
}
@@ -296,4 +301,9 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
public void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
notifyRelationshipEvents(relationships, RELATIONSHIP_DELETE);
}
+
+ @Override
+ public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
+ // intentionally empty: no notification is sent for purged relationships
+ // NOTE(review): presumably because one was already sent when the relationship was deleted - confirm
+ }
}
\ No newline at end of file
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 464d46f..40fd6ef 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -37,6 +37,7 @@ import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.model.instance.AtlasCheckStateRequest;
import org.apache.atlas.model.instance.AtlasCheckStateResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
import org.apache.atlas.repository.impexp.AtlasServerService;
@@ -56,6 +57,7 @@ import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.Servlets;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -431,6 +433,30 @@ public class AdminResource {
return result;
}
+ /**
+  * Purges the entities identified by the given guids by delegating to the entity store.
+  *
+  * @param guids guids of the entities to purge; when the set is non-empty, every guid is
+  *              length-checked before the store is invoked
+  * @return the response returned by {@code entityStore.purgeByIds(guids)}
+  * @throws AtlasBaseException propagated from guid validation or from the entity store
+  */
+ @DELETE
+ @Path("/purge")
+ @Consumes(Servlets.JSON_MEDIA_TYPE)
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException {
+     if (CollectionUtils.isNotEmpty(guids)) {
+         for (String guid : guids) {
+             Servlets.validateQueryParamLength("guid", guid);
+         }
+     }
+
+     AtlasPerfTracer perf = null;
+
+     try {
+         if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+             // the label carries this method's own name so a perf-log entry can be traced back to this endpoint
+             perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AdminResource.purgeByIds(" + guids + ")");
+         }
+
+         return entityStore.purgeByIds(guids);
+     } finally {
+         // perf stays null when tracing is disabled; it is handed to log() either way
+         AtlasPerfTracer.log(perf);
+     }
+ }
+
@POST
@Path("/importfile")
@Produces(Servlets.JSON_MEDIA_TYPE)
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
index 5d506bb..a4b7b0d 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
@@ -21,6 +21,7 @@ package org.apache.atlas.web.integration;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.Lists;
import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.EntityAuditEvent;
@@ -49,6 +50,8 @@ import org.testng.annotations.Test;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.testng.Assert.*;
@@ -60,6 +63,8 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
private static final Logger LOG = LoggerFactory.getLogger(EntityV2JerseyResourceIT.class);
+ private static final String ENTITY_NOTIFICATION_VERSION_PROPERTY = "atlas.notification.entity.version";
+
private final String DATABASE_NAME = "db" + randomString();
private final String TABLE_NAME = "table" + randomString();
private String traitName;
@@ -755,20 +760,8 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
@Test
public void testDeleteEntities() throws Exception {
// Create 2 database entities
- AtlasEntity db1 = new AtlasEntity(DATABASE_TYPE_V2);
- String dbName1 = randomString();
- db1.setAttribute("name", dbName1);
- db1.setAttribute(NAME, dbName1);
- db1.setAttribute("clusterName", randomString());
- db1.setAttribute("description", randomString());
- AtlasEntityHeader entity1Header = createEntity(db1);
- AtlasEntity db2 = new AtlasEntity(DATABASE_TYPE_V2);
- String dbName2 = randomString();
- db2.setAttribute("name", dbName2);
- db2.setAttribute(NAME, dbName2);
- db2.setAttribute("clusterName", randomString());
- db2.setAttribute("description", randomString());
- AtlasEntityHeader entity2Header = createEntity(db2);
+ AtlasEntityHeader entity1Header = createRandomDatabaseEntity();
+ AtlasEntityHeader entity2Header = createRandomDatabaseEntity();
// Delete the database entities
EntityMutationResponse deleteResponse = atlasClientV2.deleteEntitiesByGuids(Arrays.asList(entity1Header.getGuid(), entity2Header.getGuid()));
@@ -782,6 +775,49 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
}
@Test
+ public void testPurgeEntities() throws Exception {
+     // Create 2 database entities
+     AtlasEntityHeader entity1Header = createRandomDatabaseEntity();
+     AtlasEntityHeader entity2Header = createRandomDatabaseEntity();
+
+     // Remember the current setting so the "v2" override below does not leak into other tests
+     Object previousNotificationVersion = ApplicationProperties.get().getProperty(ENTITY_NOTIFICATION_VERSION_PROPERTY);
+
+     ApplicationProperties.get().setProperty(ENTITY_NOTIFICATION_VERSION_PROPERTY, "v2");
+
+     try {
+         // Delete the database entities
+         EntityMutationResponse deleteResponse = atlasClientV2.deleteEntitiesByGuids(Arrays.asList(entity1Header.getGuid(), entity2Header.getGuid()));
+
+         // Verify that deleteEntities() response has database entity guids
+         assertNotNull(deleteResponse);
+         assertNotNull(deleteResponse.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE));
+         assertEquals(deleteResponse.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE).size(), 2);
+
+         // NOTE(review): fixed pause between delete and purge; presumably gives the delete time to settle - confirm it is needed
+         Thread.sleep(1000);
+
+         // Purge the database entities
+         Set<String> guids = Stream.of(entity1Header.getGuid(), entity2Header.getGuid()).collect(Collectors.toSet());
+         EntityMutationResponse purgeResponse = atlasClientV2.purgeEntitiesByGuids(guids);
+
+         // Verify that purgeEntities() response has database entity guids
+         assertNotNull(purgeResponse);
+         assertNotNull(purgeResponse.getEntitiesByOperation(EntityMutations.EntityOperation.PURGE));
+         assertEquals(purgeResponse.getEntitiesByOperation(EntityMutations.EntityOperation.PURGE).size(), 2);
+     } finally {
+         // Put the property back exactly as it was found, removing it if it was not set before
+         if (previousNotificationVersion == null) {
+             ApplicationProperties.get().clearProperty(ENTITY_NOTIFICATION_VERSION_PROPERTY);
+         } else {
+             ApplicationProperties.get().setProperty(ENTITY_NOTIFICATION_VERSION_PROPERTY, previousNotificationVersion);
+         }
+     }
+ }
+
+ @Test
+ public void testPurgeEntitiesWithoutDelete() throws Exception {
+     // Two database entities that are created but never deleted
+     AtlasEntityHeader firstDatabase = createRandomDatabaseEntity();
+     AtlasEntityHeader secondDatabase = createRandomDatabaseEntity();
+
+     // Ask for a purge straight away, skipping the delete step
+     Set<String> guidsToPurge = new HashSet<>(Arrays.asList(firstDatabase.getGuid(), secondDatabase.getGuid()));
+     EntityMutationResponse response = atlasClientV2.purgeEntitiesByGuids(guidsToPurge);
+
+     // A response must come back, but it must not list any entities under the PURGE operation
+     assertNotNull(response);
+     assertNull(response.getEntitiesByOperation(EntityMutations.EntityOperation.PURGE));
+ }
+
+ @Test
public void testDeleteEntityByUniqAttribute() throws Exception {
// Create database entity
AtlasEntity hiveDB = createHiveDB(DATABASE_NAME + randomUTF8());
@@ -802,4 +838,14 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
put(name, value);
}};
}
+
+ /**
+  * Builds a {@code DATABASE_TYPE_V2} entity with random attribute values and submits it through
+  * {@code createEntity}.
+  *
+  * The "name" and {@code NAME} attributes share one random string; "clusterName" and
+  * "description" each get their own.
+  *
+  * @return the header returned by {@code createEntity} for the new entity
+  */
+ private AtlasEntityHeader createRandomDatabaseEntity() {
+     final AtlasEntity database = new AtlasEntity(DATABASE_TYPE_V2);
+     final String randomName = randomString();
+
+     database.setAttribute("name", randomName);
+     // NOTE(review): NAME is defined outside this chunk; if it equals "name" this call is redundant - confirm
+     database.setAttribute(NAME, randomName);
+     database.setAttribute("clusterName", randomString());
+     database.setAttribute("description", randomString());
+
+     final AtlasEntityHeader createdHeader = createEntity(database);
+
+     return createdHeader;
+ }
}