You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2020/02/22 06:40:20 UTC

[atlas] branch branch-0.8 updated: ATLAS-3405: Handling of references to non-existing entities in notifications

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 9cc59d4  ATLAS-3405: Handling of references to non-existing entities in notifications
9cc59d4 is described below

commit 9cc59d449d63237bc33a2263eb8ed1e5eae52dd9
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Fri Feb 21 14:36:45 2020 -0800

    ATLAS-3405: Handling of references to non-existing entities in notifications
---
 .../java/org/apache/atlas/AtlasConfiguration.java  |  3 ++
 .../org/apache/atlas/repository/Constants.java     |  3 ++
 .../apache/atlas/model/instance/AtlasEntity.java   | 29 ++++++++----
 .../atlas/model/instance/AtlasEntityHeader.java    | 12 ++++-
 .../repository/graph/GraphBackedSearchIndexer.java |  3 ++
 .../apache/atlas/repository/graph/GraphHelper.java |  7 +++
 .../graph/v1/AtlasEntityGraphDiscoveryV1.java      | 12 +++--
 .../store/graph/v1/AtlasEntityStoreV1.java         | 11 ++++-
 .../store/graph/v1/EntityGraphMapper.java          | 55 ++++++++++++++++++++--
 .../store/graph/v1/EntityGraphRetriever.java       |  2 +
 .../store/graph/v1/IDBasedEntityResolver.java      |  6 ++-
 .../graph/v1/UniqAttrBasedEntityResolver.java      | 11 ++++-
 .../java/org/apache/atlas/RequestContextV1.java    |  9 ++++
 .../notification/NotificationHookConsumer.java     |  5 ++
 .../org/apache/atlas/web/filters/AuditFilter.java  |  8 +++-
 15 files changed, 151 insertions(+), 25 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 17d67cb..701ad17 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -43,6 +43,9 @@ public enum AtlasConfiguration {
     NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
     NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60),
 
+    NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.notification.consumer.create.shell.entity.for.non-existing.ref", true),
+    REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.rest.create.shell.entity.for.non-existing.ref", false),
+
     GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH("atlas.graphstore.indexed.string.safe.length", Short.MAX_VALUE),  // based on org.apache.hadoop.hbase.client.Mutation.checkRow()
 
     //search configuration
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 311c408..4fa6dad 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -70,6 +70,7 @@ public final class Constants {
     public static final String MODIFIED_BY_KEY                     = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modifiedBy");
     public static final String TIMESTAMP_PROPERTY_KEY              = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "timestamp");
     public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp");
+    public static final String IS_INCOMPLETE_PROPERTY_KEY          = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
 
     /**
      * search backing index name.
@@ -97,6 +98,8 @@ public final class Constants {
     public static final String MAX_FULLTEXT_QUERY_STR_LENGTH  = "atlas.graph.fulltext-max-query-str-length";
     public static final String MAX_DSL_QUERY_STR_LENGTH  = "atlas.graph.dsl-max-query-str-length";
 
+    public static final Integer INCOMPLETE_ENTITY_VALUE   = Integer.valueOf(1);
+
     /*
      * replication attributes
      */
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 35b5750..b0a971c 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -57,18 +57,21 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
 public class AtlasEntity extends AtlasStruct implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    public static final String KEY_IS_INCOMPLETE   = "isIncomplete";
+
     /**
      * Status of the entity - can be active or deleted. Deleted entities are not removed from Atlas store.
      */
     public enum Status { ACTIVE, DELETED }
 
-    private String guid       = null;
-    private Status status     = Status.ACTIVE;
-    private String createdBy  = null;
-    private String updatedBy  = null;
-    private Date   createTime = null;
-    private Date   updateTime = null;
-    private Long   version    = 0L;
+    private String  guid         = null;
+    private Boolean isIncomplete = Boolean.FALSE;
+    private Status  status       = Status.ACTIVE;
+    private String  createdBy    = null;
+    private String  updatedBy    = null;
+    private Date    createTime   = null;
+    private Date    updateTime   = null;
+    private Long    version      = 0L;
 
     private List<AtlasClassification> classifications;
 
@@ -112,6 +115,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
 
         if (other != null) {
             setGuid(other.getGuid());
+            setIsIncomplete(other.getIsIncomplete());
             setStatus(other.getStatus());
             setCreatedBy(other.getCreatedBy());
             setUpdatedBy(other.getUpdatedBy());
@@ -130,6 +134,12 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
         this.guid = guid;
     }
 
+    public Boolean getIsIncomplete() { return isIncomplete; }
+
+    public void setIsIncomplete(Boolean isIncomplete) {
+        this.isIncomplete = isIncomplete;
+    }
+
     public Status getStatus() {
         return status;
     }
@@ -185,6 +195,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
 
     private void init() {
         setGuid(nextInternalId());
+        setIsIncomplete(Boolean.FALSE);
         setStatus(null);
         setCreatedBy(null);
         setUpdatedBy(null);
@@ -206,6 +217,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
         sb.append("AtlasEntity{");
         super.toString(sb);
         sb.append("guid='").append(guid).append('\'');
+        sb.append(", isIncomplete=").append(isIncomplete);
         sb.append(", status=").append(status);
         sb.append(", createdBy='").append(createdBy).append('\'');
         sb.append(", updatedBy='").append(updatedBy).append('\'');
@@ -229,6 +241,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
 
         AtlasEntity that = (AtlasEntity) o;
         return Objects.equals(guid, that.guid) &&
+                Objects.equals(isIncomplete, that.isIncomplete) &&
                 status == that.status &&
                 Objects.equals(createdBy, that.createdBy) &&
                 Objects.equals(updatedBy, that.updatedBy) &&
@@ -240,7 +253,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), guid, status, createdBy, updatedBy, createTime, updateTime, version,
+        return Objects.hash(super.hashCode(), guid, isIncomplete, status, createdBy, updatedBy, createTime, updateTime, version,
                             classifications);
     }
 
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
index da8b445..accd0b9 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
@@ -53,6 +53,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private String                    guid                = null;
+    private Boolean                   isIncomplete        = Boolean.FALSE;
     private AtlasEntity.Status        status              = AtlasEntity.Status.ACTIVE;
     private String                    displayText         = null;
     private List<String>              classificationNames = null;
@@ -107,6 +108,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
 
         if (other != null) {
             setGuid(other.getGuid());
+            setIsIncomplete(other.getIsIncomplete());
             setStatus(other.getStatus());
             setDisplayText(other.getDisplayText());
             setClassificationNames(other.getClassificationNames());
@@ -122,6 +124,12 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
         this.guid = guid;
     }
 
+    public Boolean getIsIncomplete() { return isIncomplete; }
+
+    public void setIsIncomplete(Boolean isIncomplete) {
+        this.isIncomplete = isIncomplete;
+    }
+
     public AtlasEntity.Status getStatus() {
         return status;
     }
@@ -158,6 +166,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
 
         sb.append("AtlasEntityHeader{");
         sb.append("guid='").append(guid).append('\'');
+        sb.append(", isIncomplete=").append(isIncomplete);
         sb.append(", status=").append(status);
         sb.append(", displayText=").append(displayText);
         sb.append(", classificationNames=[");
@@ -179,6 +188,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
         if (!super.equals(o)) return false;
         AtlasEntityHeader that = (AtlasEntityHeader) o;
         return Objects.equals(guid, that.guid) &&
+                Objects.equals(isIncomplete, that.isIncomplete) &&
                 status == that.status &&
                 Objects.equals(displayText, that.displayText) &&
                 Objects.equals(classificationNames, that.classificationNames) &&
@@ -187,7 +197,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), guid, status, displayText, classificationNames, classifications);
+        return Objects.hash(super.hashCode(), guid, isIncomplete, status, displayText, classificationNames, classifications);
     }
 
     @Override
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 54f44af..22f39a8 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -153,6 +153,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
                     AtlasCardinality.SINGLE, true, true);
 
+            // Add isIncomplete property to Vertex Index
+            createIndexes(management, Constants.IS_INCOMPLETE_PROPERTY_KEY, Integer.class, false, AtlasCardinality.SINGLE, true, true);
+
             // Add creation_timestamp property to Vertex Index (mixed index)
             createIndexes(management, Constants.TIMESTAMP_PROPERTY_KEY, Long.class, false, AtlasCardinality.SINGLE, false, false);
 
             // Add modification_timestamp property to Vertex Index (mixed index)
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 25cc70f..c520929 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -551,6 +551,13 @@ public final class GraphHelper {
         return vertex.<String>getProperty(Constants.GUID_PROPERTY_KEY, String.class);
     }
 
+    public static Boolean isEntityIncomplete(AtlasElement element) {
+        Integer value = element.getProperty(Constants.IS_INCOMPLETE_PROPERTY_KEY, Integer.class);
+        Boolean ret   = value != null && value.equals(Constants.INCOMPLETE_ENTITY_VALUE) ? Boolean.TRUE : Boolean.FALSE;
+
+        return ret;
+    }
+
     public static String getTypeName(AtlasVertex instanceVertex) {
         return instanceVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
index 66a92fa..e41a66c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
@@ -54,10 +54,12 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
 
     private final AtlasTypeRegistry           typeRegistry;
     private final EntityGraphDiscoveryContext discoveryContext;
+    private final EntityGraphMapper           entityGraphMapper;
 
-    public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, EntityStream entityStream) {
-        this.typeRegistry     = typeRegistry;
-        this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
+    public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, EntityStream entityStream, EntityGraphMapper entityGraphMapper) {
+        this.typeRegistry      = typeRegistry;
+        this.discoveryContext  = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
+        this.entityGraphMapper = entityGraphMapper;
     }
 
     @Override
@@ -173,8 +175,8 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
     protected void resolveReferences() throws AtlasBaseException {
         MetricRecorder metric = RequestContextV1.get().startMetricRecord("resolveReferences");
 
-        EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry),
-                                                                  new UniqAttrBasedEntityResolver(typeRegistry)
+        EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry, entityGraphMapper),
+                                                                  new UniqAttrBasedEntityResolver(typeRegistry, entityGraphMapper)
                                                                 };
 
         for (EntityResolver resolver : entityResolvers) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 75483aa..9fa1d96 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -60,6 +60,8 @@ import java.util.Objects;
 
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
+import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete;
 
 
 @Component
@@ -723,7 +725,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
     private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
         MetricRecorder metric = RequestContextV1.get().startMetricRecord("preCreateOrUpdate");
 
-        EntityGraphDiscovery        graphDiscoverer  = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream);
+        EntityGraphDiscovery        graphDiscoverer  = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream, entityGraphMapper);
         EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
         EntityMutationContext       context          = new EntityMutationContext(discoveryContext);
         RequestContextV1            requestContext   = RequestContextV1.get();
@@ -739,6 +741,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
                     // entity would be null if guid is not in the stream but referenced by an entity in the stream
                     if (!isPartialUpdate) {
                         graphDiscoverer.validateAndNormalize(entity);
+
+                        // change entity 'isIncomplete' to 'false' during full update
+                        if (isEntityIncomplete(vertex)) {
+                            vertex.removeProperty(IS_INCOMPLETE_PROPERTY_KEY);
+
+                            entity.setIsIncomplete(Boolean.FALSE);
+                        }
                     } else {
                         graphDiscoverer.validateAndNormalizeForUpdate(entity);
                     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index f34506a..c6c6ae5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -42,6 +42,7 @@ import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
 import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasBuiltInTypes;
 import org.apache.atlas.type.AtlasClassificationType;
@@ -99,9 +100,45 @@ public class EntityGraphMapper {
         return createVertexWithGuid(entity, guid);
     }
 
+    public AtlasVertex createShellEntityVertex(AtlasObjectId objectId, EntityGraphDiscoveryContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> createShellEntityVertex({})", objectId.getTypeName());
+        }
+
+        final String    guid       = UUID.randomUUID().toString();
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objectId.getTypeName());
+        AtlasVertex     ret        = createStructVertex(objectId);
+
+        for (String superTypeName : entityType.getAllSuperTypes()) {
+            AtlasGraphUtilsV1.addEncodedProperty(ret, SUPER_TYPES_PROPERTY_KEY, superTypeName);
+        }
+
+        AtlasGraphUtilsV1.setEncodedProperty(ret, GUID_PROPERTY_KEY, guid);
+        AtlasGraphUtilsV1.setEncodedProperty(ret, VERSION_PROPERTY_KEY, getEntityVersion(null));
+        AtlasGraphUtilsV1.setEncodedProperty(ret, IS_INCOMPLETE_PROPERTY_KEY, INCOMPLETE_ENTITY_VALUE);
+
+        // map unique attributes
+        Map<String, Object>   uniqueAttributes = objectId.getUniqueAttributes();
+        EntityMutationContext mutationContext  = new EntityMutationContext(context);
+
+        for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
+            String attrName  = attribute.getName();
+
+            if (uniqueAttributes.containsKey(attrName)) {
+                Object attrValue = attribute.getAttributeType().getNormalizedValue(uniqueAttributes.get(attrName));
+
+                mapAttribute(attribute, attrValue, ret, CREATE, mutationContext);
+            }
+        }
+
+        LOG.info("created shell vertex for entity: objectId={}, guid={}", objectId, guid);
+
+        return ret;
+    }
+
     public AtlasVertex createVertexWithGuid(AtlasEntity entity, String guid) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> createVertex({})", entity.getTypeName());
+            LOG.debug("==> createVertexWithGuid({})", entity.getTypeName());
         }
 
         AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
@@ -203,13 +240,21 @@ public class EntityGraphMapper {
     }
 
     private AtlasVertex createStructVertex(AtlasStruct struct) {
+        return createStructVertex(struct.getTypeName());
+    }
+
+    private AtlasVertex createStructVertex(AtlasObjectId objectId) {
+        return createStructVertex(objectId.getTypeName());
+    }
+
+    private AtlasVertex createStructVertex(String typeName) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> createStructVertex({})", struct.getTypeName());
+            LOG.debug("==> createStructVertex({})", typeName);
         }
 
         final AtlasVertex ret = graph.addVertex();
 
-        AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.ENTITY_TYPE_PROPERTY_KEY, struct.getTypeName());
+        AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.ENTITY_TYPE_PROPERTY_KEY, typeName);
         AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
         AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
         AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
@@ -217,7 +262,7 @@ public class EntityGraphMapper {
         AtlasGraphUtilsV1.setEncodedProperty(ret, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== createStructVertex({})", struct.getTypeName());
+            LOG.debug("<== createStructVertex({})",typeName);
         }
 
         return ret;
@@ -971,6 +1016,8 @@ public class EntityGraphMapper {
         AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName());
 
         header.setGuid(AtlasGraphUtilsV1.getIdFromVertex(vertex));
+		header.setIsIncomplete(GraphHelper.isEntityIncomplete(vertex));
+		header.setStatus(GraphHelper.getStatus(vertex));
         header.setStatus(entity.getStatus());
 
         for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index d364556..492953f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -293,6 +293,7 @@ public final class EntityGraphRetriever {
 
         ret.setTypeName(typeName);
         ret.setGuid(guid);
+        ret.setIsIncomplete(GraphHelper.isEntityIncomplete(entityVertex));
         ret.setStatus(GraphHelper.getStatus(entityVertex));
         ret.setClassificationNames(GraphHelper.getTraitNames(entityVertex));
 
@@ -350,6 +351,7 @@ public final class EntityGraphRetriever {
         }
 
         entity.setGuid(GraphHelper.getGuid(entityVertex));
+        entity.setIsIncomplete(GraphHelper.isEntityIncomplete(entityVertex));
         entity.setTypeName(GraphHelper.getTypeName(entityVertex));
         entity.setStatus(GraphHelper.getStatus(entityVertex));
         entity.setVersion(GraphHelper.getVersion(entityVertex).longValue());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java
index 8b7ac8b..5ed68b0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java
@@ -35,9 +35,11 @@ public class IDBasedEntityResolver implements EntityResolver {
     private static final Logger LOG = LoggerFactory.getLogger(IDBasedEntityResolver.class);
 
     private final AtlasTypeRegistry typeRegistry;
+    private final EntityGraphMapper entityGraphMapper;
 
-    public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
-        this.typeRegistry = typeRegistry;
+    public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
+        this.typeRegistry      = typeRegistry;
+        this.entityGraphMapper = entityGraphMapper;
     }
 
     public EntityGraphDiscoveryContext resolveEntityReferences(EntityGraphDiscoveryContext context) throws AtlasBaseException {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java
index 50eee72..52accec 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.store.graph.v1;
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasObjectId;
@@ -37,9 +38,11 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
     private static final Logger LOG = LoggerFactory.getLogger(UniqAttrBasedEntityResolver.class);
 
     private final AtlasTypeRegistry typeRegistry;
+    private final EntityGraphMapper entityGraphMapper;
 
-    public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
-        this.typeRegistry = typeRegistry;
+    public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
+        this.typeRegistry      = typeRegistry;
+        this.entityGraphMapper = entityGraphMapper;
     }
 
     @Override
@@ -61,6 +64,10 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
 
             AtlasVertex vertex = AtlasGraphUtilsV1.findByUniqueAttributes(entityType, objId.getUniqueAttributes());
 
+            if (vertex == null && RequestContextV1.get().isCreateShellEntityForNonExistingReference()) {
+                vertex = entityGraphMapper.createShellEntityVertex(objId, context);
+            }
+
             if (vertex != null) {
                 context.addResolvedIdByUniqAttribs(objId, vertex);
                 resolvedReferences.add(objId);
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
index f74dc5d..1d84123 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
@@ -48,6 +48,7 @@ public class RequestContextV1 {
     private       List<EntityGuidPair>           entityGuidInRequest = null;
 
     private String     user;
+    private boolean    createShellEntityForNonExistingReference = false;
     private DeleteType deleteType = DeleteType.DEFAULT;
 
     private RequestContextV1() {
@@ -131,6 +132,14 @@ public class RequestContextV1 {
         this.user = user;
     }
 
+    public boolean isCreateShellEntityForNonExistingReference() {
+        return createShellEntityForNonExistingReference;
+    }
+
+    public void setCreateShellEntityForNonExistingReference(boolean createShellEntityForNonExistingReference) {
+        this.createShellEntityForNonExistingReference = createShellEntityForNonExistingReference;
+    }
+
     public DeleteType getDeleteType() { return deleteType; }
 
     public void setDeleteType(DeleteType deleteType) { this.deleteType = (deleteType == null) ? DeleteType.DEFAULT : deleteType; }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 836e6f8..c60ef4d 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -158,6 +158,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private final List<String>                  hiveTablePrefixesToIgnore;
     private final Map<String, PreprocessAction> hiveTablesCache;
     private final boolean                       preprocessEnabled;
+    private final boolean                       createShellEntityForNonExistingReference;
 
     @VisibleForTesting
     final int consumerRetryInterval;
@@ -274,6 +275,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
+
+        createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
     }
 
     @Override
@@ -581,6 +584,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         requestContext.setMaxAttempts(maxRetries);
                         requestContext.setUser(messageUser);
 
+                        RequestContextV1.get().setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
+
                         switch (message.getType()) {
                             case ENTITY_CREATE: {
                                 final EntityCreateRequest      createRequest = (EntityCreateRequest) message;
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index c503e41..3fd27c7 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -45,6 +45,7 @@ import java.io.IOException;
 import java.util.Date;
 import java.util.UUID;
 
+import static org.apache.atlas.AtlasConfiguration.REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF;
 import static org.apache.atlas.AtlasConfiguration.REST_API_ENABLE_DELETE_TYPE_OVERRIDE;
 
 /**
@@ -57,13 +58,15 @@ public class AuditFilter implements Filter {
     private static final Logger LOG = LoggerFactory.getLogger(AuditFilter.class);
     private static final Logger METRICS_LOG = LoggerFactory.getLogger("METRICS");
 
-    private boolean deleteTypeOverrideEnabled = false;
+    private boolean deleteTypeOverrideEnabled                = false;
+    private boolean createShellEntityForNonExistingReference = false;
 
     @Override
     public void init(FilterConfig filterConfig) throws ServletException {
         LOG.info("AuditFilter initialization started");
 
-        deleteTypeOverrideEnabled = REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
+        deleteTypeOverrideEnabled                = REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
+        createShellEntityForNonExistingReference = REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
 
         LOG.info("REST_API_ENABLE_DELETE_TYPE_OVERRIDE={}", deleteTypeOverrideEnabled);
     }
@@ -87,6 +90,7 @@ public class AuditFilter implements Filter {
             RequestContextV1.clear();
             RequestContextV1 requestContextV1 = RequestContextV1.get();
             requestContextV1.setUser(user);
+            requestContextV1.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
 
             if (StringUtils.isNotEmpty(deleteType)) {
                 if (deleteTypeOverrideEnabled) {