You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/02/04 05:10:27 UTC
[atlas] branch master updated: ATLAS-3044: fixed import to handle
entity-delete
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new a9a095e ATLAS-3044: fixed import to handle entity-delete
a9a095e is described below
commit a9a095e1d87ee515b85ba281c96dabb78e2c415f
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Fri Feb 1 14:45:57 2019 -0800
ATLAS-3044: fixed import to handle entity-delete
---
.../apache/atlas/model/instance/AtlasEntity.java | 9 +++
.../atlas/model/instance/AtlasEntityHeader.java | 1 +
.../model/instance/EntityMutationResponse.java | 25 -------
.../apache/atlas/repository/graph/GraphHelper.java | 9 ++-
.../repository/store/graph/v1/DeleteHandlerV1.java | 11 +--
.../store/graph/v2/AtlasEntityChangeNotifier.java | 87 +++++++++++++++++++++-
.../store/graph/v2/AtlasEntityStoreV2.java | 79 ++++++++++++++++----
.../store/graph/v2/EntityGraphMapper.java | 45 +++++------
.../store/graph/v2/EntityGraphRetriever.java | 40 ----------
.../store/graph/v2/EntityMutationContext.java | 35 +++++++--
.../main/java/org/apache/atlas/RequestContext.java | 14 ++--
.../web/resources/AdminExportImportTestIT.java | 12 +--
12 files changed, 227 insertions(+), 140 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 18018d7..5b24ef1 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -117,6 +117,15 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
init();
}
+ public AtlasEntity(AtlasEntityHeader header) {
+ super(header.getTypeName(), header.getAttributes());
+
+ setGuid(header.getGuid());
+ setStatus(header.getStatus());
+ setClassifications(header.getClassifications());
+ setMeanings(header.getMeanings());
+ }
+
public AtlasEntity(Map map) {
super(map);
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
index 6ad830a..26687bf 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
@@ -104,6 +104,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
public AtlasEntityHeader(AtlasEntity entity) {
super(entity.getTypeName(), entity.getAttributes());
setGuid(entity.getGuid());
+ setStatus(entity.getStatus());
setClassifications(entity.getClassifications());
if (CollectionUtils.isNotEmpty(entity.getClassifications())) {
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
index 4589262..7ace00d 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
@@ -216,31 +216,6 @@ public class EntityMutationResponse {
}
}
- @JsonIgnore
- public void addEntity(EntityOperation op, AtlasObjectId entity) {
- if (mutatedEntities == null) {
- mutatedEntities = new HashMap<>();
- } else {
- // if an entity is already included in CREATE, ignore subsequent UPDATE, PARTIAL_UPDATE
- if (op == EntityOperation.UPDATE || op == EntityOperation.PARTIAL_UPDATE) {
- if (entityHeaderExists(getCreatedEntities(), entity.getGuid())) {
- return;
- }
- }
- }
-
- List<AtlasEntityHeader> opEntities = mutatedEntities.get(op);
-
- if (opEntities == null) {
- opEntities = new ArrayList<>();
- mutatedEntities.put(op, opEntities);
- }
-
- if (!entityHeaderExists(opEntities, entity.getGuid())) {
- opEntities.add(new AtlasEntityHeader(entity.getTypeName(), entity.getGuid(), entity.getUniqueAttributes()));
- }
- }
-
private boolean entityHeaderExists(List<AtlasEntityHeader> entityHeaders, String guid) {
boolean ret = false;
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 45bf8dc..97e3e25 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -28,6 +28,7 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.Status;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
@@ -1378,15 +1379,15 @@ public final class GraphHelper {
* Guid and AtlasVertex combo
*/
public static class VertexInfo {
- private final AtlasObjectId entity;
- private final AtlasVertex vertex;
+ private final AtlasEntityHeader entity;
+ private final AtlasVertex vertex;
- public VertexInfo(AtlasObjectId entity, AtlasVertex vertex) {
+ public VertexInfo(AtlasEntityHeader entity, AtlasVertex vertex) {
this.entity = entity;
this.vertex = vertex;
}
- public AtlasObjectId getEntity() { return entity; }
+ public AtlasEntityHeader getEntity() { return entity; }
public AtlasVertex getVertex() {
return vertex;
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 0ae57c1..b2bd896 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.Status;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
@@ -191,9 +192,9 @@ public abstract class DeleteHandlerV1 {
continue;
}
- AtlasObjectId entity = entityRetriever.toAtlasObjectId(vertex);
- String typeName = entity.getTypeName();
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+ AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(vertex);
+ String typeName = entity.getTypeName();
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), typeName);
@@ -349,7 +350,7 @@ public abstract class DeleteHandlerV1 {
AtlasGraphUtilsV2.setEncodedProperty(referencedVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, requestContext.getRequestTime());
AtlasGraphUtilsV2.setEncodedProperty(referencedVertex, MODIFIED_BY_KEY, requestContext.getUser());
- requestContext.recordEntityUpdate(entityRetriever.toAtlasObjectId(referencedVertex));
+ requestContext.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(referencedVertex));
}
}
} else {
@@ -935,7 +936,7 @@ public abstract class DeleteHandlerV1 {
AtlasGraphUtilsV2.setEncodedProperty(outVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, requestContext.getRequestTime());
AtlasGraphUtilsV2.setEncodedProperty(outVertex, MODIFIED_BY_KEY, requestContext.getUser());
- requestContext.recordEntityUpdate(entityRetriever.toAtlasObjectId(outVertex));
+ requestContext.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(outVertex));
}
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index a6a18dc..035b02c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -56,6 +56,7 @@ import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
@@ -95,6 +96,8 @@ public class AtlasEntityChangeNotifier {
return;
}
+ pruneResponse(entityMutationResponse);
+
List<AtlasEntityHeader> createdEntities = entityMutationResponse.getCreatedEntities();
List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities();
List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities();
@@ -442,7 +445,6 @@ public class AtlasEntityChangeNotifier {
if (CollectionUtils.isNotEmpty(entityHeaders)) {
for (AtlasEntityHeader entityHeader : entityHeaders) {
- String entityGuid = entityHeader.getGuid();
String typeName = entityHeader.getTypeName();
AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName);
@@ -462,10 +464,10 @@ public class AtlasEntityChangeNotifier {
// delete notifications don't need all attributes. Hence the special handling for delete operation
if (operation == EntityOperation.DELETE) {
- entity = new AtlasEntity(typeName, entityHeader.getAttributes());
-
- entity.setGuid(entityGuid);
+ entity = new AtlasEntity(entityHeader);
} else {
+ String entityGuid = entityHeader.getGuid();
+
entity = instanceConverter.getAndCacheEntity(entityGuid);
}
@@ -556,4 +558,81 @@ public class AtlasEntityChangeNotifier {
doFullTextMapping(Collections.singletonList(entityHeader));
}
+
+ private void pruneResponse(EntityMutationResponse resp) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> pruneResponse()");
+ }
+
+ List<AtlasEntityHeader> createdEntities = resp.getCreatedEntities();
+ List<AtlasEntityHeader> updatedEntities = resp.getUpdatedEntities();
+ List<AtlasEntityHeader> partialUpdatedEntities = resp.getPartialUpdatedEntities();
+ List<AtlasEntityHeader> deletedEntities = resp.getDeletedEntities();
+
+ // remove entities with DELETED status from created & updated lists
+ purgeDeletedEntities(createdEntities);
+ purgeDeletedEntities(updatedEntities);
+ purgeDeletedEntities(partialUpdatedEntities);
+
+ // remove entities deleted in this mutation from created & updated lists
+ if (deletedEntities != null) {
+ for (AtlasEntityHeader entity : deletedEntities) {
+ purgeEntity(entity.getGuid(), createdEntities);
+ purgeEntity(entity.getGuid(), updatedEntities);
+ purgeEntity(entity.getGuid(), partialUpdatedEntities);
+ }
+ }
+
+ // remove entities created in this mutation from updated lists
+ if (createdEntities != null) {
+ for (AtlasEntityHeader entity : createdEntities) {
+ purgeEntity(entity.getGuid(),updatedEntities);
+ purgeEntity(entity.getGuid(), partialUpdatedEntities);
+ }
+ }
+
+ // remove entities updated in this mutation from partial-updated list
+ if (updatedEntities != null) {
+ for (AtlasEntityHeader entity : updatedEntities) {
+ purgeEntity(entity.getGuid(), partialUpdatedEntities);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== pruneResponse()");
+ }
+ }
+
+ private void purgeDeletedEntities(List<AtlasEntityHeader> entities) {
+ if (entities != null) {
+ for (ListIterator<AtlasEntityHeader> iter = entities.listIterator(); iter.hasNext(); ) {
+ AtlasEntityHeader entity = iter.next();
+
+ if (entity.getStatus() == AtlasEntity.Status.DELETED) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("purgeDeletedEntities(guid={}, status={}): REMOVED", entity.getGuid(), entity.getStatus());
+ }
+
+ iter.remove();
+ }
+ }
+ }
+ }
+
+ private void purgeEntity(String guid, List<AtlasEntityHeader> entities) {
+ if (guid != null && entities != null) {
+ for (ListIterator<AtlasEntityHeader> iter = entities.listIterator(); iter.hasNext(); ) {
+ AtlasEntityHeader entity = iter.next();
+
+ if (org.apache.commons.lang.StringUtils.equals(guid, entity.getGuid())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("purgeEntity(guid={}): REMOVED", entity.getGuid());
+ }
+
+ iter.remove();
+ }
+ }
+ }
+ }
+
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index e0668cb..f610f6f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -29,6 +29,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
@@ -466,6 +467,10 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
@Override
@GraphTransaction
public void addClassifications(final String guid, final List<AtlasClassification> classifications) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding classifications={} to entity={}", classifications, guid);
+ }
+
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
}
@@ -474,17 +479,23 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding classifications={} to entity={}", classifications, guid);
+ AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+ if (entityVertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex);
for (AtlasClassification classification : classifications) {
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification),
"add classification: guid=", guid, ", classification=", classification.getTypeName());
}
+ EntityMutationContext context = new EntityMutationContext();
+
+ context.cacheEntity(guid, entityVertex, typeRegistry.getEntityTypeByName(entityHeader.getTypeName()));
+
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
for (AtlasClassification classification : classifications) {
@@ -494,7 +505,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
// validate if entity, not already associated with classifications
validateEntityAssociations(guid, classifications);
- entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
+ entityGraphMapper.addClassifications(context, guid, classifications);
}
@Override
@@ -512,24 +523,38 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
}
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+ AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+ if (entityVertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+ }
+
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex);
for (AtlasClassification classification : classifications) {
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION, entityHeader, classification), "update classification: guid=", guid, ", classification=", classification.getTypeName());
}
+ EntityMutationContext context = new EntityMutationContext();
+
+ context.cacheEntity(guid, entityVertex, typeRegistry.getEntityTypeByName(entityHeader.getTypeName()));
+
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
for (AtlasClassification classification : classifications) {
validateAndNormalize(classification);
}
- entityGraphMapper.updateClassifications(new EntityMutationContext(), guid, classifications);
+ entityGraphMapper.updateClassifications(context, guid, classifications);
}
@Override
@GraphTransaction
public void addClassification(final List<String> guids, final AtlasClassification classification) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding classification={} to entities={}", classification, guids);
+ }
+
if (CollectionUtils.isEmpty(guids)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
}
@@ -537,15 +562,21 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classification not specified");
}
+ EntityMutationContext context = new EntityMutationContext();
+
for (String guid : guids) {
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+ AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+ if (entityVertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+ }
+
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex);
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification),
"add classification: guid=", guid, ", classification=", classification.getTypeName());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding classification={} to entities={}", classification, guids);
+ context.cacheEntity(guid, entityVertex, typeRegistry.getEntityTypeByName(entityHeader.getTypeName()));
}
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids);
@@ -557,7 +588,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
for (String guid : guids) {
validateEntityAssociations(guid, classifications);
- entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
+ entityGraphMapper.addClassifications(context, guid, classifications);
}
}
@@ -813,8 +844,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
- String guidVertex = AtlasGraphUtilsV2.getIdFromVertex(vertex);
+ String guidVertex = AtlasGraphUtilsV2.getIdFromVertex(vertex);
if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute
entity.setGuid(guidVertex);
@@ -828,7 +858,6 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
//Create vertices which do not exist in the repository
if (RequestContext.get().isImportInProgress() && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) {
vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid());
@@ -851,6 +880,24 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
// during import, update the system attributes
if (RequestContext.get().isImportInProgress()) {
+ Status newStatus = entity.getStatus();
+
+ if (newStatus != null) {
+ Status currStatus = AtlasGraphUtilsV2.getState(vertex);
+
+ if (currStatus == Status.ACTIVE && newStatus == Status.DELETED) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("entity-delete via import - guid={}", guid);
+ }
+
+ context.addEntityToDelete(vertex);
+ } else if (currStatus == Status.DELETED && newStatus == Status.ACTIVE) {
+ LOG.warn("attempt to activate deleted entity (guid={}). Ignored", guid);
+
+ entity.setStatus(currStatus);
+ }
+ }
+
entityGraphMapper.updateSystemAttributes(vertex, entity);
}
}
@@ -895,11 +942,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
deleteDelegate.getHandler().deleteEntities(deletionCandidates); // this will update req with list of deleted/updated entities
- for (AtlasObjectId entity : req.getDeletedEntities()) {
+ for (AtlasEntityHeader entity : req.getDeletedEntities()) {
response.addEntity(DELETE, entity);
}
- for (AtlasObjectId entity : req.getUpdatedEntities()) {
+ for (AtlasEntityHeader entity : req.getUpdatedEntities()) {
response.addEntity(UPDATE, entity);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 82b563e..a260154 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -139,10 +139,6 @@ public class EntityGraphMapper {
}
public void updateSystemAttributes(AtlasVertex vertex, AtlasEntity entity) {
- if (entity.getStatus() != null) {
- AtlasGraphUtilsV2.setEncodedProperty(vertex, STATE_PROPERTY_KEY, entity.getStatus().name());
- }
-
if (entity.getVersion() != null) {
AtlasGraphUtilsV2.setEncodedProperty(vertex, VERSION_PROPERTY_KEY, entity.getVersion());
}
@@ -226,13 +222,17 @@ public class EntityGraphMapper {
}
}
+ if (CollectionUtils.isNotEmpty(context.getEntitiesToDelete())) {
+ deleteDelegate.getHandler().deleteEntities(context.getEntitiesToDelete());
+ }
+
RequestContext req = RequestContext.get();
- for (AtlasObjectId entity : req.getDeletedEntities()) {
+ for (AtlasEntityHeader entity : req.getDeletedEntities()) {
resp.addEntity(DELETE, entity);
}
- for (AtlasObjectId entity : req.getUpdatedEntities()) {
+ for (AtlasEntityHeader entity : req.getUpdatedEntities()) {
if (isPartialUpdate) {
resp.addEntity(PARTIAL_UPDATE, entity);
}
@@ -395,7 +395,7 @@ public class EntityGraphMapper {
switch (ctx.getAttrType().getTypeCategory()) {
case PRIMITIVE:
case ENUM:
- return mapPrimitiveValue(ctx);
+ return mapPrimitiveValue(ctx, context);
case STRUCT: {
String edgeLabel = AtlasGraphUtilsV2.getEdgeLabel(ctx.getVertexProperty());
@@ -577,7 +577,7 @@ public class EntityGraphMapper {
if (!requestContext.isDeletedEntity(GraphHelper.getGuid(inverseVertex))) {
updateModificationMetadata(inverseVertex);
- requestContext.recordEntityUpdate(entityRetriever.toAtlasObjectId(inverseVertex));
+ requestContext.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(inverseVertex));
}
}
}
@@ -664,7 +664,7 @@ public class EntityGraphMapper {
return ret;
}
- private Object mapPrimitiveValue(AttributeMutationContext ctx) {
+ private Object mapPrimitiveValue(AttributeMutationContext ctx, EntityMutationContext context) {
boolean isIndexableStrAttr = ctx.getAttributeDef().getIsIndexable() && ctx.getAttrType() instanceof AtlasBuiltInTypes.AtlasStringType;
Object ret = ctx.getValue();
@@ -708,7 +708,7 @@ public class EntityGraphMapper {
String uniqPropName = ctx.getAttribute() != null ? ctx.getAttribute().getVertexUniquePropertyName() : null;
if (uniqPropName != null) {
- if (AtlasGraphUtilsV2.getState(ctx.getReferringVertex()) == DELETED) {
+ if (context.isDeletedEntity(ctx.getReferringVertex()) || AtlasGraphUtilsV2.getState(ctx.getReferringVertex()) == DELETED) {
ctx.getReferringVertex().removeProperty(uniqPropName);
} else {
AtlasGraphUtilsV2.setEncodedProperty(ctx.getReferringVertex(), uniqPropName, ret);
@@ -1355,6 +1355,7 @@ public class EntityGraphMapper {
AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName());
header.setGuid(getIdFromVertex(vertex));
+ header.setStatus(entity.getStatus());
for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
header.setAttribute(attribute.getName(), entity.getAttribute(attribute.getName()));
@@ -1363,10 +1364,6 @@ public class EntityGraphMapper {
return header;
}
- public static AtlasEntityHeader constructHeader(AtlasObjectId id) {
- return new AtlasEntityHeader(id.getTypeName(), id.getGuid(), id.getUniqueAttributes());
- }
-
private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, AtlasMapType mapType, Object val) {
if (mapType.getValueType().getTypeCategory() == TypeCategory.OBJECT_ID_TYPE && !ctx.getAttributeDef().isSoftReferenced()) {
AtlasEdge edge = (AtlasEdge) val;
@@ -1382,14 +1379,8 @@ public class EntityGraphMapper {
public void addClassifications(final EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
- AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
-
- if (entityVertex == null) {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
- }
-
- final String entityTypeName = AtlasGraphUtilsV2.getTypeName(entityVertex);
- final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
+ final AtlasVertex entityVertex = context.getVertex(guid);
+ final AtlasEntityType entityType = context.getType(guid);
List<AtlasVertex> entitiesToPropagateTo = null;
Map<AtlasVertex, List<AtlasClassification>> propagations = null;
List<AtlasClassification> addClassifications = new ArrayList<>(classifications.size());
@@ -1434,7 +1425,7 @@ public class EntityGraphMapper {
// ignore propagated classifications
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding classification [{}] to [{}] using edge label: [{}]", classificationName, entityTypeName, getTraitLabel(classificationName));
+ LOG.debug("Adding classification [{}] to [{}] using edge label: [{}]", classificationName, entityType.getTypeName(), getTraitLabel(classificationName));
}
AtlasGraphUtilsV2.addEncodedProperty(entityVertex, TRAIT_NAMES_PROPERTY_KEY, classificationName);
@@ -1466,7 +1457,7 @@ public class EntityGraphMapper {
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(entitiesToPropagateTo));
+ LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityType.getTypeName(), getTypeNames(entitiesToPropagateTo));
}
List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, entitiesToPropagateTo);
@@ -1478,12 +1469,12 @@ public class EntityGraphMapper {
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug(" --> Not propagating classification: [{}][{}] - no entities found to propagate to.", getTypeName(classificationVertex), entityTypeName);
+ LOG.debug(" --> Not propagating classification: [{}][{}] - no entities found to propagate to.", getTypeName(classificationVertex), entityType.getTypeName());
}
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug(" --> Not propagating classification: [{}][{}] - propagation is disabled.", getTypeName(classificationVertex), entityTypeName);
+ LOG.debug(" --> Not propagating classification: [{}][{}] - propagation is disabled.", getTypeName(classificationVertex), entityType.getTypeName());
}
}
@@ -1867,7 +1858,7 @@ public class EntityGraphMapper {
if (!req.isUpdatedEntity(GraphHelper.getGuid(vertex))) {
updateModificationMetadata(vertex);
- req.recordEntityUpdate(entityRetriever.toAtlasObjectId(vertex));
+ req.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(vertex));
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index e288cdf..e2b7433 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -209,46 +209,6 @@ public class EntityGraphRetriever {
return ret;
}
- public AtlasEntityHeader toAtlasEntityHeader(AtlasEntity entity) {
- AtlasEntityHeader ret = null;
- String typeName = entity.getTypeName();
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
-
- if (entityType != null) {
- Map<String, Object> uniqueAttributes = new HashMap<>();
-
- for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
- Object attrValue = entity.getAttribute(attribute.getName());
-
- if (attrValue != null) {
- uniqueAttributes.put(attribute.getName(), attrValue);
- }
- }
-
- ret = new AtlasEntityHeader(entity.getTypeName(), entity.getGuid(), uniqueAttributes);
-
- if (CollectionUtils.isNotEmpty(entity.getClassifications())) {
- List<AtlasClassification> classifications = new ArrayList<>(entity.getClassifications().size());
- List<String> classificationNames = new ArrayList<>(entity.getClassifications().size());
-
- for (AtlasClassification classification : entity.getClassifications()) {
- classifications.add(classification);
- classificationNames.add(classification.getTypeName());
- }
-
- ret.setClassifications(classifications);
- ret.setClassificationNames(classificationNames);
- }
-
- if (CollectionUtils.isNotEmpty(entity.getMeanings())) {
- ret.setMeanings(entity.getMeanings());
- ret.setMeaningNames(entity.getMeanings().stream().map(AtlasTermAssignmentHeader::getDisplayText).collect(Collectors.toList()));
- }
- }
-
- return ret;
- }
-
public AtlasObjectId toAtlasObjectId(AtlasVertex entityVertex) throws AtlasBaseException {
AtlasObjectId ret = null;
String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java
index 453dbe6..deb743e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java
@@ -27,18 +27,20 @@ import org.apache.commons.lang.StringUtils;
import java.util.*;
public class EntityMutationContext {
- private EntityGraphDiscoveryContext context = null;
- private final List<AtlasEntity> entitiesCreated = new ArrayList<>();
- private final List<AtlasEntity> entitiesUpdated = new ArrayList<>();
- private final Map<String, AtlasEntityType> entityVsType = new HashMap<>();
- private final Map<String, AtlasVertex> entityVsVertex = new HashMap<>();
- private final Map<String, String> guidAssignments = new HashMap<>();
+ private final EntityGraphDiscoveryContext context;
+ private final List<AtlasEntity> entitiesCreated = new ArrayList<>();
+ private final List<AtlasEntity> entitiesUpdated = new ArrayList<>();
+ private final Map<String, AtlasEntityType> entityVsType = new HashMap<>();
+ private final Map<String, AtlasVertex> entityVsVertex = new HashMap<>();
+ private final Map<String, String> guidAssignments = new HashMap<>();
+ private List<AtlasVertex> entitiesToDelete = null;
public EntityMutationContext(final EntityGraphDiscoveryContext context) {
this.context = context;
}
public EntityMutationContext() {
+ this.context = null;
}
public void addCreated(String internalGuid, AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) {
@@ -65,6 +67,19 @@ public class EntityMutationContext {
}
}
+ public void addEntityToDelete(AtlasVertex vertex) {
+ if (entitiesToDelete == null) {
+ entitiesToDelete = new ArrayList<>();
+ }
+
+ entitiesToDelete.add(vertex);
+ }
+
+ public void cacheEntity(String guid, AtlasVertex vertex, AtlasEntityType entityType) {
+ entityVsType.put(guid, entityType);
+ entityVsVertex.put(guid, vertex);
+ }
+
public EntityGraphDiscoveryContext getDiscoveryContext() {
return this.context;
}
@@ -81,6 +96,10 @@ public class EntityMutationContext {
return guidAssignments;
}
+ public List<AtlasVertex> getEntitiesToDelete() {
+ return entitiesToDelete;
+ }
+
public AtlasEntityType getType(String guid) {
return entityVsType.get(guid);
}
@@ -130,6 +149,10 @@ public class EntityMutationContext {
return getFromCollection(parentGuid, getUpdatedEntities());
}
+ public boolean isDeletedEntity(AtlasVertex vertex) {
+ return entitiesToDelete != null && entitiesToDelete.contains(vertex);
+ }
+
private AtlasEntity getFromCollection(String parentGuid, Collection<AtlasEntity> coll) {
for (AtlasEntity e : coll) {
if(e.getGuid().equalsIgnoreCase(parentGuid)) {
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 29f2974..f127424 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -21,7 +21,7 @@ package org.apache.atlas;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
-import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.store.DeleteType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
@@ -39,8 +39,8 @@ public class RequestContext {
private static final boolean isMetricsEnabled = METRICS.isDebugEnabled();
private final long requestTime = System.currentTimeMillis();
- private final Map<String, AtlasObjectId> updatedEntities = new HashMap<>();
- private final Map<String, AtlasObjectId> deletedEntities = new HashMap<>();
+ private final Map<String, AtlasEntityHeader> updatedEntities = new HashMap<>();
+ private final Map<String, AtlasEntityHeader> deletedEntities = new HashMap<>();
private final Map<String, AtlasEntity> entityCache = new HashMap<>();
private final Map<String, AtlasEntityWithExtInfo> entityExtInfoCache = new HashMap<>();
private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>();
@@ -164,13 +164,13 @@ public class RequestContext {
isImportInProgress = importInProgress;
}
- public void recordEntityUpdate(AtlasObjectId entity) {
+ public void recordEntityUpdate(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null) {
updatedEntities.put(entity.getGuid(), entity);
}
}
- public void recordEntityDelete(AtlasObjectId entity) {
+ public void recordEntityDelete(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null) {
deletedEntities.put(entity.getGuid(), entity);
}
@@ -248,11 +248,11 @@ public class RequestContext {
}
- public Collection<AtlasObjectId> getUpdatedEntities() {
+ public Collection<AtlasEntityHeader> getUpdatedEntities() {
return updatedEntities.values();
}
- public Collection<AtlasObjectId> getDeletedEntities() {
+ public Collection<AtlasEntityHeader> getDeletedEntities() {
return deletedEntities.values();
}
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
index 9cc5498..7e6a380 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
@@ -60,7 +60,7 @@ public class AdminExportImportTestIT extends BaseResourceIT {
@Test(dependsOnMethods = "isActive")
public void importData() throws AtlasServiceException {
- performImport(FILE_TO_IMPORT);
+ performImport(FILE_TO_IMPORT, 37);
assertReplicationData("cl1");
}
@@ -84,21 +84,21 @@ public class AdminExportImportTestIT extends BaseResourceIT {
assertTrue(zs.getCreationOrder().size() > EXPECTED_CREATION_ORDER_SIZE);
}
- private void performImport(String fileToImport) throws AtlasServiceException {
+ private void performImport(String fileToImport, int expectedProcessedEntitiesCount) throws AtlasServiceException {
AtlasImportRequest request = new AtlasImportRequest();
request.getOptions().put(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, SOURCE_SERVER_NAME);
request.getOptions().put(AtlasImportRequest.TRANSFORMS_KEY, IMPORT_TRANSFORM_CLEAR_ATTRS);
- performImport(fileToImport, request);
+ performImport(fileToImport, request, expectedProcessedEntitiesCount);
}
- private void performImport(String fileToImport, AtlasImportRequest request) throws AtlasServiceException {
+ private void performImport(String fileToImport, AtlasImportRequest request, int expectedProcessedEntitiesCount) throws AtlasServiceException {
AtlasImportResult result = performImportUsing(fileToImport, request);
assertNotNull(result);
assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
assertNotNull(result.getMetrics());
- assertEquals(result.getProcessedEntities().size(), 37);
+ assertEquals(result.getProcessedEntities().size(), expectedProcessedEntitiesCount, "processedEntities: expected=" + expectedProcessedEntitiesCount + ", found=" + result.getProcessedEntities().size() + ". result=" + result);
}
private AtlasImportResult performImportUsing(String fileToImport, AtlasImportRequest request) throws AtlasServiceException {
@@ -126,7 +126,7 @@ public class AdminExportImportTestIT extends BaseResourceIT {
request.getOptions().put(AtlasImportRequest.TRANSFORMS_KEY, IMPORT_TRANSFORM_SET_DELETED);
try {
- performImport(FILE_TO_IMPORT, request);
+ performImport(FILE_TO_IMPORT, request, 32); // initial import has 5 entities already in deleted state, hence current import will have 32 processed-entities
} catch (AtlasServiceException e) {
throw new SkipException("performTeardown: failed! Subsequent tests results may be affected.");
}