You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2020/02/22 06:40:20 UTC
[atlas] branch branch-0.8 updated: ATLAS-3405: Handling of
references to non-existing entities in notifications
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 9cc59d4 ATLAS-3405: Handling of references to non-existing entities in notifications
9cc59d4 is described below
commit 9cc59d449d63237bc33a2263eb8ed1e5eae52dd9
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Fri Feb 21 14:36:45 2020 -0800
ATLAS-3405: Handling of references to non-existing entities in notifications
---
.../java/org/apache/atlas/AtlasConfiguration.java | 3 ++
.../org/apache/atlas/repository/Constants.java | 3 ++
.../apache/atlas/model/instance/AtlasEntity.java | 29 ++++++++----
.../atlas/model/instance/AtlasEntityHeader.java | 12 ++++-
.../repository/graph/GraphBackedSearchIndexer.java | 3 ++
.../apache/atlas/repository/graph/GraphHelper.java | 7 +++
.../graph/v1/AtlasEntityGraphDiscoveryV1.java | 12 +++--
.../store/graph/v1/AtlasEntityStoreV1.java | 11 ++++-
.../store/graph/v1/EntityGraphMapper.java | 55 ++++++++++++++++++++--
.../store/graph/v1/EntityGraphRetriever.java | 2 +
.../store/graph/v1/IDBasedEntityResolver.java | 6 ++-
.../graph/v1/UniqAttrBasedEntityResolver.java | 11 ++++-
.../java/org/apache/atlas/RequestContextV1.java | 9 ++++
.../notification/NotificationHookConsumer.java | 5 ++
.../org/apache/atlas/web/filters/AuditFilter.java | 8 +++-
15 files changed, 151 insertions(+), 25 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 17d67cb..701ad17 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -43,6 +43,9 @@ public enum AtlasConfiguration {
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60),
+ NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.notification.consumer.create.shell.entity.for.non-existing.ref", true),
+ REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.rest.create.shell.entity.for.non-existing.ref", false),
+
GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH("atlas.graphstore.indexed.string.safe.length", Short.MAX_VALUE), // based on org.apache.hadoop.hbase.client.Mutation.checkRow()
//search configuration
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 311c408..4fa6dad 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -70,6 +70,7 @@ public final class Constants {
public static final String MODIFIED_BY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modifiedBy");
public static final String TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "timestamp");
public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp");
+ public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
/**
* search backing index name.
@@ -97,6 +98,8 @@ public final class Constants {
public static final String MAX_FULLTEXT_QUERY_STR_LENGTH = "atlas.graph.fulltext-max-query-str-length";
public static final String MAX_DSL_QUERY_STR_LENGTH = "atlas.graph.dsl-max-query-str-length";
+ public static final Integer INCOMPLETE_ENTITY_VALUE = Integer.valueOf(1);
+
/*
* replication attributes
*/
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 35b5750..b0a971c 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -57,18 +57,21 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
public class AtlasEntity extends AtlasStruct implements Serializable {
private static final long serialVersionUID = 1L;
+ public static final String KEY_IS_INCOMPLETE = "isIncomplete";
+
/**
* Status of the entity - can be active or deleted. Deleted entities are not removed from Atlas store.
*/
public enum Status { ACTIVE, DELETED }
- private String guid = null;
- private Status status = Status.ACTIVE;
- private String createdBy = null;
- private String updatedBy = null;
- private Date createTime = null;
- private Date updateTime = null;
- private Long version = 0L;
+ private String guid = null;
+ private Boolean isIncomplete = Boolean.FALSE;
+ private Status status = Status.ACTIVE;
+ private String createdBy = null;
+ private String updatedBy = null;
+ private Date createTime = null;
+ private Date updateTime = null;
+ private Long version = 0L;
private List<AtlasClassification> classifications;
@@ -112,6 +115,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
if (other != null) {
setGuid(other.getGuid());
+ setIsIncomplete(other.getIsIncomplete());
setStatus(other.getStatus());
setCreatedBy(other.getCreatedBy());
setUpdatedBy(other.getUpdatedBy());
@@ -130,6 +134,12 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
this.guid = guid;
}
+ public Boolean getIsIncomplete() { return isIncomplete; }
+
+ public void setIsIncomplete(Boolean isIncomplete) {
+ this.isIncomplete = isIncomplete;
+ }
+
public Status getStatus() {
return status;
}
@@ -185,6 +195,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
private void init() {
setGuid(nextInternalId());
+ setIsIncomplete(Boolean.FALSE);
setStatus(null);
setCreatedBy(null);
setUpdatedBy(null);
@@ -206,6 +217,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
sb.append("AtlasEntity{");
super.toString(sb);
sb.append("guid='").append(guid).append('\'');
+ sb.append(", isIncomplete=").append(isIncomplete);
sb.append(", status=").append(status);
sb.append(", createdBy='").append(createdBy).append('\'');
sb.append(", updatedBy='").append(updatedBy).append('\'');
@@ -229,6 +241,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
AtlasEntity that = (AtlasEntity) o;
return Objects.equals(guid, that.guid) &&
+ Objects.equals(isIncomplete, that.isIncomplete) &&
status == that.status &&
Objects.equals(createdBy, that.createdBy) &&
Objects.equals(updatedBy, that.updatedBy) &&
@@ -240,7 +253,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), guid, status, createdBy, updatedBy, createTime, updateTime, version,
+ return Objects.hash(super.hashCode(), guid, isIncomplete, status, createdBy, updatedBy, createTime, updateTime, version,
classifications);
}
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
index da8b445..accd0b9 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
@@ -53,6 +53,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
private static final long serialVersionUID = 1L;
private String guid = null;
+ private Boolean isIncomplete = Boolean.FALSE;
private AtlasEntity.Status status = AtlasEntity.Status.ACTIVE;
private String displayText = null;
private List<String> classificationNames = null;
@@ -107,6 +108,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
if (other != null) {
setGuid(other.getGuid());
+ setIsIncomplete(other.getIsIncomplete());
setStatus(other.getStatus());
setDisplayText(other.getDisplayText());
setClassificationNames(other.getClassificationNames());
@@ -122,6 +124,12 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
this.guid = guid;
}
+ public Boolean getIsIncomplete() { return isIncomplete; }
+
+ public void setIsIncomplete(Boolean isIncomplete) {
+ this.isIncomplete = isIncomplete;
+ }
+
public AtlasEntity.Status getStatus() {
return status;
}
@@ -158,6 +166,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
sb.append("AtlasEntityHeader{");
sb.append("guid='").append(guid).append('\'');
+ sb.append(", isIncomplete=").append(isIncomplete);
sb.append(", status=").append(status);
sb.append(", displayText=").append(displayText);
sb.append(", classificationNames=[");
@@ -179,6 +188,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
if (!super.equals(o)) return false;
AtlasEntityHeader that = (AtlasEntityHeader) o;
return Objects.equals(guid, that.guid) &&
+ Objects.equals(isIncomplete, that.isIncomplete) &&
status == that.status &&
Objects.equals(displayText, that.displayText) &&
Objects.equals(classificationNames, that.classificationNames) &&
@@ -187,7 +197,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), guid, status, displayText, classificationNames, classifications);
+ return Objects.hash(super.hashCode(), guid, isIncomplete, status, displayText, classificationNames, classifications);
}
@Override
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 54f44af..22f39a8 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -153,6 +153,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
AtlasCardinality.SINGLE, true, true);
// Add creation_timestamp property to Vertex Index (mixed index)
+ createIndexes(management, Constants.IS_INCOMPLETE_PROPERTY_KEY, Integer.class, false, AtlasCardinality.SINGLE, true, true);
+
+ // Add creation_timestamp property to Vertex Index (mixed index)
createIndexes(management, Constants.TIMESTAMP_PROPERTY_KEY, Long.class, false, AtlasCardinality.SINGLE, false, false);
// Add modification_timestamp property to Vertex Index (mixed index)
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 25cc70f..c520929 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -551,6 +551,13 @@ public final class GraphHelper {
return vertex.<String>getProperty(Constants.GUID_PROPERTY_KEY, String.class);
}
+ public static Boolean isEntityIncomplete(AtlasElement element) {
+ Integer value = element.getProperty(Constants.IS_INCOMPLETE_PROPERTY_KEY, Integer.class);
+ Boolean ret = value != null && value.equals(Constants.INCOMPLETE_ENTITY_VALUE) ? Boolean.TRUE : Boolean.FALSE;
+
+ return ret;
+ }
+
public static String getTypeName(AtlasVertex instanceVertex) {
return instanceVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
index 66a92fa..e41a66c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
@@ -54,10 +54,12 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphDiscoveryContext discoveryContext;
+ private final EntityGraphMapper entityGraphMapper;
- public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, EntityStream entityStream) {
- this.typeRegistry = typeRegistry;
- this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
+ public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, EntityStream entityStream, EntityGraphMapper entityGraphMapper) {
+ this.typeRegistry = typeRegistry;
+ this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
+ this.entityGraphMapper = entityGraphMapper;
}
@Override
@@ -173,8 +175,8 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
protected void resolveReferences() throws AtlasBaseException {
MetricRecorder metric = RequestContextV1.get().startMetricRecord("resolveReferences");
- EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry),
- new UniqAttrBasedEntityResolver(typeRegistry)
+ EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry, entityGraphMapper),
+ new UniqAttrBasedEntityResolver(typeRegistry, entityGraphMapper)
};
for (EntityResolver resolver : entityResolvers) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 75483aa..9fa1d96 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -60,6 +60,8 @@ import java.util.Objects;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
+import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete;
@Component
@@ -723,7 +725,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
MetricRecorder metric = RequestContextV1.get().startMetricRecord("preCreateOrUpdate");
- EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream);
+ EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream, entityGraphMapper);
EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
EntityMutationContext context = new EntityMutationContext(discoveryContext);
RequestContextV1 requestContext = RequestContextV1.get();
@@ -739,6 +741,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
// entity would be null if guid is not in the stream but referenced by an entity in the stream
if (!isPartialUpdate) {
graphDiscoverer.validateAndNormalize(entity);
+
+ // change entity 'isIncomplete' to 'false' during full update
+ if (isEntityIncomplete(vertex)) {
+ vertex.removeProperty(IS_INCOMPLETE_PROPERTY_KEY);
+
+ entity.setIsIncomplete(Boolean.FALSE);
+ }
} else {
graphDiscoverer.validateAndNormalizeForUpdate(entity);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index f34506a..c6c6ae5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -42,6 +42,7 @@ import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes;
import org.apache.atlas.type.AtlasClassificationType;
@@ -99,9 +100,45 @@ public class EntityGraphMapper {
return createVertexWithGuid(entity, guid);
}
+ public AtlasVertex createShellEntityVertex(AtlasObjectId objectId, EntityGraphDiscoveryContext context) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> createShellEntityVertex({})", objectId.getTypeName());
+ }
+
+ final String guid = UUID.randomUUID().toString();
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objectId.getTypeName());
+ AtlasVertex ret = createStructVertex(objectId);
+
+ for (String superTypeName : entityType.getAllSuperTypes()) {
+ AtlasGraphUtilsV1.addEncodedProperty(ret, SUPER_TYPES_PROPERTY_KEY, superTypeName);
+ }
+
+ AtlasGraphUtilsV1.setEncodedProperty(ret, GUID_PROPERTY_KEY, guid);
+ AtlasGraphUtilsV1.setEncodedProperty(ret, VERSION_PROPERTY_KEY, getEntityVersion(null));
+ AtlasGraphUtilsV1.setEncodedProperty(ret, IS_INCOMPLETE_PROPERTY_KEY, INCOMPLETE_ENTITY_VALUE);
+
+ // map unique attributes
+ Map<String, Object> uniqueAttributes = objectId.getUniqueAttributes();
+ EntityMutationContext mutationContext = new EntityMutationContext(context);
+
+ for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
+ String attrName = attribute.getName();
+
+ if (uniqueAttributes.containsKey(attrName)) {
+ Object attrValue = attribute.getAttributeType().getNormalizedValue(uniqueAttributes.get(attrName));
+
+ mapAttribute(attribute, attrValue, ret, CREATE, mutationContext);
+ }
+ }
+
+ LOG.info("created shell vertex for entity: objectId={}, guid={}", objectId, guid);
+
+ return ret;
+ }
+
public AtlasVertex createVertexWithGuid(AtlasEntity entity, String guid) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> createVertex({})", entity.getTypeName());
+ LOG.debug("==> createVertexWithGuid({})", entity.getTypeName());
}
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
@@ -203,13 +240,21 @@ public class EntityGraphMapper {
}
private AtlasVertex createStructVertex(AtlasStruct struct) {
+ return createStructVertex(struct.getTypeName());
+ }
+
+ private AtlasVertex createStructVertex(AtlasObjectId objectId) {
+ return createStructVertex(objectId.getTypeName());
+ }
+
+ private AtlasVertex createStructVertex(String typeName) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> createStructVertex({})", struct.getTypeName());
+ LOG.debug("==> createStructVertex({})", typeName);
}
final AtlasVertex ret = graph.addVertex();
- AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.ENTITY_TYPE_PROPERTY_KEY, struct.getTypeName());
+ AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.ENTITY_TYPE_PROPERTY_KEY, typeName);
AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
@@ -217,7 +262,7 @@ public class EntityGraphMapper {
AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
if (LOG.isDebugEnabled()) {
- LOG.debug("<== createStructVertex({})", struct.getTypeName());
+ LOG.debug("<== createStructVertex({})",typeName);
}
return ret;
@@ -971,6 +1016,8 @@ public class EntityGraphMapper {
AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName());
header.setGuid(AtlasGraphUtilsV1.getIdFromVertex(vertex));
+ header.setIsIncomplete(GraphHelper.isEntityIncomplete(vertex));
+ header.setStatus(GraphHelper.getStatus(vertex));
header.setStatus(entity.getStatus());
for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index d364556..492953f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -293,6 +293,7 @@ public final class EntityGraphRetriever {
ret.setTypeName(typeName);
ret.setGuid(guid);
+ ret.setIsIncomplete(GraphHelper.isEntityIncomplete(entityVertex));
ret.setStatus(GraphHelper.getStatus(entityVertex));
ret.setClassificationNames(GraphHelper.getTraitNames(entityVertex));
@@ -350,6 +351,7 @@ public final class EntityGraphRetriever {
}
entity.setGuid(GraphHelper.getGuid(entityVertex));
+ entity.setIsIncomplete(GraphHelper.isEntityIncomplete(entityVertex));
entity.setTypeName(GraphHelper.getTypeName(entityVertex));
entity.setStatus(GraphHelper.getStatus(entityVertex));
entity.setVersion(GraphHelper.getVersion(entityVertex).longValue());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java
index 8b7ac8b..5ed68b0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java
@@ -35,9 +35,11 @@ public class IDBasedEntityResolver implements EntityResolver {
private static final Logger LOG = LoggerFactory.getLogger(IDBasedEntityResolver.class);
private final AtlasTypeRegistry typeRegistry;
+ private final EntityGraphMapper entityGraphMapper;
- public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
- this.typeRegistry = typeRegistry;
+ public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
+ this.typeRegistry = typeRegistry;
+ this.entityGraphMapper = entityGraphMapper;
}
public EntityGraphDiscoveryContext resolveEntityReferences(EntityGraphDiscoveryContext context) throws AtlasBaseException {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java
index 50eee72..52accec 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasObjectId;
@@ -37,9 +38,11 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
private static final Logger LOG = LoggerFactory.getLogger(UniqAttrBasedEntityResolver.class);
private final AtlasTypeRegistry typeRegistry;
+ private final EntityGraphMapper entityGraphMapper;
- public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
- this.typeRegistry = typeRegistry;
+ public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
+ this.typeRegistry = typeRegistry;
+ this.entityGraphMapper = entityGraphMapper;
}
@Override
@@ -61,6 +64,10 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
AtlasVertex vertex = AtlasGraphUtilsV1.findByUniqueAttributes(entityType, objId.getUniqueAttributes());
+ if (vertex == null && RequestContextV1.get().isCreateShellEntityForNonExistingReference()) {
+ vertex = entityGraphMapper.createShellEntityVertex(objId, context);
+ }
+
if (vertex != null) {
context.addResolvedIdByUniqAttribs(objId, vertex);
resolvedReferences.add(objId);
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
index f74dc5d..1d84123 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
@@ -48,6 +48,7 @@ public class RequestContextV1 {
private List<EntityGuidPair> entityGuidInRequest = null;
private String user;
+ private boolean createShellEntityForNonExistingReference = false;
private DeleteType deleteType = DeleteType.DEFAULT;
private RequestContextV1() {
@@ -131,6 +132,14 @@ public class RequestContextV1 {
this.user = user;
}
+ public boolean isCreateShellEntityForNonExistingReference() {
+ return createShellEntityForNonExistingReference;
+ }
+
+ public void setCreateShellEntityForNonExistingReference(boolean createShellEntityForNonExistingReference) {
+ this.createShellEntityForNonExistingReference = createShellEntityForNonExistingReference;
+ }
+
public DeleteType getDeleteType() { return deleteType; }
public void setDeleteType(DeleteType deleteType) { this.deleteType = (deleteType == null) ? DeleteType.DEFAULT : deleteType; }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 836e6f8..c60ef4d 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -158,6 +158,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final List<String> hiveTablePrefixesToIgnore;
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean preprocessEnabled;
+ private final boolean createShellEntityForNonExistingReference;
@VisibleForTesting
final int consumerRetryInterval;
@@ -274,6 +275,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
+
+ createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
}
@Override
@@ -581,6 +584,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
requestContext.setMaxAttempts(maxRetries);
requestContext.setUser(messageUser);
+ RequestContextV1.get().setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
+
switch (message.getType()) {
case ENTITY_CREATE: {
final EntityCreateRequest createRequest = (EntityCreateRequest) message;
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index c503e41..3fd27c7 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -45,6 +45,7 @@ import java.io.IOException;
import java.util.Date;
import java.util.UUID;
+import static org.apache.atlas.AtlasConfiguration.REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF;
import static org.apache.atlas.AtlasConfiguration.REST_API_ENABLE_DELETE_TYPE_OVERRIDE;
/**
@@ -57,13 +58,15 @@ public class AuditFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(AuditFilter.class);
private static final Logger METRICS_LOG = LoggerFactory.getLogger("METRICS");
- private boolean deleteTypeOverrideEnabled = false;
+ private boolean deleteTypeOverrideEnabled = false;
+ private boolean createShellEntityForNonExistingReference = false;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
LOG.info("AuditFilter initialization started");
- deleteTypeOverrideEnabled = REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
+ deleteTypeOverrideEnabled = REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
+ createShellEntityForNonExistingReference = REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
LOG.info("REST_API_ENABLE_DELETE_TYPE_OVERRIDE={}", deleteTypeOverrideEnabled);
}
@@ -87,6 +90,7 @@ public class AuditFilter implements Filter {
RequestContextV1.clear();
RequestContextV1 requestContextV1 = RequestContextV1.get();
requestContextV1.setUser(user);
+ requestContextV1.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
if (StringUtils.isNotEmpty(deleteType)) {
if (deleteTypeOverrideEnabled) {