You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by nb...@apache.org on 2021/05/21 11:11:38 UTC

[atlas] 04/04: ATLAS-4152: Entity correlation for deleted entities.

This is an automated email from the ASF dual-hosted git repository.

nbonte pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit c3df22605795b9bddcba547852c04461ee9b8203
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu May 20 15:07:27 2021 -0700

    ATLAS-4152: Entity correlation for deleted entities.
    
    Signed-off-by: Nikhil Bonte <nb...@apache.org>
---
 .../org/apache/atlas/repository/Constants.java     |  2 +
 .../notification/AtlasNotificationMessage.java     | 23 ++++--
 .../org/apache/atlas/kafka/AtlasKafkaConsumer.java |  3 +-
 .../org/apache/atlas/kafka/AtlasKafkaMessage.java  | 17 ++++-
 .../org/apache/atlas/kafka/KafkaNotification.java  | 12 ++++
 .../AtlasNotificationMessageDeserializer.java      | 17 ++++-
 .../atlas/notification/NotificationInterface.java  |  9 +++
 .../atlas/notification/spool/AtlasFileSpool.java   | 30 ++++++--
 .../apache/atlas/notification/spool/Publisher.java | 40 +++++++++--
 .../notification/spool/SpoolConfiguration.java     | 16 +++++
 .../apache/atlas/notification/spool/Spooler.java   | 17 ++++-
 .../notification/AbstractNotificationTest.java     |  5 ++
 .../notification/spool/AtlasFileSpoolTest.java     |  5 ++
 .../repository/graph/GraphBackedSearchIndexer.java |  1 +
 .../store/graph/EntityCorrelationStore.java        | 53 ++++++++++++++
 .../store/graph/v2/AtlasGraphUtilsV2.java          | 21 ++++++
 .../store/graph/v2/EntityCorrelationStoreTest.java | 83 ++++++++++++++++++++++
 .../notification/EntityCorrelationManager.java     | 60 ++++++++++++++++
 .../notification/NotificationHookConsumer.java     | 15 +++-
 .../preprocessor/EntityPreprocessor.java           | 16 ++++-
 .../preprocessor/HiveDbDDLPreprocessor.java        | 52 ++++++++++++++
 .../preprocessor/HivePreprocessor.java             | 28 ++++++++
 .../preprocessor/HiveTableDDLPreprocessor.java     | 52 ++++++++++++++
 .../preprocessor/PreprocessorContext.java          | 17 ++++-
 .../NotificationHookConsumerKafkaTest.java         |  6 +-
 .../notification/NotificationHookConsumerTest.java | 22 +++---
 26 files changed, 579 insertions(+), 43 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index ffcec97..aea0c13 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -135,6 +135,8 @@ public final class Constants {
 
     public static final String TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "timestamp");
 
+    public static final String ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityDeletedTimestamp");
+
     public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp");
 
     public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
index 810ba97..5869910 100644
--- a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
+++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
@@ -40,9 +40,10 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)
 public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
-    private String msgSourceIP;
-    private String msgCreatedBy;
-    private long   msgCreationTime;
+    private String  msgSourceIP;
+    private String  msgCreatedBy;
+    private long    msgCreationTime;
+    private boolean spooled;
 
     /**
      * The actual message.
@@ -55,18 +56,22 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
     }
 
     public AtlasNotificationMessage(MessageVersion version, T message) {
-        this(version, message, null, null);
+        this(version, message, null, null, false);
     }
 
-    public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
+    public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy, boolean spooled) {
         super(version);
 
         this.msgSourceIP     = msgSourceIP;
         this.msgCreatedBy    = createdBy;
         this.msgCreationTime = (new Date()).getTime();
         this.message         = message;
+        this.spooled         = spooled;
     }
 
+    public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
+        this(version, message, msgSourceIP, createdBy, false);
+    }
 
     public String getMsgSourceIP() {
         return msgSourceIP;
@@ -99,4 +104,12 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
     public void setMessage(T message) {
         this.message = message;
     }
+
+    public void setSpooled(boolean val) {
+        this.spooled = val;
+    }
+
+    public boolean getSpooled() {
+        return spooled;
+    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index f7d9668..96dc585 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -134,7 +134,8 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
                     continue;
                 }
 
-                messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition()));
+                messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
+                                                            deserializer.getMsgCreated(), deserializer.getSpooled()));
             }
         }
 
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
index 22bd79f..af3727d 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
@@ -24,11 +24,19 @@ public class AtlasKafkaMessage<T> {
     private final T              message;
     private final long           offset;
     private final TopicPartition topicPartition;
+    private final boolean        spooled;
+    private final long           msgCreated;
 
-    public AtlasKafkaMessage(T message, long offset, String topic, int partition) {
+    public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled) {
         this.message        = message;
         this.offset         = offset;
         this.topicPartition = new TopicPartition(topic, partition);
+        this.msgCreated     = msgCreated;
+        this.spooled        = spooled;
+    }
+
+    public AtlasKafkaMessage(T message, long offset, String topic, int partition) {
+        this(message, offset, topic, partition, 0, false);
     }
 
     public T getMessage() {
@@ -51,4 +59,11 @@ public class AtlasKafkaMessage<T> {
         return topicPartition.partition();
     }
 
+    public boolean getSpooled() {
+        return this.spooled;
+    }
+
+    public long getMsgCreated() {
+        return this.msgCreated;
+    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 3d1b3cc..32f5183 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -172,6 +172,18 @@ public class KafkaNotification extends AbstractNotification implements Service {
 
 
     // ----- NotificationInterface -------------------------------------------
+    public boolean isReady(NotificationType notificationType) {
+        try {
+            KafkaProducer producer = getOrCreateProducer(notificationType);
+            producer.metrics();
+            return true;
+        }
+        catch (Exception exception) {
+            LOG.error("Error: Connecting... {}", exception.getMessage());
+            return false;
+        }
+    }
+
     @Override
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
         return createConsumers(notificationType, numConsumers, Boolean.valueOf(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable","false"))));
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 3264e26..b43bc7c 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -62,6 +62,8 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
     private long                                      splitMessagesLastPurgeTime    = System.currentTimeMillis();
     private final AtomicLong                          messageCountTotal             = new AtomicLong(0);
     private final AtomicLong                          messageCountSinceLastInterval = new AtomicLong(0);
+    private long                                      msgCreated;
+    private boolean                                   spooled;
     // ----- Constructors ----------------------------------------------------
 
     /**
@@ -101,18 +103,31 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
     }
 
     // ----- MessageDeserializer ---------------------------------------------
+    public long getMsgCreated() {
+        return this.msgCreated;
+    }
+
+    public boolean getSpooled() {
+        return this.spooled;
+    }
+
     @Override
     public T deserialize(String messageJson) {
         final T ret;
 
         messageCountTotal.incrementAndGet();
         messageCountSinceLastInterval.incrementAndGet();
+        this.msgCreated = 0;
+        this.spooled = false;
 
-        AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class);
+        AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationMessage.class);
 
         if (msg == null || msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
             ret = AtlasType.fromV1Json(messageJson, messageType);
         } else  {
+            this.msgCreated = ((AtlasNotificationMessage) msg).getMsgCreationTime();
+            this.spooled = ((AtlasNotificationMessage) msg).getSpooled();
+
             String msgJson = messageJson;
 
             if (msg.getMsgSplitCount() > 1) { // multi-part message
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index edd8ed9..3d8d9cc 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -110,4 +110,13 @@ public interface NotificationInterface {
      * Shutdown any notification producers and consumers associated with this interface instance.
      */
     void close();
+
+    /**
+     * Checks whether the underlying notification mechanism is ready for use.
+     *
+     * @param type the message type
+     * @return true if available, false otherwise
+     *
+     */
+    boolean isReady(NotificationType type);
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
index 2d7d195..ea31284 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
@@ -27,6 +27,7 @@ import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -51,7 +52,7 @@ public class AtlasFileSpool implements NotificationInterface {
 
     @Override
     public void init(String source, Object failedMessagesLogger) {
-        LOG.info("==> AtlasFileSpool.init(source={})", source);
+        LOG.debug("==> AtlasFileSpool.init(source={})", source);
 
         if (!isInitDone()) {
             try {
@@ -76,10 +77,10 @@ public class AtlasFileSpool implements NotificationInterface {
                 LOG.error("AtlasFileSpool(source={}): initialization failed, unknown error", this.config.getSourceName(), t);
             }
         } else {
-            LOG.info("AtlasFileSpool.init(): initialization already done. initDone={}", initDone);
+            LOG.debug("AtlasFileSpool.init(): initialization already done. initDone={}", initDone);
         }
 
-        LOG.info("<== AtlasFileSpool.init(source={})", source);
+        LOG.debug("<== AtlasFileSpool.init(source={})", source);
     }
 
     @Override
@@ -100,29 +101,35 @@ public class AtlasFileSpool implements NotificationInterface {
     }
 
     @Override
+    public boolean isReady(NotificationType type) {
+        return true;
+    }
+
+    @Override
     public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+        List<String> serializedMessages = getSerializedMessages(messages);
         if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("AtlasFileSpool.send(): sending to spooler");
             }
 
-            spooler.send(type, messages);
+            spooler.sendInternal(type, serializedMessages);
         } else {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("AtlasFileSpool.send(): sending to notificationHandler");
             }
 
             try {
-                notificationHandler.send(type, messages);
+                notificationHandler.sendInternal(type, serializedMessages);
             } catch (Exception e) {
                 if (isInitDone()) {
                     LOG.info("AtlasFileSpool.send(): failed in sending to notificationHandler. Sending to spool", e);
 
                     publisher.setDestinationDown();
 
-                    spooler.send(type, messages);
+                    spooler.sendInternal(type, serializedMessages);
                 } else {
-                    LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not yet initialized", e);
+                    LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not initialized.", e);
 
                     throw e;
                 }
@@ -130,6 +137,15 @@ public class AtlasFileSpool implements NotificationInterface {
         }
     }
 
+    private <T> List<String> getSerializedMessages(List<T> messages) {
+        List<String> serializedMessages = new ArrayList<>(messages.size());
+        for (int index = 0; index < messages.size(); index++) {
+            notificationHandler.createNotificationMessages(messages.get(index), serializedMessages);
+        }
+
+        return serializedMessages;
+    }
+
     @Override
     public void close() {
         try {
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
index 22242c9..01ead7d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
@@ -65,14 +65,16 @@ public class Publisher implements Runnable {
             IndexRecord record = null;
 
             while (true) {
-                waitIfDestinationDown();
-
+                checkAndWaitIfDestinationDown();
                 if (this.isDrain) {
                     break;
                 }
 
-                record = fetchNext(record);
+                if (isDestDown) {
+                    continue;
+                }
 
+                record = fetchNext(record);
                 if (record != null && processAndDispatch(record)) {
                     indexManagement.removeAsDone(record);
 
@@ -104,14 +106,14 @@ public class Publisher implements Runnable {
         return isDestDown;
     }
 
-    private void waitIfDestinationDown() throws InterruptedException {
+    private void checkAndWaitIfDestinationDown() throws InterruptedException {
+        isDestDown = !notificationHandler.isReady(NotificationInterface.NotificationType.HOOK);
         if (isDestDown) {
             LOG.info("Publisher.waitIfDestinationDown(source={}): {}: Destination is down. Sleeping for: {} ms. Queue: {} items",
                      this.source, notificationHandlerName, retryDestinationMS, indexManagement.getQueueSize());
 
             Thread.sleep(retryDestinationMS);
         }
-
     }
 
     private IndexRecord fetchNext(IndexRecord record) {
@@ -147,7 +149,7 @@ public class Publisher implements Runnable {
 
                     messages.add(message);
 
-                    if ((isDestDown && messages.size() == 1) || messages.size() == messageBatchSize) {
+                    if (messages.size() == messageBatchSize) {
                         dispatch(record, lineInSpoolFile, messages);
                     }
                 }
@@ -192,6 +194,8 @@ public class Publisher implements Runnable {
 
     private void dispatch(String filePath, List<String> messages) throws Exception {
         try {
+            pauseBeforeSend();
+
             notificationHandler.sendInternal(NotificationInterface.NotificationType.HOOK, messages);
 
             if (isDestDown) {
@@ -207,4 +211,28 @@ public class Publisher implements Runnable {
             messages.clear();
         }
     }
+
+    /**
+     * Reason for pauseBeforeSend:
+     *  - Entity correlation is needed to stitch lineage to the correct entity.
+     *  - Background: when messages are added to the Kafka queue directly, the ordering is incidentally guaranteed:
+     *     messages from lineage-producing hooks arrive immediately after messages from entity-producing hooks.
+     *  - When spooled messages are posted onto Kafka, this order cannot be guaranteed. The entity correlation logic within Atlas
+     *     can attach lineage to the correct entity, provided that the entity participating in the lineage is already present.
+     *
+     *   This entity correlation logic works well in the majority of cases, except where lineage entities are created before regular entities.
+     *   In that case, shell entities get created in the absence of the real entities. The problem is that there is only 1 shell entity for any number of references.
+     *   Circumventing this limitation is not easy.
+     *
+     *   pauseBeforeSend delays dispatch from every source other than HiveMetaStore, so that HiveMetaStore-generated messages reach Kafka before messages from lineage-producing hooks.
+     *
+     * @throws InterruptedException if the thread is interrupted while sleeping
+     */
+    private void pauseBeforeSend() throws InterruptedException {
+        if (!configuration.isHiveMetaStore()) {
+            int waitMs = configuration.getPauseBeforeSendSec() * 1000;
+            LOG.info("Waiting before dispatch: {}", waitMs);
+            Thread.sleep(waitMs);
+        }
+    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
index a9a3a78..74b8687 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
@@ -25,9 +25,11 @@ public class SpoolConfiguration {
     private static final int    PROP_RETRY_DESTINATION_MS_DEFAULT               = 30000; // Default 30 seconds
     private static final int    PROP_FILE_ROLLOVER_SEC_DEFAULT                  = 60; // 60 secs
     private static final int    PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT = 100;
+    private static final int    PROP_PAUSE_BEFORE_SEND_MS_DEFAULT               = 60;
     private static final String PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT             = "archive";
     private static final String PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT               = "/tmp/spool";
     private static final int    PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT            = 100;
+    private static final String PROP_HIVE_METASTORE_NAME_DEFAULT                = "HiveMetastoreHookImpl";
     private static final String PROPERTY_PREFIX_SPOOL                           = "atlas.hook.spool.";
     public  static final String PROP_FILE_SPOOL_LOCAL_DIR                       = PROPERTY_PREFIX_SPOOL + "dir";
     private static final String PROP_FILE_SPOOL_ARCHIVE_DIR                     = PROPERTY_PREFIX_SPOOL + "archive.dir";
@@ -35,6 +37,8 @@ public class SpoolConfiguration {
     public  static final String PROP_FILE_SPOOL_FILE_ROLLOVER_SEC               = PROPERTY_PREFIX_SPOOL + "file.rollover.sec";
     public  static final String PROP_FILE_SPOOL_DEST_RETRY_MS                   = PROPERTY_PREFIX_SPOOL + "destination.retry.ms";
     private static final String PROP_MESSAGE_BATCH_SIZE                         = PROPERTY_PREFIX_SPOOL + "destination.message.batchsize";
+    private static final String PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC           = PROPERTY_PREFIX_SPOOL + "pause.before.send.sec";
+    private static final String PROP_HIVE_METASTORE_NAME                        = PROPERTY_PREFIX_SPOOL + "hivemetastore.name";
 
     private final String messageHandlerName;
     private final int    maxArchivedFilesCount;
@@ -44,6 +48,8 @@ public class SpoolConfiguration {
     private final int    fileSpoolMaxFilesCount;
     private final String spoolDirPath;
     private final String archiveDir;
+    private final int    pauseBeforeSendSec;
+    private final String hiveMetaStoreName;
     private       String sourceName;
 
     public SpoolConfiguration(Configuration cfg, String messageHandlerName) {
@@ -51,10 +57,12 @@ public class SpoolConfiguration {
         this.maxArchivedFilesCount  = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
         this.messageBatchSize       = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT);
         this.retryDestinationMS     = cfg.getInt(PROP_FILE_SPOOL_DEST_RETRY_MS, PROP_RETRY_DESTINATION_MS_DEFAULT);
+        this.pauseBeforeSendSec     = cfg.getInt(PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC, PROP_PAUSE_BEFORE_SEND_MS_DEFAULT);
         this.fileRollOverSec        = cfg.getInt(PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, PROP_FILE_ROLLOVER_SEC_DEFAULT) * 1000;
         this.fileSpoolMaxFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
         this.spoolDirPath           = cfg.getString(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT);
         this.archiveDir             = cfg.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString());
+        this.hiveMetaStoreName      = cfg.getString(PROP_HIVE_METASTORE_NAME, PROP_HIVE_METASTORE_NAME_DEFAULT);
     }
 
     public void setSource(String val) {
@@ -120,4 +128,12 @@ public class SpoolConfiguration {
 
         return new File(getSpoolDir(), fileDoneName);
     }
+
+    public int getPauseBeforeSendSec() {
+        return pauseBeforeSendSec;
+    }
+
+    public boolean isHiveMetaStore() {
+        return this.sourceName.equals(this.hiveMetaStoreName);
+    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
index 2cacaaa..a918e9b 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
@@ -19,14 +19,14 @@ package org.apache.atlas.notification.spool;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.hook.FailedMessagesLogger;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
 import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.NotificationConsumer;
-import org.apache.commons.io.IOUtils;
+import org.apache.atlas.type.AtlasType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.DataOutput;
-import java.io.PrintWriter;
 import java.util.List;
 
 public class Spooler extends AbstractNotification {
@@ -57,8 +57,14 @@ public class Spooler extends AbstractNotification {
 
     @Override
     public void sendInternal(NotificationType type, List<String> messages) {
-        boolean ret = write(messages);
+        for (int i = 0; i < messages.size(); i++) {
+            AtlasNotificationMessage e = AtlasType.fromV1Json(messages.get(i), AtlasNotificationMessage.class);
+            e.setSpooled(true);
+
+            messages.set(i, AtlasType.toV1Json(e));
+        }
 
+        boolean ret = write(messages);
         if (failedMessagesLogger != null && !ret) {
             writeToFailedMessages(messages);
         }
@@ -68,6 +74,11 @@ public class Spooler extends AbstractNotification {
     public void close() {
     }
 
+    @Override
+    public boolean isReady(NotificationType type) {
+        return true;
+    }
+
     @VisibleForTesting
     boolean write(List<String> messages) {
         final boolean ret;
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index d7e4959..8078a6c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -127,5 +127,10 @@ public class AbstractNotificationTest {
         @Override
         public void close() {
         }
+
+        @Override
+        public boolean isReady(NotificationType type) {
+            return true;
+        }
     }
 }
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
index 167efbe..265598e 100644
--- a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
@@ -81,6 +81,11 @@ public class AtlasFileSpoolTest extends BaseTest {
         public void close() {
 
         }
+
+        @Override
+        public boolean isReady(NotificationType type) {
+            return true;
+        }
     }
 
     @Test
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index cc727c6..ddfb008 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -339,6 +339,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
             createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY, UniqueKind.NONE, Integer.class, SINGLE, true, true);
             createCommonVertexIndex(management, CUSTOM_ATTRIBUTES_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
             createCommonVertexIndex(management, LABELS_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
+            createCommonVertexIndex(management, ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, UniqueKind.NONE, Long.class, SINGLE, true, false);
 
             createCommonVertexIndex(management, PATCH_ID_PROPERTY_KEY, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
             createCommonVertexIndex(management, PATCH_DESCRIPTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java
new file mode 100644
index 0000000..4760757
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph;
+
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+@Component
+public class EntityCorrelationStore {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityCorrelationStore.class);
+
+    public EntityCorrelationStore() {
+    }
+
+    @GraphTransaction
+    public void add(String entityGuid, long messageTimestamp) {
+        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(entityGuid);
+        if (v == null) {
+            LOG.warn("Fetching: {} did not yield result!", entityGuid);
+            return;
+        }
+
+        AtlasGraphUtilsV2.setEncodedProperty(v, Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, messageTimestamp);
+        LOG.info("Updating: {}: {}", entityGuid, messageTimestamp);
+    }
+
+    public String findCorrelatedGuid(String qualifiedName, long messageTimestamp) {
+        String guid = AtlasGraphUtilsV2.findFirstDeletedDuringSpooledByQualifiedName(qualifiedName, messageTimestamp);
+
+        LOG.info("findCorrelatedGuid: {} - {} -> {}", qualifiedName, messageTimestamp, guid);
+        return guid;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 0a94708..e73f084 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -367,6 +367,27 @@ public class AtlasGraphUtilsV2 {
         return vertex;
     }
 
+    /**
+     * Convenience overload that runs the lookup against the graph returned by
+     * {@code getGraphInstance()}.
+     *
+     * @param qualifiedName qualified name to match
+     * @param timestamp     timestamp passed through to the graph lookup
+     * @return guid of the matching vertex, or {@code null} when there is none
+     */
+    public static String findFirstDeletedDuringSpooledByQualifiedName(String qualifiedName, long timestamp) {
+        final AtlasGraph graph = getGraphInstance();
+
+        return findFirstDeletedDuringSpooledByQualifiedName(graph, qualifiedName, timestamp);
+    }
+
+    /**
+     * Queries {@code graph} for vertices that are in {@code DELETED} state, carry the given
+     * qualified name, and whose deleted-timestamp property is strictly greater than
+     * {@code timestamp}; results are ordered by that property ascending and the guid of the
+     * first one is returned.
+     *
+     * @param graph         graph to query
+     * @param qualifiedName qualified name to match
+     * @param timestamp     exclusive lower bound for the deleted-timestamp property
+     * @return guid of the first matching vertex, or {@code null} when nothing matches
+     */
+    public static String findFirstDeletedDuringSpooledByQualifiedName(AtlasGraph graph, String qualifiedName, long timestamp) {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("findDeletedDuringSpooledByQualifiedName");
+
+        try {
+            AtlasGraphQuery query = graph.query().has(STATE_PROPERTY_KEY, Status.DELETED.name())
+                                                 .has(Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.GREATER_THAN, timestamp)
+                                                 .has(Constants.QUALIFIED_NAME, qualifiedName)
+                                                 .orderBy(Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, ASC);
+
+            Iterator<?> iterator = query.vertices().iterator();
+
+            return iterator.hasNext() ? GraphHelper.getGuid((AtlasVertex) iterator.next()) : null;
+        } finally {
+            // always close the metric record, even when the graph query throws
+            RequestContext.get().endMetricRecord(metric);
+        }
+    }
+
     public static AtlasVertex findByGuid(String guid) {
         return findByGuid(getGraphInstance(), guid);
     }
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java
new file mode 100644
index 0000000..a3be5f4
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.BasicTestSetup;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.utils.TestResourceFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+/**
+ * Exercises {@link EntityCorrelationStore}: a deleted entity stamped with a timestamp must be
+ * found by qualified name only for lookups with a smaller timestamp.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class EntityCorrelationStoreTest extends BasicTestSetup {
+    @Inject
+    AtlasGraph graph;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        super.initialize();
+
+        setupTestData();
+    }
+
+    @Test
+    public void verify() throws IOException, AtlasBaseException {
+        final String unknownQualifiedName = "db01@cm";
+        final String dbQualifiedName = "db01x@cm";
+        final EntityCorrelationStore store = new EntityCorrelationStore();
+
+        // create the entity described by the "db01" test resource, then delete it
+        final String dbJson = TestResourceFileUtils.getJson("entities", "db01");
+        final AtlasEntity.AtlasEntitiesWithExtInfo dbEntities = AtlasType.fromJson(dbJson, AtlasEntity.AtlasEntitiesWithExtInfo.class);
+        final EntityMutationResponse createResponse = entityStore.createOrUpdate(new AtlasEntityStream(dbEntities), false);
+        final String deletedDbGuid = createResponse.getFirstEntityCreated().getGuid();
+
+        entityStore.deleteById(deletedDbGuid);
+
+        // stamp the deleted entity with timestamp 2 and commit before running the lookups
+        store.add(deletedDbGuid, 2L);
+        graph.commit();
+
+        // a qualified name that was never stamped yields nothing
+        assertNull(store.findCorrelatedGuid(unknownQualifiedName, 1L));
+
+        // lookup timestamp 1 is below the stamped value 2, so the deleted entity is found
+        final String correlatedGuid = store.findCorrelatedGuid(dbQualifiedName, 1L);
+        assertNotNull(correlatedGuid);
+        assertEquals(correlatedGuid, deletedDbGuid);
+
+        // lookup timestamp 2 is not below the stamped value 2, so nothing is found
+        assertNull(store.findCorrelatedGuid(dbQualifiedName, 2L));
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java b/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
new file mode 100644
index 0000000..f1d6aff
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Null-tolerant wrapper around {@link EntityCorrelationStore}. Every operation is a no-op
+ * (or returns {@code null}) when the manager was built without a store.
+ */
+public class EntityCorrelationManager {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityCorrelationManager.class);
+
+    private final EntityCorrelationStore entityCorrelationStore;
+
+    /**
+     * @param entityCorrelationStore backing store; may be {@code null}, which disables correlation
+     */
+    public EntityCorrelationManager(EntityCorrelationStore entityCorrelationStore) {
+        this.entityCorrelationStore = entityCorrelationStore;
+    }
+
+    /**
+     * Records {@code spooledTimestamp} against every entity in {@code entityHeaders} that has a
+     * non-empty guid. Does nothing when there is no store, when {@code spooled} is false, or
+     * when the list is null or empty.
+     *
+     * @param spooled          whether the originating message was spooled
+     * @param spooledTimestamp timestamp to store for each entity
+     * @param entityHeaders    headers of the entities to record; may be {@code null}
+     */
+    public void add(boolean spooled, long spooledTimestamp, List<AtlasEntityHeader> entityHeaders) {
+        if (this.entityCorrelationStore == null || !spooled || CollectionUtils.isEmpty(entityHeaders)) {
+            return;
+        }
+
+        for (AtlasEntityHeader entityHeader : entityHeaders) {
+            String guid = entityHeader.getGuid();
+            if (StringUtils.isNotEmpty(guid)) {
+                entityCorrelationStore.add(guid, spooledTimestamp);
+            }
+        }
+    }
+
+    /**
+     * Asks the store for the guid correlated with {@code qualifiedName} and
+     * {@code spooledMessageTimestamp}.
+     *
+     * @param qualifiedName           qualified name to look up
+     * @param spooledMessageTimestamp timestamp of the spooled message; values {@code <= 0} skip the lookup
+     * @return the guid reported by the store, or {@code null} when there is no store, the
+     *         timestamp is not positive, the qualified name is null/empty, or nothing matches
+     */
+    public String getGuidForDeletedEntityToBeCorrelated(String qualifiedName, long spooledMessageTimestamp) {
+        // a null/empty qualified name cannot identify an entity; skip the store lookup entirely
+        if (this.entityCorrelationStore == null || spooledMessageTimestamp <= 0 || StringUtils.isEmpty(qualifiedName)) {
+            return null;
+        }
+
+        String guid = entityCorrelationStore.findCorrelatedGuid(qualifiedName, spooledMessageTimestamp);
+        LOG.info("{}: spooledTimestamp: {} -> {}", qualifiedName, spooledMessageTimestamp, guid);
+        return guid;
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 84cc8d8..5643af9 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -40,6 +40,7 @@ import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
 import org.apache.atlas.notification.preprocessor.PreprocessorContext;
 import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
+import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
 import org.apache.atlas.util.AtlasMetricsCounter;
 import org.apache.atlas.utils.AtlasJson;
 import org.apache.atlas.utils.LruCache;
@@ -191,6 +192,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private       ExecutorService               executors;
     private       Instant                       nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(Instant.now());
     private final Map<TopicPartition, Long>     lastCommittedPartitionOffset;
+    private final EntityCorrelationManager      entityCorrelationManager;
 
     @VisibleForTesting
     final int consumerRetryInterval;
@@ -201,7 +203,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     @Inject
     public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
                                     ServiceState serviceState, AtlasInstanceConverter instanceConverter,
-                                    AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) throws AtlasException {
+                                    AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil,
+                                    EntityCorrelationStore entityCorrelationStore) throws AtlasException {
         this.notificationInterface = notificationInterface;
         this.atlasEntityStore      = atlasEntityStore;
         this.serviceState          = serviceState;
@@ -308,7 +311,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         hiveTypesRemoveOwnedRefAttrs  = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
         rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
         preprocessEnabled             = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
-
+        entityCorrelationManager      = new EntityCorrelationManager(entityCorrelationStore);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
         LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs);
@@ -688,6 +691,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                     EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
 
                                     stats.updateStats(response);
+                                    entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
                                 } catch (ClassCastException cle) {
                                     LOG.error("Failed to delete entity {}", deleteRequest);
                                 }
@@ -770,6 +774,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                         EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
 
                                         stats.updateStats(response);
+                                        entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
                                     }
                                 } catch (ClassCastException cle) {
                                     LOG.error("Failed to do delete entities {}", entities);
@@ -889,6 +894,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                     RequestContext.get().resetEntityGuidUpdates();
 
+                    entityCorrelationManager.add(context.isSpooledMessage(), context.getMsgCreated(), response.getDeletedEntities());
+
                     RequestContext.get().clearCache();
                 }
             }
@@ -973,7 +980,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         PreprocessorContext context = null;
 
         if (preprocessEnabled) {
-            context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName);
+            context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache,
+                    hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs,
+                    rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName, entityCorrelationManager);
 
             if (context.isHivePreprocessEnabled()) {
                 preprocessHiveTypes(context);
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 89568e2..7f0cafe 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -32,6 +32,8 @@ public abstract class EntityPreprocessor {
     public static final String TYPE_HIVE_PROCESS        = "hive_process";
     public static final String TYPE_HIVE_STORAGEDESC    = "hive_storagedesc";
     public static final String TYPE_HIVE_DB             = "hive_db";
+    public static final String TYPE_HIVE_DB_DDL         = "hive_db_ddl";
+    public static final String TYPE_HIVE_TABLE_DDL      = "hive_table_ddl";
     public static final String TYPE_HIVE_TABLE          = "hive_table";
     public static final String TYPE_RDBMS_INSTANCE      = "rdbms_instance";
     public static final String TYPE_RDBMS_DB            = "rdbms_db";
@@ -71,11 +73,13 @@ public abstract class EntityPreprocessor {
     static {
         EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] {
                                                                     new HivePreprocessor.HiveDbPreprocessor(),
+                                                                    new HiveDbDDLPreprocessor(),
                                                                     new HivePreprocessor.HiveTablePreprocessor(),
                                                                     new HivePreprocessor.HiveColumnPreprocessor(),
                                                                     new HivePreprocessor.HiveProcessPreprocessor(),
                                                                     new HivePreprocessor.HiveColumnLineageProcessPreprocessor(),
-                                                                    new HivePreprocessor.HiveStorageDescPreprocessor()
+                                                                    new HivePreprocessor.HiveStorageDescPreprocessor(),
+                                                                    new HiveTableDDLPreprocessor()
         };
 
         EntityPreprocessor[] rdbmsPreprocessors = new EntityPreprocessor[] {
@@ -158,6 +162,16 @@ public abstract class EntityPreprocessor {
         return ret != null ? ret.toString() : null;
     }
 
+    /**
+     * Sets {@code guid} on an object reference: via {@code setGuid} when {@code obj} is an
+     * {@code AtlasObjectId}, or by putting it under the {@code "guid"} key when {@code obj}
+     * is a {@code Map}. Any other value (including {@code null}) is left untouched.
+     */
+    public void setObjectIdWithGuid(Object obj, String guid) {
+        if (obj instanceof AtlasObjectId) {
+            ((AtlasObjectId) obj).setGuid(guid);
+            return;
+        }
+
+        if (obj instanceof Map) {
+            ((Map) obj).put("guid", guid);
+        }
+    }
+
     protected boolean isEmpty(Object obj) {
         return obj == null || ((obj instanceof Collection) && ((Collection) obj).isEmpty());
     }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
new file mode 100644
index 0000000..dcff093
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Preprocessor registered for {@code TYPE_HIVE_DB_DDL} entities. For spooled messages it
+ * points the entity's {@code ATTRIBUTE_DB} relationship at the guid returned by
+ * {@code PreprocessorContext.getGuidForDeletedEntity}, when one is found.
+ */
+public class HiveDbDDLPreprocessor extends EntityPreprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveDbDDLPreprocessor.class);
+
+    protected HiveDbDDLPreprocessor() {
+        super(TYPE_HIVE_DB_DDL);
+    }
+
+    /**
+     * Leaves the entity unchanged unless the message is spooled, the db relationship is
+     * present, and the context returns a non-empty guid for its qualified name.
+     */
+    @Override
+    public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+        // the relationship is only read for spooled messages
+        final Object relatedDb = context.isSpooledMessage() ? entity.getRelationshipAttribute(ATTRIBUTE_DB) : null;
+
+        if (relatedDb != null) {
+            final String dbQualifiedName = getQualifiedName(relatedDb);
+            final String deletedDbGuid = context.getGuidForDeletedEntity(dbQualifiedName);
+
+            if (StringUtils.isNotEmpty(deletedDbGuid)) {
+                setObjectIdWithGuid(relatedDb, deletedDbGuid);
+                LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), dbQualifiedName, deletedDbGuid);
+            }
+        }
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index 86e3384..bf6a623 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -228,6 +228,34 @@ public class HivePreprocessor {
                     }
                 }
             }
+
+            preprocessCheckpoint(entity, context);
+        }
+
+        /**
+         * For spooled messages, passes each non-empty list held in the inputs and outputs
+         * relationship attributes to {@code updateListWithGuids}.
+         */
+        private void preprocessCheckpoint(AtlasEntity entity, PreprocessorContext context) {
+            if (context.isSpooledMessage()) {
+                for (String attributeName : new String[] {ATTRIBUTE_INPUTS, ATTRIBUTE_OUTPUTS}) {
+                    Object related = entity.getRelationshipAttribute(attributeName);
+
+                    if (related instanceof List && !isEmpty(related)) {
+                        updateListWithGuids(context, (List) related);
+                    }
+                }
+            }
+        }
+
+        /**
+         * For every element of {@code list}, asks the context for a guid matching the
+         * element's qualified name and, when a non-empty guid comes back, sets it on the element.
+         */
+        private void updateListWithGuids(PreprocessorContext context, List list) {
+            for (Object reference : list) {
+                String qualifiedName = getQualifiedName(reference);
+                String correlatedGuid = context.getGuidForDeletedEntity(qualifiedName);
+
+                if (StringUtils.isNotEmpty(correlatedGuid)) {
+                    setObjectIdWithGuid(reference, correlatedGuid);
+                }
+            }
         }
 
         private int getCollectionSize(Object obj) {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
new file mode 100644
index 0000000..83d4d7c
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Preprocessor registered for {@code TYPE_HIVE_TABLE_DDL} entities. For spooled messages it
+ * points the entity's {@code ATTRIBUTE_TABLE} relationship at the guid returned by
+ * {@code PreprocessorContext.getGuidForDeletedEntity}, when one is found.
+ */
+public class HiveTableDDLPreprocessor extends EntityPreprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveTableDDLPreprocessor.class);
+
+    protected HiveTableDDLPreprocessor() {
+        super(TYPE_HIVE_TABLE_DDL);
+    }
+
+    /**
+     * Leaves the entity unchanged unless the message is spooled, the table relationship is
+     * present, and the context returns a non-empty guid for its qualified name.
+     */
+    @Override
+    public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+        // the relationship is only read for spooled messages
+        final Object relatedTable = context.isSpooledMessage() ? entity.getRelationshipAttribute(ATTRIBUTE_TABLE) : null;
+
+        if (relatedTable != null) {
+            final String tableQualifiedName = getQualifiedName(relatedTable);
+            final String deletedTableGuid = context.getGuidForDeletedEntity(tableQualifiedName);
+
+            if (StringUtils.isNotEmpty(deletedTableGuid)) {
+                setObjectIdWithGuid(relatedTable, deletedTableGuid);
+                LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), tableQualifiedName, deletedTableGuid);
+            }
+        }
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 608b4a3..59f6440 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.EntityCorrelationManager;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.collections.CollectionUtils;
@@ -69,9 +70,10 @@ public class PreprocessorContext {
     private final Set<String>                         createdEntities        = new HashSet<>();
     private final Set<String>                         deletedEntities        = new HashSet<>();
     private final Map<String, String>                 guidAssignments        = new HashMap<>();
+    private final EntityCorrelationManager            correlationManager;
     private       List<AtlasEntity>                   postUpdateEntities     = null;
 
-    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName) {
+    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName, EntityCorrelationMana [...]
         this.kafkaMessage                           = kafkaMessage;
         this.typeRegistry                           = typeRegistry;
         this.hiveTablesToIgnore                     = hiveTablesToIgnore;
@@ -101,6 +103,7 @@ public class PreprocessorContext {
         }
 
         this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty() || updateHiveProcessNameWithQualifiedName;
+        this.correlationManager = correlationManager;
     }
 
     public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
@@ -577,4 +580,16 @@ public class PreprocessorContext {
             partialEntity.setAttribute(attrName, attrVal);
         }
     }
+
+    /**
+     * @return the value of {@code getMsgCreated()} on the Kafka message held by this context
+     */
+    public long getMsgCreated() {
+        final long msgCreated = this.kafkaMessage.getMsgCreated();
+
+        return msgCreated;
+    }
+
+    /**
+     * @return the value of {@code getSpooled()} on the Kafka message held by this context
+     */
+    public boolean isSpooledMessage() {
+        final boolean spooled = this.kafkaMessage.getSpooled();
+
+        return spooled;
+    }
+
+    /**
+     * Asks the correlation manager for a guid matching {@code qualifiedName} and the creation
+     * time of the Kafka message held by this context.
+     *
+     * @param qualifiedName qualified name to look up
+     * @return the guid reported by the correlation manager, or {@code null} when this context
+     *         has no correlation manager or no guid was found
+     */
+    public String getGuidForDeletedEntity(String qualifiedName) {
+        // the manager comes unchecked from the constructor; tolerate a context built without one
+        if (this.correlationManager == null) {
+            return null;
+        }
+
+        return this.correlationManager.getGuidForDeletedEntityToBeCorrelated(qualifiedName, kafkaMessage.getMsgCreated());
+    }
 }
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 65e8b50..fdfc256 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -109,7 +109,7 @@ public class NotificationHookConsumerKafkaTest {
         produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
 
         NotificationConsumer<HookNotification> consumer                 = createNewConsumer(kafkaNotification, false);
-        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
 
         consumeOneMessage(consumer, hookConsumer);
@@ -128,7 +128,7 @@ public class NotificationHookConsumerKafkaTest {
     public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
 
         ExceptionThrowingCommitConsumer        consumer                 = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
-        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
 
         produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
@@ -159,7 +159,7 @@ public class NotificationHookConsumerKafkaTest {
 
         assertNotNull (consumer);
 
-        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
 
         consumeOneMessage(consumer, hookConsumer);
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 15a1900..f440c42 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -98,7 +98,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerCanProceedIfServerIsReady() throws Exception {
-        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
         NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
 
@@ -111,7 +111,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
-        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
         NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
 
@@ -128,7 +128,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
-        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationConsumer                   consumer                 = mock(NotificationConsumer.class);
         NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
         EntityCreateRequest                    message                  = mock(EntityCreateRequest.class);
@@ -145,7 +145,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
-        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationConsumer                  consumer                 = mock(NotificationConsumer.class);
         NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
         EntityCreateRequest                   message                  = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class)));
@@ -159,7 +159,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
-        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
         NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
 
@@ -179,7 +179,7 @@ public class NotificationHookConsumerTest {
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         notificationHookConsumer.startInternal(configuration, executorService);
 
         verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
@@ -197,7 +197,7 @@ public class NotificationHookConsumerTest {
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
 
         notificationHookConsumer.startInternal(configuration, executorService);
 
@@ -216,7 +216,7 @@ public class NotificationHookConsumerTest {
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
 
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
 
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsActive();
@@ -236,7 +236,7 @@ public class NotificationHookConsumerTest {
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-        final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
 
         doAnswer(new Answer() {
             @Override
@@ -267,7 +267,7 @@ public class NotificationHookConsumerTest {
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-        final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
 
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsPassive();
@@ -332,6 +332,6 @@ public class NotificationHookConsumerTest {
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-        return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
     }
 }