You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ap...@apache.org on 2018/02/22 05:39:52 UTC
atlas git commit: ATLAS-2443: Entity DELETE should send compacted entity in outgoing notifications
Repository: atlas
Updated Branches:
refs/heads/branch-0.8 0dae3e8e3 -> acc264bb5
ATLAS-2443: Entity DELETE should send compacted entity in outgoing notifications
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/acc264bb
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/acc264bb
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/acc264bb
Branch: refs/heads/branch-0.8
Commit: acc264bb508c6d1a47a290e938fa6a085edb4642
Parents: 0dae3e8
Author: apoorvnaik <ap...@apache.org>
Authored: Tue Feb 20 13:53:28 2018 -0800
Committer: apoorvnaik <ap...@apache.org>
Committed: Wed Feb 21 21:39:21 2018 -0800
----------------------------------------------------------------------
.../graph/v1/AtlasEntityChangeNotifier.java | 41 +---------
.../store/graph/v1/AtlasEntityStoreV1.java | 4 +-
.../store/graph/v1/DeleteHandlerV1.java | 48 +++--------
.../store/graph/v1/EntityGraphMapper.java | 4 +-
.../store/graph/v1/EntityGraphRetriever.java | 5 +-
.../java/org/apache/atlas/RequestContextV1.java | 83 ++++++++++++--------
.../typesystem/persistence/StructInstance.java | 6 +-
.../NotificationEntityChangeListener.java | 47 +++++++++--
8 files changed, 112 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/acc264bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
index 1530525..4ecf086 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -33,13 +33,9 @@ import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
-import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
-import org.apache.atlas.typesystem.types.ClassType;
-import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +45,6 @@ import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Set;
@@ -190,38 +185,7 @@ public class AtlasEntityChangeNotifier {
// fail, since the entity vertex would already be gone. Hence the special handling for delete operation
if (operation == EntityOperation.DELETE) {
for (AtlasEntityHeader entity : entityHeaders) {
- ReferenceableInstance instance = null;
-
- if (MapUtils.isNotEmpty(entity.getAttributes())) {
- try {
- TypeSystem typeSystem = TypeSystem.getInstance();
- ClassType entityType = (ClassType) typeSystem.getDataType(entity.getTypeName());
-
- instance = (ReferenceableInstance) entityType.createInstance(new Id(entity.getGuid(), 0, entity.getTypeName()));
-
- // set all attributes to null
- for (String attribute : instance.fieldMapping().fields.keySet()) {
- instance.setNull(attribute);
- }
-
- // set attributes from entity header
- for (Map.Entry<String, Object> attr : entity.getAttributes().entrySet()) {
- try {
- instance.set(attr.getKey(), attr.getValue());
- } catch (AtlasException e) {
- LOG.warn("failed to set attribute {}={} for entity: type={}, guid={}", attr.getKey(), attr.getValue(), entity.getTypeName(), entity.getGuid(), e);
- }
- }
- } catch (AtlasException e) {
- LOG.warn("failed to get type {}", entity.getTypeName(), e);
- }
- }
-
- if (instance == null) {
- instance = new ReferenceableInstance(entity.getGuid(), entity.getTypeName());
- }
-
- ret.add(instance);
+ ret.add(new ReferenceableInstance(entity.getGuid(), entity.getTypeName()));
}
} else {
for (AtlasEntityHeader entityHeader : entityHeaders) {
@@ -303,9 +267,6 @@ public class AtlasEntityChangeNotifier {
}
AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(entityId);
- if(atlasVertex == null) {
- return;
- }
if (atlasVertex == null) {
LOG.warn("updateFullTextMapping(): no entity exists with guid {}", entityId);
http://git-wip-us.apache.org/repos/asf/atlas/blob/acc264bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index ebce279..a76626f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -594,11 +594,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
EntityMutationResponse response = new EntityMutationResponse();
deleteHandler.deleteEntities(deletionCandidates);
RequestContextV1 req = RequestContextV1.get();
- for (AtlasObjectId id : req.getDeletedEntityIds()) {
+ for (AtlasObjectId id : req.getDeletedEntities()) {
response.addEntity(DELETE, EntityGraphMapper.constructHeader(id));
}
- for (AtlasObjectId id : req.getUpdatedEntityIds()) {
+ for (AtlasObjectId id : req.getUpdatedEntities()) {
response.addEntity(UPDATE, EntityGraphMapper.constructHeader(id));
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/acc264bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index ad8ab8e..fe2c7be 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -18,7 +18,6 @@
package org.apache.atlas.repository.store.graph.v1;
-import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContextV1;
@@ -40,19 +39,16 @@ import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.Stack;
@@ -63,14 +59,16 @@ public abstract class DeleteHandlerV1 {
public static final Logger LOG = LoggerFactory.getLogger(DeleteHandlerV1.class);
- private AtlasTypeRegistry typeRegistry;
- private boolean shouldUpdateInverseReferences;
- private boolean softDelete;
+ private AtlasTypeRegistry typeRegistry;
+ private EntityGraphRetriever entityGraphRetriever;
+ private boolean shouldUpdateInverseReferences;
+ private boolean softDelete;
protected static final GraphHelper graphHelper = GraphHelper.getInstance();
public DeleteHandlerV1(AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete) {
this.typeRegistry = typeRegistry;
+ this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.shouldUpdateInverseReferences = shouldUpdateInverseReference;
this.softDelete = softDelete;
}
@@ -100,7 +98,7 @@ public abstract class DeleteHandlerV1 {
String typeName = AtlasGraphUtilsV1.getTypeName(instanceVertex);
AtlasObjectId objId = new AtlasObjectId(guid, typeName);
- if (requestContext.getDeletedEntityIds().contains(objId)) {
+ if (requestContext.isDeletedEntity(objId.getGuid())) {
LOG.debug("Skipping deletion of {} as it is already deleted", guid);
continue;
}
@@ -111,35 +109,9 @@ public abstract class DeleteHandlerV1 {
// Record all deletion candidate GUIDs in RequestContext
// and gather deletion candidate vertices.
for (GraphHelper.VertexInfo vertexInfo : compositeVertices) {
- AtlasVertex vertex = vertexInfo.getVertex();
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(vertexInfo.getTypeName());
- Map<String, Object> attributes = null;
-
- if (entityType != null && MapUtils.isNotEmpty(entityType.getUniqAttributes())) {
- attributes = new HashMap<>();
-
- for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
- Object attrVal = GraphHelper.getProperty(vertex, attribute.getQualifiedName());
-
- if (attrVal != null) {
- attributes.put(attribute.getName(), attrVal);
- }
- }
-
- // include clusterName attribute as well, if it is defined in the entity-type
- AtlasAttribute attrClusterName = entityType.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE);
-
- if (attrClusterName != null) {
- Object clusterName = GraphHelper.getProperty(vertex, attrClusterName.getQualifiedName());
-
- if (clusterName != null) {
- attributes.put(attrClusterName.getName(), clusterName);
- }
- }
- }
-
- requestContext.recordEntityDelete(new AtlasObjectId(vertexInfo.getGuid(), vertexInfo.getTypeName(), attributes));
-
+ AtlasEntity atlasEntity = entityGraphRetriever.toAtlasEntity(vertexInfo.getGuid());
+ requestContext.cache(atlasEntity);
+ requestContext.recordEntityDelete(new AtlasObjectId(atlasEntity.getGuid(), atlasEntity.getTypeName()));
deletionCandidateVertices.add(vertexInfo.getVertex());
}
}
@@ -439,7 +411,7 @@ public abstract class DeleteHandlerV1 {
AtlasObjectId objId = new AtlasObjectId(outId, typeName);
AtlasEntity.Status state = AtlasGraphUtilsV1.getState(outVertex);
- if (state == AtlasEntity.Status.DELETED || (outId != null && RequestContextV1.get().isDeletedEntity(objId))) {
+ if (state == AtlasEntity.Status.DELETED || (outId != null && RequestContextV1.get().isDeletedEntity(objId.getGuid()))) {
//If the reference vertex is marked for deletion, skip updating the reference
return;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/acc264bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index d3df464..682c7bf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -168,11 +168,11 @@ public class EntityGraphMapper {
RequestContextV1 req = RequestContextV1.get();
- for (AtlasObjectId id : req.getDeletedEntityIds()) {
+ for (AtlasObjectId id : req.getDeletedEntities()) {
resp.addEntity(DELETE, constructHeader(id));
}
- for (AtlasObjectId id : req.getUpdatedEntityIds()) {
+ for (AtlasObjectId id : req.getUpdatedEntities()) {
if (isPartialUpdate) {
resp.addEntity(PARTIAL_UPDATE, constructHeader(id));
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/acc264bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index 6132cb0..6bb4554 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -45,7 +45,9 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import javax.inject.Inject;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
@@ -59,7 +61,7 @@ import java.util.Set;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
-
+@Component
public final class EntityGraphRetriever {
private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class);
@@ -73,6 +75,7 @@ public final class EntityGraphRetriever {
private final AtlasTypeRegistry typeRegistry;
+ @Inject
public EntityGraphRetriever(AtlasTypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/acc264bb/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
index 08aa960..2123166 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
@@ -19,32 +19,30 @@
package org.apache.atlas;
import org.apache.atlas.metrics.Metrics;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.typesystem.types.TypeSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
public class RequestContextV1 {
private static final Logger LOG = LoggerFactory.getLogger(RequestContextV1.class);
private static final ThreadLocal<RequestContextV1> CURRENT_CONTEXT = new ThreadLocal<>();
- private Set<AtlasObjectId> createdEntityIds = new LinkedHashSet<>();
- private Set<AtlasObjectId> updatedEntityIds = new LinkedHashSet<>();
- private Set<AtlasObjectId> deletedEntityIds = new LinkedHashSet<>();
+ private final Map<String, AtlasObjectId> updatedEntities = new HashMap<>();
+ private final Map<String, AtlasObjectId> deletedEntities = new HashMap<>();
+ private final Map<String, AtlasEntity> entityCacheV2 = new HashMap<>();
+ private final Metrics metrics = new Metrics();
+ private final long requestTime = System.currentTimeMillis();
private String user;
- private final long requestTime;
- TypeSystem typeSystem = TypeSystem.getInstance();
- private Metrics metrics = new Metrics();
private RequestContextV1() {
- requestTime = System.currentTimeMillis();
}
//To handle gets from background threads where createContext() is not called
@@ -60,6 +58,14 @@ public class RequestContextV1 {
return ret;
}
public static void clear() {
+ RequestContextV1 instance = CURRENT_CONTEXT.get();
+
+ if (instance != null) {
+ instance.updatedEntities.clear();
+ instance.deletedEntities.clear();
+ instance.entityCacheV2.clear();
+ }
+
CURRENT_CONTEXT.remove();
}
@@ -71,44 +77,57 @@ public class RequestContextV1 {
this.user = user;
}
- public void recordEntityCreate(Collection<AtlasObjectId> createdEntityIds) {
- this.createdEntityIds.addAll(createdEntityIds);
- }
-
- public void recordEntityCreate(AtlasObjectId createdEntityId) {
- this.createdEntityIds.add(createdEntityId);
- }
-
- public void recordEntityUpdate(Collection<AtlasObjectId> updatedEntityIds) {
- this.updatedEntityIds.addAll(updatedEntityIds);
+ public void recordEntityUpdate(AtlasObjectId entity) {
+ if (entity != null && entity.getGuid() != null) {
+ updatedEntities.put(entity.getGuid(), entity);
+ }
}
- public void recordEntityUpdate(AtlasObjectId entityId) {
- this.updatedEntityIds.add(entityId);
+ public void recordEntityDelete(AtlasObjectId entity) {
+ if (entity != null && entity.getGuid() != null) {
+ deletedEntities.put(entity.getGuid(), entity);
+ }
}
- public void recordEntityDelete(AtlasObjectId entityId) {
- deletedEntityIds.add(entityId);
+ /**
+ * Adds the specified instance to the cache
+ *
+ */
+ public void cache(AtlasEntity entity) {
+ if (entity != null && entity.getGuid() != null) {
+ entityCacheV2.put(entity.getGuid(), entity);
+ }
}
- public Collection<AtlasObjectId> getCreatedEntityIds() {
- return createdEntityIds;
+ public Collection<AtlasObjectId> getUpdatedEntities() {
+ return updatedEntities.values();
}
- public Collection<AtlasObjectId> getUpdatedEntityIds() {
- return updatedEntityIds;
+ public Collection<AtlasObjectId> getDeletedEntities() {
+ return deletedEntities.values();
}
- public Collection<AtlasObjectId> getDeletedEntityIds() {
- return deletedEntityIds;
+ /**
+ * Checks if an instance with the given guid is in the cache for this request. Either returns the instance
+ * or null if it is not in the cache.
+ *
+ * @param guid the guid to find
+ * @return Either the instance or null if it is not in the cache.
+ */
+ public AtlasEntity getInstanceV2(String guid) {
+ return entityCacheV2.get(guid);
}
public long getRequestTime() {
return requestTime;
}
- public boolean isDeletedEntity(AtlasObjectId entityId) {
- return deletedEntityIds.contains(entityId);
+ public boolean isUpdatedEntity(String guid) {
+ return updatedEntities.containsKey(guid);
+ }
+
+ public boolean isDeletedEntity(String guid) {
+ return deletedEntities.containsKey(guid);
}
public static Metrics getMetrics() {
http://git-wip-us.apache.org/repos/asf/atlas/blob/acc264bb/typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java b/typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java
index d939a9b..3136ae2 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java
@@ -285,10 +285,10 @@ public class StructInstance implements ITypedStruct {
public Map<String, Object> getValuesMap() throws AtlasException {
Map<String, Object> m = new HashMap<>();
for (String attr : fieldMapping.fields.keySet()) {
- int pos = fieldMapping.fieldNullPos.get(attr);
- if (!nullFlags[pos]) {
+// int pos = fieldMapping.fieldNullPos.get(attr);
+// if (!nullFlags[pos]) {
m.put(attr, get(attr));
- }
+// }
}
return m;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/acc264bb/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
index 53acf56..d80662f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
@@ -20,9 +20,14 @@ package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContextV1;
+import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.entity.EntityNotificationImpl;
+import org.apache.atlas.repository.converters.AtlasFormatConverter;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
@@ -35,6 +40,9 @@ import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
@@ -53,8 +61,11 @@ import java.util.Set;
@Component
public class NotificationEntityChangeListener implements EntityChangeListener {
- private final NotificationInterface notificationInterface;
- private final TypeSystem typeSystem;
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationEntityChangeListener.class);;
+
+ private final NotificationInterface notificationInterface;
+ private final TypeSystem typeSystem;
+ private final AtlasInstanceConverter instanceConverter;
private Map<String, List<String>> notificationAttributesCache = new HashMap<>();
private static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity";
@@ -71,9 +82,12 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
* @param typeSystem the Atlas type system
*/
@Inject
- public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) {
+ public NotificationEntityChangeListener(NotificationInterface notificationInterface,
+ TypeSystem typeSystem,
+ @Lazy AtlasInstanceConverter instanceConverter) {
this.notificationInterface = notificationInterface;
this.typeSystem = typeSystem;
+ this.instanceConverter = instanceConverter;
}
@@ -170,19 +184,36 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
continue;
}
- Referenceable entity = new Referenceable(entityDefinition);
- Map<String, Object> attributesMap = entity.getValuesMap();
- List<String> entityNotificationAttrs = getNotificationAttributes(entity.getTypeName());
+ Referenceable referenceable = new Referenceable(entityDefinition);
+ // Special handling is needed for the (hard) DELETE entity case where the vertex is lost
+ // at this point hence we need to convert the cached AtlasEntity
+ if (operationType == EntityNotification.OperationType.ENTITY_DELETE) {
+ String guid = entityDefinition.getId()._getId();
+ AtlasEntity entity = RequestContextV1.get().getInstanceV2(guid);
+ if (entity != null) {
+ try {
+ referenceable = instanceConverter.getReferenceable(entity, new AtlasFormatConverter.ConverterContext());
+ } catch (AtlasBaseException e) {
+ LOG.warn("AtlasEntity to Referenceable conversion failed for guid {}. Reason: {}", guid, e.getMessage());
+ }
+ } else {
+ LOG.warn("Cache miss for AtlasEntity: guid={}", guid);
+ }
+ }
+
+ // Common logic for all events
+ Map<String, Object> attributesMap = referenceable.getValuesMap();
+ List<String> entityNotificationAttrs = getNotificationAttributes(referenceable.getTypeName());
if (MapUtils.isNotEmpty(attributesMap) && CollectionUtils.isNotEmpty(entityNotificationAttrs)) {
for (String entityAttr : attributesMap.keySet()) {
if (!entityNotificationAttrs.contains(entityAttr)) {
- entity.setNull(entityAttr);
+ referenceable.setNull(entityAttr);
}
}
}
- EntityNotificationImpl notification = new EntityNotificationImpl(entity, operationType, getAllTraits(entity, typeSystem));
+ EntityNotificationImpl notification = new EntityNotificationImpl(referenceable, operationType, getAllTraits(referenceable, typeSystem));
messages.add(notification);
}