You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@atlas.apache.org by nb...@apache.org on 2021/05/21 11:11:34 UTC

[atlas] branch master updated (aba97b3 -> c3df226)

This is an automated email from the ASF dual-hosted git repository.

nbonte pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git.


    from aba97b3  ATLAS-4285: Multiple propagations with intersecting lineage.
     new 7e2e130  ATLAS-4292 : Atlas Debug Metrics- MessageException through debug metrics via browser
     new 4691650  ATLAS-4064: Atlas HEADER validation
     new 07037d2  ATLAS-4259: Swagger: Improve Header validation
     new c3df226  ATLAS-4152: Entity correlation for deleted entities.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build-tools/src/main/resources/ui-dist/index.html  |   1 +
 build-tools/src/main/resources/ui-dist/index.js    | 138 ++++++++++++++-------
 .../org/apache/atlas/repository/Constants.java     |   2 +
 dashboardv2/public/js/utils/CommonViewFunction.js  |  30 ++---
 dashboardv3/public/js/utils/CommonViewFunction.js  |  29 ++---
 .../notification/AtlasNotificationMessage.java     |  23 +++-
 .../org/apache/atlas/kafka/AtlasKafkaConsumer.java |   3 +-
 .../org/apache/atlas/kafka/AtlasKafkaMessage.java  |  17 ++-
 .../org/apache/atlas/kafka/KafkaNotification.java  |  12 ++
 .../AtlasNotificationMessageDeserializer.java      |  17 ++-
 .../atlas/notification/NotificationInterface.java  |   9 ++
 .../atlas/notification/spool/AtlasFileSpool.java   |  30 +++--
 .../apache/atlas/notification/spool/Publisher.java |  40 +++++-
 .../notification/spool/SpoolConfiguration.java     |  16 +++
 .../apache/atlas/notification/spool/Spooler.java   |  17 ++-
 .../notification/AbstractNotificationTest.java     |   5 +
 .../notification/spool/AtlasFileSpoolTest.java     |   5 +
 .../repository/graph/GraphBackedSearchIndexer.java |   1 +
 .../store/graph/EntityCorrelationStore.java        |  53 ++++++++
 .../store/graph/v2/AtlasGraphUtilsV2.java          |  21 ++++
 ...tsTest.java => EntityCorrelationStoreTest.java} |  59 ++++-----
 .../notification/EntityCorrelationManager.java     |  60 +++++++++
 .../notification/NotificationHookConsumer.java     |  15 ++-
 .../preprocessor/EntityPreprocessor.java           |  16 ++-
 .../preprocessor/HiveDbDDLPreprocessor.java        |  52 ++++++++
 .../preprocessor/HivePreprocessor.java             |  28 +++++
 .../preprocessor/HiveTableDDLPreprocessor.java     |  52 ++++++++
 .../preprocessor/PreprocessorContext.java          |  17 ++-
 .../web/filters/AtlasCSRFPreventionFilter.java     |  41 ++++--
 .../apache/atlas/web/resources/AdminResource.java  |  22 +++-
 .../NotificationHookConsumerKafkaTest.java         |   6 +-
 .../notification/NotificationHookConsumerTest.java |  22 ++--
 .../web/filters/AtlasCSRFPreventionFilterTest.java |  31 +++++
 33 files changed, 719 insertions(+), 171 deletions(-)
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java
 copy repository/src/test/java/org/apache/atlas/repository/store/graph/v2/{DifferentialAuditsTest.java => EntityCorrelationStoreTest.java} (51%)
 create mode 100644 webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
 create mode 100644 webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
 create mode 100644 webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java

[atlas] 02/04: ATLAS-4064: Atlas HEADER validation

Posted by nb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nbonte pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 4691650dfe26f13884483bba6025cb66f4f818da
Author: nixonrodrigues <ni...@apache.org>
AuthorDate: Thu May 20 21:57:00 2021 -0700

    ATLAS-4064: Atlas HEADER validation
    
    Signed-off-by: Nikhil Bonte <nb...@apache.org>
---
 dashboardv2/public/js/utils/CommonViewFunction.js  | 30 +++++++---------
 dashboardv3/public/js/utils/CommonViewFunction.js  | 29 +++++++--------
 .../web/filters/AtlasCSRFPreventionFilter.java     | 41 ++++++++++++++++------
 .../apache/atlas/web/resources/AdminResource.java  | 18 +++++++---
 .../web/filters/AtlasCSRFPreventionFilterTest.java | 31 ++++++++++++++++
 5 files changed, 99 insertions(+), 50 deletions(-)

diff --git a/dashboardv2/public/js/utils/CommonViewFunction.js b/dashboardv2/public/js/utils/CommonViewFunction.js
index 80db527..bb3fa3f 100644
--- a/dashboardv2/public/js/utils/CommonViewFunction.js
+++ b/dashboardv2/public/js/utils/CommonViewFunction.js
@@ -793,7 +793,6 @@ define(['require', 'utils/Utils', 'modules/Modal', 'utils/Messages', 'utils/Enum
                 }));
             }
         }
-
     }
     CommonViewFunction.removeCategoryTermAssociation = function(options) {
         if (options) {
@@ -864,13 +863,10 @@ define(['require', 'utils/Utils', 'modules/Modal', 'utils/Messages', 'utils/Enum
         }
     }
     CommonViewFunction.addRestCsrfCustomHeader = function(xhr, settings) {
-        if (settings.url == null) {
-            return;
-        }
-        var method = settings.type;
-        if (CommonViewFunction.restCsrfCustomHeader != null && !CommonViewFunction.restCsrfMethodsToIgnore[method]) {
-            // The value of the header is unimportant.  Only its presence matters.
-            xhr.setRequestHeader(CommonViewFunction.restCsrfCustomHeader, '""');
+        if (null != settings.url) {
+            var method = settings.type;
+            var csrfToken = CommonViewFunction.restCsrfValue;
+            null == CommonViewFunction.restCsrfCustomHeader || CommonViewFunction.restCsrfMethodsToIgnore[method] || xhr.setRequestHeader(CommonViewFunction.restCsrfCustomHeader, csrfToken);
         }
     }
     CommonViewFunction.restCsrfCustomHeader = null;
@@ -900,16 +896,14 @@ define(['require', 'utils/Utils', 'modules/Modal', 'utils/Messages', 'utils/Enum
                             var str = "" + response['atlas.rest-csrf.enabled'];
                             csrfEnabled = (str.toLowerCase() == 'true');
                         }
-                        if (response['atlas.rest-csrf.custom-header']) {
-                            header = response['atlas.rest-csrf.custom-header'].trim();
-                        }
-                        if (response['atlas.rest-csrf.methods-to-ignore']) {
-                            methods = getTrimmedStringArrayValue(response['atlas.rest-csrf.methods-to-ignore']);
-                        }
-                        if (csrfEnabled) {
-                            CommonViewFunction.restCsrfCustomHeader = header;
-                            CommonViewFunction.restCsrfMethodsToIgnore = {};
-                            methods.map(function(method) { CommonViewFunction.restCsrfMethodsToIgnore[method] = true; });
+                        if (response["atlas.rest-csrf.custom-header"] && (header = response["atlas.rest-csrf.custom-header"].trim()),
+                            response["atlas.rest-csrf.methods-to-ignore"] && (methods = getTrimmedStringArrayValue(response["atlas.rest-csrf.methods-to-ignore"])),
+                            csrfEnabled) {
+                            CommonViewFunction.restCsrfCustomHeader = header, CommonViewFunction.restCsrfMethodsToIgnore = {},
+                                CommonViewFunction.restCsrfValue = response["_csrfToken"] || '""',
+                                methods.map(function(method) {
+                                    CommonViewFunction.restCsrfMethodsToIgnore[method] = !0;
+                                });
                             var statusCodeErrorFn = function(error) {
                                 Utils.defaultErrorHandler(null, error)
                             }
diff --git a/dashboardv3/public/js/utils/CommonViewFunction.js b/dashboardv3/public/js/utils/CommonViewFunction.js
index 14a8b74..34afa2d 100644
--- a/dashboardv3/public/js/utils/CommonViewFunction.js
+++ b/dashboardv3/public/js/utils/CommonViewFunction.js
@@ -884,13 +884,10 @@ define(['require', 'utils/Utils', 'modules/Modal', 'utils/Messages', 'utils/Enum
         }
     }
     CommonViewFunction.addRestCsrfCustomHeader = function(xhr, settings) {
-        if (settings.url == null) {
-            return;
-        }
-        var method = settings.type;
-        if (CommonViewFunction.restCsrfCustomHeader != null && !CommonViewFunction.restCsrfMethodsToIgnore[method]) {
-            // The value of the header is unimportant.  Only its presence matters.
-            xhr.setRequestHeader(CommonViewFunction.restCsrfCustomHeader, '""');
+        if (null != settings.url) {
+            var method = settings.type;
+            var csrfToken = CommonViewFunction.restCsrfValue;
+            null == CommonViewFunction.restCsrfCustomHeader || CommonViewFunction.restCsrfMethodsToIgnore[method] || xhr.setRequestHeader(CommonViewFunction.restCsrfCustomHeader, csrfToken);
         }
     }
     CommonViewFunction.restCsrfCustomHeader = null;
@@ -920,16 +917,14 @@ define(['require', 'utils/Utils', 'modules/Modal', 'utils/Messages', 'utils/Enum
                             var str = "" + response['atlas.rest-csrf.enabled'];
                             csrfEnabled = (str.toLowerCase() == 'true');
                         }
-                        if (response['atlas.rest-csrf.custom-header']) {
-                            header = response['atlas.rest-csrf.custom-header'].trim();
-                        }
-                        if (response['atlas.rest-csrf.methods-to-ignore']) {
-                            methods = getTrimmedStringArrayValue(response['atlas.rest-csrf.methods-to-ignore']);
-                        }
-                        if (csrfEnabled) {
-                            CommonViewFunction.restCsrfCustomHeader = header;
-                            CommonViewFunction.restCsrfMethodsToIgnore = {};
-                            methods.map(function(method) { CommonViewFunction.restCsrfMethodsToIgnore[method] = true; });
+                        if (response["atlas.rest-csrf.custom-header"] && (header = response["atlas.rest-csrf.custom-header"].trim()),
+                            response["atlas.rest-csrf.methods-to-ignore"] && (methods = getTrimmedStringArrayValue(response["atlas.rest-csrf.methods-to-ignore"])),
+                            csrfEnabled) {
+                            CommonViewFunction.restCsrfCustomHeader = header, CommonViewFunction.restCsrfMethodsToIgnore = {},
+                                CommonViewFunction.restCsrfValue = response["_csrfToken"] || '""',
+                                methods.map(function(method) {
+                                    CommonViewFunction.restCsrfMethodsToIgnore[method] = !0;
+                                });
                             var statusCodeErrorFn = function(error) {
                                 Utils.defaultErrorHandler(null, error)
                             }
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasCSRFPreventionFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasCSRFPreventionFilter.java
index df3fce6..429ff1c 100644
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasCSRFPreventionFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasCSRFPreventionFilter.java
@@ -21,6 +21,7 @@ package org.apache.atlas.web.filters;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +35,7 @@ import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -63,11 +65,13 @@ public class AtlasCSRFPreventionFilter implements Filter {
 	public static final String CUSTOM_HEADER_PARAM = "atlas.rest-csrf.custom-header";
 	public static final String HEADER_DEFAULT = "X-XSRF-HEADER";
 	public static final String HEADER_USER_AGENT = "User-Agent";
+	public static final String CSRF_TOKEN = "_csrfToken";
+
 
 	private String  headerName = HEADER_DEFAULT;
 	private Set<String> methodsToIgnore = null;
 	private Set<Pattern> browserUserAgents;
-	
+
 	public AtlasCSRFPreventionFilter() {
 		try {
 			if (isCSRF_ENABLED){
@@ -167,19 +171,30 @@ public class AtlasCSRFPreventionFilter implements Filter {
 		 *             if there is an I/O error
 		 */
 		void sendError(int code, String message) throws IOException;
-	}	
-	  
-	public void handleHttpInteraction(HttpInteraction httpInteraction)
-			throws IOException, ServletException {
-		if (!isBrowser(httpInteraction.getHeader(HEADER_USER_AGENT))
-				|| methodsToIgnore.contains(httpInteraction.getMethod())
-				|| httpInteraction.getHeader(headerName) != null) {
+	}
+
+	public void handleHttpInteraction(HttpInteraction httpInteraction) throws IOException, ServletException {
+		HttpSession session   = ((ServletFilterHttpInteraction) httpInteraction).getSession();
+		String      csrfToken = StringUtils.EMPTY;
+
+		if (session != null) {
+			csrfToken = (String) session.getAttribute(CSRF_TOKEN);
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Session is null");
+			}
+		}
+
+		String clientCsrfToken = httpInteraction.getHeader(headerName);
+
+		if (!isBrowser(httpInteraction.getHeader(HEADER_USER_AGENT)) || methodsToIgnore.contains(httpInteraction.getMethod())
+				|| (clientCsrfToken != null && clientCsrfToken.equals(csrfToken))) {
 			httpInteraction.proceed();
-		}else {
-			httpInteraction.sendError(HttpServletResponse.SC_BAD_REQUEST,"Missing Required Header for CSRF Vulnerability Protection");
+		} else {
+			httpInteraction.sendError(HttpServletResponse.SC_BAD_REQUEST,"Missing header or invalid Header value for CSRF Vulnerability Protection");
 		}
 	}
-	
+
 	public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
         final HttpServletRequest httpRequest = (HttpServletRequest) request;
         final HttpServletResponse httpResponse = (HttpServletResponse) response;
@@ -235,6 +250,10 @@ public class AtlasCSRFPreventionFilter implements Filter {
 			chain.doFilter(httpRequest, httpResponse);
 		}
 
+		public HttpSession getSession() {
+			return httpRequest.getSession();
+		}
+
 		@Override
 		public void sendError(int code, String message) throws IOException {
 			JSONObject json = new JSONObject();
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index d124cd2..46d42ba 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -74,6 +74,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,6 +103,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -114,6 +116,8 @@ import java.util.TimeZone;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
+import static org.apache.atlas.web.filters.AtlasCSRFPreventionFilter.CSRF_TOKEN;
+
 
 /**
  * Jersey Resource for admin operations.
@@ -326,7 +330,7 @@ public class AdminResource {
     @GET
     @Path("session")
     @Produces(Servlets.JSON_MEDIA_TYPE)
-    public Response getUserProfile() {
+    public Response getUserProfile(@Context HttpServletRequest request) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> AdminResource.getUserProfile()");
         }
@@ -364,9 +368,15 @@ public class AdminResource {
         responseData.put("timezones", TIMEZONE_LIST);
         responseData.put(UI_DATE_TIMEZONE_FORMAT_ENABLED, isTimezoneFormatEnabled);
         responseData.put(UI_DATE_FORMAT, uiDateFormat);
-        responseData.put(AtlasConfiguration.DEBUG_METRICS_ENABLED.getPropertyName(), isDebugMetricsEnabled);
-        responseData.put(AtlasConfiguration.TASKS_USE_ENABLED.getPropertyName(), isTasksEnabled);
-        
+
+        String salt = (String) request.getSession().getAttribute(CSRF_TOKEN);
+        if (StringUtils.isEmpty(salt)) {
+            salt = RandomStringUtils.random(20, 0, 0, true, true, null, new SecureRandom());
+            request.getSession().setAttribute(CSRF_TOKEN, salt);
+        }
+
+        responseData.put(CSRF_TOKEN, salt);
+
         response = Response.ok(AtlasJson.toV1Json(responseData)).build();
 
         if (LOG.isDebugEnabled()) {
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasCSRFPreventionFilterTest.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasCSRFPreventionFilterTest.java
index 954364b..841cfaf 100644
--- a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasCSRFPreventionFilterTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasCSRFPreventionFilterTest.java
@@ -23,10 +23,13 @@ import javax.servlet.FilterChain;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
 import java.io.IOException;
 import java.io.PrintWriter;
 
+import static org.apache.atlas.web.filters.AtlasCSRFPreventionFilter.CSRF_TOKEN;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 public class AtlasCSRFPreventionFilterTest {
@@ -61,9 +64,15 @@ public class AtlasCSRFPreventionFilterTest {
 		HttpServletRequest mockReq = Mockito.mock(HttpServletRequest.class);
 		Mockito.when(mockReq.getHeader(AtlasCSRFPreventionFilter.HEADER_DEFAULT)).thenReturn("valueUnimportant");
 		Mockito.when(mockReq.getHeader(AtlasCSRFPreventionFilter.HEADER_USER_AGENT)).thenReturn(userAgent);
+		Mockito.when(mockReq.getMethod()).thenReturn("POST");
+
+		HttpSession session = Mockito.mock(HttpSession.class);
+		Mockito.when(session.getAttribute(CSRF_TOKEN)).thenReturn("valueUnimportant");
+		Mockito.when(mockReq.getSession()).thenReturn(session);
 
 		// Objects to verify interactions based on request
 		HttpServletResponse mockRes = Mockito.mock(HttpServletResponse.class);
+
 		FilterChain mockChain = Mockito.mock(FilterChain.class);
 
 		// Object under test
@@ -74,6 +83,28 @@ public class AtlasCSRFPreventionFilterTest {
 	}
 
 	@Test
+	public void testHeaderPresentDefaultConfig_badRequest() throws ServletException, IOException {
+		// CSRF HAS been sent
+		HttpServletRequest mockReq = Mockito.mock(HttpServletRequest.class);
+		Mockito.when(mockReq.getHeader(AtlasCSRFPreventionFilter.HEADER_DEFAULT)).thenReturn("valueUnimportant");
+		Mockito.when(mockReq.getHeader(AtlasCSRFPreventionFilter.HEADER_USER_AGENT)).thenReturn(userAgent);
+		Mockito.when(mockReq.getMethod()).thenReturn("POST");
+
+		// Objects to verify interactions based on request
+		HttpServletResponse mockRes = Mockito.mock(HttpServletResponse.class);
+		PrintWriter mockWriter = Mockito.mock(PrintWriter.class);
+		Mockito.when(mockRes.getWriter()).thenReturn(mockWriter);
+
+		FilterChain mockChain = Mockito.mock(FilterChain.class);
+
+		// Object under test
+		AtlasCSRFPreventionFilter filter = new AtlasCSRFPreventionFilter();
+		filter.doFilter(mockReq, mockRes, mockChain);
+
+		Mockito.verify(mockChain, never()).doFilter(mockReq, mockRes);
+	}
+
+	@Test
 	public void testHeaderPresentCustomHeaderConfig_goodRequest() throws ServletException, IOException {
 		// CSRF HAS been sent
 		HttpServletRequest mockReq = Mockito.mock(HttpServletRequest.class);

[atlas] 01/04: ATLAS-4292 : Atlas Debug Metrics- MessageException through debug metrics via browser

Posted by nb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nbonte pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 7e2e130901f9876d5448d1155714daceccc25872
Author: mayanknj <ma...@freestoneinfotech.com>
AuthorDate: Fri May 21 10:16:37 2021 +0530

    ATLAS-4292 : Atlas Debug Metrics- MessageException through debug metrics via browser
    
    Signed-off-by: Nikhil Bonte <nb...@apache.org>
---
 .../src/main/java/org/apache/atlas/web/resources/AdminResource.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index e4b6ea9..d124cd2 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -23,7 +23,6 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.authorize.AtlasAdminAccessRequest;
 import org.apache.atlas.authorize.AtlasAuthorizationUtils;
 import org.apache.atlas.authorize.AtlasEntityAccessRequest;
@@ -49,6 +48,7 @@ import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.metrics.AtlasMetrics;
 import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
+import org.apache.atlas.model.tasks.AtlasTask;
 import org.apache.atlas.repository.audit.AtlasAuditService;
 import org.apache.atlas.repository.audit.EntityAuditRepository;
 import org.apache.atlas.repository.impexp.AtlasServerService;
@@ -57,7 +57,6 @@ import org.apache.atlas.repository.impexp.ExportService;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.repository.impexp.MigrationProgressService;
 import org.apache.atlas.repository.impexp.ZipSink;
-import org.apache.atlas.model.tasks.AtlasTask;
 import org.apache.atlas.repository.patches.AtlasPatchManager;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetricsService;
@@ -773,6 +772,7 @@ public class AdminResource {
 
     @GET
     @Path("/debug/metrics")
+    @Produces(MediaType.APPLICATION_JSON)
     public Map getDebugMetrics() {
         return debugMetricsRESTSink.getMetrics();
     }

[atlas] 03/04: ATLAS-4259: Swagger: Improve Header validation

Posted by nb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nbonte pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 07037d2f5328a85be7e106cbbe82ffc91c3ea77d
Author: Nikhil Bonte <ni...@freestoneinfotech.com>
AuthorDate: Fri Apr 23 17:56:54 2021 +0530

    ATLAS-4259: Swagger: Improve Header validation
    
    Signed-off-by: Nikhil Bonte <nb...@apache.org>
---
 build-tools/src/main/resources/ui-dist/index.html |   1 +
 build-tools/src/main/resources/ui-dist/index.js   | 138 +++++++++++++++-------
 2 files changed, 96 insertions(+), 43 deletions(-)

diff --git a/build-tools/src/main/resources/ui-dist/index.html b/build-tools/src/main/resources/ui-dist/index.html
index e743572..653c38b 100755
--- a/build-tools/src/main/resources/ui-dist/index.html
+++ b/build-tools/src/main/resources/ui-dist/index.html
@@ -47,6 +47,7 @@
   <body>
     <div id="swagger-ui"></div>
 
+    <script src="../js/libs/jquery/js/jquery.min.js" charset="UTF-8"> </script>
     <script src="./swagger-ui-bundle.js" charset="UTF-8"> </script>
     <script src="./swagger-ui-standalone-preset.js" charset="UTF-8"> </script>
     <script src="./index.js" charset="UTF-8"> </script>
diff --git a/build-tools/src/main/resources/ui-dist/index.js b/build-tools/src/main/resources/ui-dist/index.js
index 60a8477..3ae4076 100644
--- a/build-tools/src/main/resources/ui-dist/index.js
+++ b/build-tools/src/main/resources/ui-dist/index.js
@@ -16,47 +16,99 @@
  * limitations under the License.
  */
 
-var gatewayUrl;
-
-window.onload = function() {
-    const ui = SwaggerUIBundle({
-        url: getSwaggerBaseUrl(window.location.pathname) + "/swagger.json",
-        dom_id: '#swagger-ui',
-        deepLinking: true,
-        presets: [
-            SwaggerUIBundle.presets.apis,
-            SwaggerUIStandalonePreset
-        ],
-        plugins: [
-            SwaggerUIBundle.plugins.DownloadUrl
-        ],
-        layout: "StandaloneLayout",
-        requestInterceptor: function(request) {
-              if (!request.url.includes("swagger.json")) {
+(function () {
+    var gatewayUrl,
+    _csrfToken,
+    csrfEnabled = false,
+    restCsrfCustomHeader,
+    restCsrfMethodsToIgnore = [],
+    swaggerSpecFileName = "swagger.json";
+
+    window.onload = function() {
+        const ui = SwaggerUIBundle({
+            url: getSwaggerBaseUrl(window.location.pathname) + "/" + swaggerSpecFileName,
+            dom_id: '#swagger-ui',
+            deepLinking: true,
+            presets: [
+                SwaggerUIBundle.presets.apis,
+                SwaggerUIStandalonePreset
+            ],
+            plugins: [
+                SwaggerUIBundle.plugins.DownloadUrl
+            ],
+            layout: "StandaloneLayout",
+            requestInterceptor: function(request) {
+                if (!request.url.includes(swaggerSpecFileName)) {
                     request.url = getAPIUrl(request.url);
-              }
-              request.headers['X-XSRF-HEADER'] = "valid";
-              return request;
-        },
-        docExpansion: 'none',
-        validatorUrl: 'none'
-    })
-    window.ui = ui;
-
-    document.getElementById("swagger-ui").getElementsByClassName("topbar-wrapper")[0].getElementsByTagName("img")[0].src = gatewayUrl + "/img/atlas_logo.svg";
-}
-
-function getSwaggerBaseUrl(url) {
-    var path = url.replace(/\/[\w-]+.(jsp|html)|\/+$/ig, '');
-    splitPath = path.split("/");
-    splitPath.pop();
-    gatewayUrl = splitPath.join("/");
-
-    return window.location.origin + path;
-};
-
-function getAPIUrl(url) {
-    url = new URL(url);
-    var path =  url.origin + gatewayUrl + url.pathname + url.search;
-    return path;
-};
+                    setCsrfHeaderToRequest(request);
+                }
+
+                return request;
+            },
+            docExpansion: 'none',
+            validatorUrl: 'none'
+        })
+        window.ui = ui;
+
+        atlasLogo = gatewayUrl + "/img/atlas_logo.svg";
+        $('#swagger-ui img').attr("src", atlasLogo);
+
+        fetchCsrfHeader();
+    }
+
+    function getSwaggerBaseUrl(url) {
+        var path = url.replace(/\/[\w-]+.(jsp|html)|\/+$/ig, '');
+        splitPath = path.split("/");
+        splitPath.pop();
+        gatewayUrl = splitPath.join("/");
+
+        return window.location.origin + path;
+    };
+
+    function getAPIUrl(url) {
+        url = new URL(url);
+        var path =  url.origin + gatewayUrl + url.pathname + url.search;
+        return path;
+    };
+
+    function fetchCsrfHeader() {
+        var response = getSessionDetails();
+
+        if (!csrfEnabled && response['atlas.rest-csrf.enabled']) {
+            var str = "" + response['atlas.rest-csrf.enabled'];
+            csrfEnabled = (str.toLowerCase() == 'true');
+        }
+
+        if (!restCsrfCustomHeader && response["atlas.rest-csrf.custom-header"]) {
+            restCsrfCustomHeader = response["atlas.rest-csrf.custom-header"].trim();
+        }
+
+        if (restCsrfMethodsToIgnore == 0 && response["atlas.rest-csrf.methods-to-ignore"]) {
+            restCsrfMethodsToIgnore = response["atlas.rest-csrf.methods-to-ignore"].split(",");
+        }
+
+        if (csrfEnabled) {
+            _csrfToken = response['_csrfToken'];
+        }
+    }
+
+    function setCsrfHeaderToRequest(request) {
+        if (csrfEnabled && !restCsrfMethodsToIgnore.includes(request.method)) {
+           request.headers[restCsrfCustomHeader] = _csrfToken;
+        }
+    }
+
+    function getSessionDetails() {
+        var response;
+        $.ajax({
+            async : false,
+            method: "GET",
+            url: gatewayUrl + "/api/atlas/admin/session",
+            dataType: 'json',
+            success: function(result){
+                response = result;
+            }
+        });
+        return response;
+    };
+})();
\ No newline at end of file

[atlas] 04/04: ATLAS-4152: Entity correlation for deleted entities.

Posted by nb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nbonte pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit c3df22605795b9bddcba547852c04461ee9b8203
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu May 20 15:07:27 2021 -0700

    ATLAS-4152: Entity correlation for deleted entities.
    
    Signed-off-by: Nikhil Bonte <nb...@apache.org>
---
 .../org/apache/atlas/repository/Constants.java     |  2 +
 .../notification/AtlasNotificationMessage.java     | 23 ++++--
 .../org/apache/atlas/kafka/AtlasKafkaConsumer.java |  3 +-
 .../org/apache/atlas/kafka/AtlasKafkaMessage.java  | 17 ++++-
 .../org/apache/atlas/kafka/KafkaNotification.java  | 12 ++++
 .../AtlasNotificationMessageDeserializer.java      | 17 ++++-
 .../atlas/notification/NotificationInterface.java  |  9 +++
 .../atlas/notification/spool/AtlasFileSpool.java   | 30 ++++++--
 .../apache/atlas/notification/spool/Publisher.java | 40 +++++++++--
 .../notification/spool/SpoolConfiguration.java     | 16 +++++
 .../apache/atlas/notification/spool/Spooler.java   | 17 ++++-
 .../notification/AbstractNotificationTest.java     |  5 ++
 .../notification/spool/AtlasFileSpoolTest.java     |  5 ++
 .../repository/graph/GraphBackedSearchIndexer.java |  1 +
 .../store/graph/EntityCorrelationStore.java        | 53 ++++++++++++++
 .../store/graph/v2/AtlasGraphUtilsV2.java          | 21 ++++++
 .../store/graph/v2/EntityCorrelationStoreTest.java | 83 ++++++++++++++++++++++
 .../notification/EntityCorrelationManager.java     | 60 ++++++++++++++++
 .../notification/NotificationHookConsumer.java     | 15 +++-
 .../preprocessor/EntityPreprocessor.java           | 16 ++++-
 .../preprocessor/HiveDbDDLPreprocessor.java        | 52 ++++++++++++++
 .../preprocessor/HivePreprocessor.java             | 28 ++++++++
 .../preprocessor/HiveTableDDLPreprocessor.java     | 52 ++++++++++++++
 .../preprocessor/PreprocessorContext.java          | 17 ++++-
 .../NotificationHookConsumerKafkaTest.java         |  6 +-
 .../notification/NotificationHookConsumerTest.java | 22 +++---
 26 files changed, 579 insertions(+), 43 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index ffcec97..aea0c13 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -135,6 +135,8 @@ public final class Constants {
 
     public static final String TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "timestamp");
 
+    public static final String ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityDeletedTimestamp");
+
     public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp");
 
     public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
index 810ba97..5869910 100644
--- a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
+++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
@@ -40,9 +40,10 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)
 public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
-    private String msgSourceIP;
-    private String msgCreatedBy;
-    private long   msgCreationTime;
+    private String  msgSourceIP;
+    private String  msgCreatedBy;
+    private long    msgCreationTime;
+    private boolean spooled;
 
     /**
      * The actual message.
@@ -55,18 +56,22 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
     }
 
     public AtlasNotificationMessage(MessageVersion version, T message) {
-        this(version, message, null, null);
+        this(version, message, null, null, false);
     }
 
-    public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
+    public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy, boolean spooled) {
         super(version);
 
         this.msgSourceIP     = msgSourceIP;
         this.msgCreatedBy    = createdBy;
         this.msgCreationTime = (new Date()).getTime();
         this.message         = message;
+        this.spooled         = spooled;
     }
 
+    public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
+        this(version, message, msgSourceIP, createdBy, false);
+    }
 
     public String getMsgSourceIP() {
         return msgSourceIP;
@@ -99,4 +104,12 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
     public void setMessage(T message) {
         this.message = message;
     }
+
+    public void setSpooled(boolean val) {
+        this.spooled = val;
+    }
+
+    public boolean getSpooled() {
+        return spooled;
+    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index f7d9668..96dc585 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -134,7 +134,8 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
                     continue;
                 }
 
-                messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition()));
+                messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
+                                                            deserializer.getMsgCreated(), deserializer.getSpooled()));
             }
         }
 
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
index 22bd79f..af3727d 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
@@ -24,11 +24,19 @@ public class AtlasKafkaMessage<T> {
     private final T              message;
     private final long           offset;
     private final TopicPartition topicPartition;
+    private final boolean        spooled;
+    private final long           msgCreated;
 
-    public AtlasKafkaMessage(T message, long offset, String topic, int partition) {
+    public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled) {
         this.message        = message;
         this.offset         = offset;
         this.topicPartition = new TopicPartition(topic, partition);
+        this.msgCreated     = msgCreated;
+        this.spooled        = spooled;
+    }
+
+    public AtlasKafkaMessage(T message, long offset, String topic, int partition) {
+        this(message, offset, topic, partition, 0, false);
     }
 
     public T getMessage() {
@@ -51,4 +59,11 @@ public class AtlasKafkaMessage<T> {
         return topicPartition.partition();
     }
 
+    public boolean getSpooled() {
+        return this.spooled;
+    }
+
+    public long getMsgCreated() {
+        return this.msgCreated;
+    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 3d1b3cc..32f5183 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -172,6 +172,18 @@ public class KafkaNotification extends AbstractNotification implements Service {
 
 
     // ----- NotificationInterface -------------------------------------------
+    public boolean isReady(NotificationType notificationType) {
+        try {
+            KafkaProducer producer = getOrCreateProducer(notificationType);
+            producer.metrics();
+            return true;
+        }
+        catch (Exception exception) {
+            LOG.error("Error: Connecting... {}", exception.getMessage());
+            return false;
+        }
+    }
+
     @Override
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
         return createConsumers(notificationType, numConsumers, Boolean.valueOf(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable","false"))));
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 3264e26..b43bc7c 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -62,6 +62,8 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
     private long                                      splitMessagesLastPurgeTime    = System.currentTimeMillis();
     private final AtomicLong                          messageCountTotal             = new AtomicLong(0);
     private final AtomicLong                          messageCountSinceLastInterval = new AtomicLong(0);
+    private long                                      msgCreated;
+    private boolean                                   spooled;
     // ----- Constructors ----------------------------------------------------
 
     /**
@@ -101,18 +103,31 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
     }
 
     // ----- MessageDeserializer ---------------------------------------------
+    public long getMsgCreated() {
+        return this.msgCreated;
+    }
+
+    public boolean getSpooled() {
+        return this.spooled;
+    }
+
     @Override
     public T deserialize(String messageJson) {
         final T ret;
 
         messageCountTotal.incrementAndGet();
         messageCountSinceLastInterval.incrementAndGet();
+        this.msgCreated = 0;
+        this.spooled = false;
 
-        AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class);
+        AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationMessage.class);
 
         if (msg == null || msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
             ret = AtlasType.fromV1Json(messageJson, messageType);
         } else  {
+            this.msgCreated = ((AtlasNotificationMessage) msg).getMsgCreationTime();
+            this.spooled = ((AtlasNotificationMessage) msg).getSpooled();
+
             String msgJson = messageJson;
 
             if (msg.getMsgSplitCount() > 1) { // multi-part message
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index edd8ed9..3d8d9cc 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -110,4 +110,13 @@ public interface NotificationInterface {
      * Shutdown any notification producers and consumers associated with this interface instance.
      */
     void close();
+
+    /**
+     *  Check if underlying notification mechanism is ready for use.
+     *
+     * @param type the message type
+     * @return true if available, false otherwise
+     *
+     */
+    boolean isReady(NotificationType type);
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
index 2d7d195..ea31284 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
@@ -27,6 +27,7 @@ import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -51,7 +52,7 @@ public class AtlasFileSpool implements NotificationInterface {
 
     @Override
     public void init(String source, Object failedMessagesLogger) {
-        LOG.info("==> AtlasFileSpool.init(source={})", source);
+        LOG.debug("==> AtlasFileSpool.init(source={})", source);
 
         if (!isInitDone()) {
             try {
@@ -76,10 +77,10 @@ public class AtlasFileSpool implements NotificationInterface {
                 LOG.error("AtlasFileSpool(source={}): initialization failed, unknown error", this.config.getSourceName(), t);
             }
         } else {
-            LOG.info("AtlasFileSpool.init(): initialization already done. initDone={}", initDone);
+            LOG.debug("AtlasFileSpool.init(): initialization already done. initDone={}", initDone);
         }
 
-        LOG.info("<== AtlasFileSpool.init(source={})", source);
+        LOG.debug("<== AtlasFileSpool.init(source={})", source);
     }
 
     @Override
@@ -100,29 +101,35 @@ public class AtlasFileSpool implements NotificationInterface {
     }
 
     @Override
+    public boolean isReady(NotificationType type) {
+        return true;
+    }
+
+    @Override
     public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+        List<String> serializedMessages = getSerializedMessages(messages);
         if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("AtlasFileSpool.send(): sending to spooler");
             }
 
-            spooler.send(type, messages);
+            spooler.sendInternal(type, serializedMessages);
         } else {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("AtlasFileSpool.send(): sending to notificationHandler");
             }
 
             try {
-                notificationHandler.send(type, messages);
+                notificationHandler.sendInternal(type, serializedMessages);
             } catch (Exception e) {
                 if (isInitDone()) {
                     LOG.info("AtlasFileSpool.send(): failed in sending to notificationHandler. Sending to spool", e);
 
                     publisher.setDestinationDown();
 
-                    spooler.send(type, messages);
+                    spooler.sendInternal(type, serializedMessages);
                 } else {
-                    LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not yet initialized", e);
+                    LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not initialized.", e);
 
                     throw e;
                 }
@@ -130,6 +137,15 @@ public class AtlasFileSpool implements NotificationInterface {
         }
     }
 
+    private <T> List<String> getSerializedMessages(List<T> messages) {
+        List<String> serializedMessages = new ArrayList<>(messages.size());
+        for (int index = 0; index < messages.size(); index++) {
+            notificationHandler.createNotificationMessages(messages.get(index), serializedMessages);
+        }
+
+        return serializedMessages;
+    }
+
     @Override
     public void close() {
         try {
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
index 22242c9..01ead7d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
@@ -65,14 +65,16 @@ public class Publisher implements Runnable {
             IndexRecord record = null;
 
             while (true) {
-                waitIfDestinationDown();
-
+                checkAndWaitIfDestinationDown();
                 if (this.isDrain) {
                     break;
                 }
 
-                record = fetchNext(record);
+                if (isDestDown) {
+                    continue;
+                }
 
+                record = fetchNext(record);
                 if (record != null && processAndDispatch(record)) {
                     indexManagement.removeAsDone(record);
 
@@ -104,14 +106,14 @@ public class Publisher implements Runnable {
         return isDestDown;
     }
 
-    private void waitIfDestinationDown() throws InterruptedException {
+    private void checkAndWaitIfDestinationDown() throws InterruptedException {
+        isDestDown = !notificationHandler.isReady(NotificationInterface.NotificationType.HOOK);
         if (isDestDown) {
             LOG.info("Publisher.waitIfDestinationDown(source={}): {}: Destination is down. Sleeping for: {} ms. Queue: {} items",
                      this.source, notificationHandlerName, retryDestinationMS, indexManagement.getQueueSize());
 
             Thread.sleep(retryDestinationMS);
         }
-
     }
 
     private IndexRecord fetchNext(IndexRecord record) {
@@ -147,7 +149,7 @@ public class Publisher implements Runnable {
 
                     messages.add(message);
 
-                    if ((isDestDown && messages.size() == 1) || messages.size() == messageBatchSize) {
+                    if (messages.size() == messageBatchSize) {
                         dispatch(record, lineInSpoolFile, messages);
                     }
                 }
@@ -192,6 +194,8 @@ public class Publisher implements Runnable {
 
     private void dispatch(String filePath, List<String> messages) throws Exception {
         try {
+            pauseBeforeSend();
+
             notificationHandler.sendInternal(NotificationInterface.NotificationType.HOOK, messages);
 
             if (isDestDown) {
@@ -207,4 +211,28 @@ public class Publisher implements Runnable {
             messages.clear();
         }
     }
+
+    /**
+     * Reason for pauseBeforeSend:
+     *  - EntityCorrelation is needed to be able to stitch lineage to the correct entity.
+     *  - Background: When messages are added to the Kafka queue directly, the ordering is incidentally guaranteed:
+     *     messages from lineage-producing hooks arrive immediately after messages from entity-producing hooks.
+     *  - When spooled messages are posted to Kafka, this order cannot be guaranteed. The entity correlation logic within Atlas
+     *     can attach lineage to the correct entity, provided that the entity participating in the lineage is already present.
+     *
+     *   This entity correlation logic works well for the majority of cases, except where lineage entities are created before regular entities.
+     *   In this case, shell entities get created in the absence of real entities. The problem is that there is only 1 shell entity for any number of references.
+     *   Circumventing this limitation is not easy.
+     *
+     *   pauseBeforeSend forces the situation where HiveMetaStore-generated messages reach Kafka before messages from lineage-producing hooks.
+     *
+     * @throws InterruptedException if the thread is interrupted while pausing
+     */
+    private void pauseBeforeSend() throws InterruptedException {
+        if (!configuration.isHiveMetaStore()) {
+            int waitMs = configuration.getPauseBeforeSendSec() * 1000;
+            LOG.info("Waiting before dispatch: {}", waitMs);
+            Thread.sleep(waitMs);
+        }
+    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
index a9a3a78..74b8687 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
@@ -25,9 +25,11 @@ public class SpoolConfiguration {
     private static final int    PROP_RETRY_DESTINATION_MS_DEFAULT               = 30000; // Default 30 seconds
     private static final int    PROP_FILE_ROLLOVER_SEC_DEFAULT                  = 60; // 60 secs
     private static final int    PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT = 100;
+    private static final int    PROP_PAUSE_BEFORE_SEND_MS_DEFAULT               = 60;
     private static final String PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT             = "archive";
     private static final String PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT               = "/tmp/spool";
     private static final int    PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT            = 100;
+    private static final String PROP_HIVE_METASTORE_NAME_DEFAULT                = "HiveMetastoreHookImpl";
     private static final String PROPERTY_PREFIX_SPOOL                           = "atlas.hook.spool.";
     public  static final String PROP_FILE_SPOOL_LOCAL_DIR                       = PROPERTY_PREFIX_SPOOL + "dir";
     private static final String PROP_FILE_SPOOL_ARCHIVE_DIR                     = PROPERTY_PREFIX_SPOOL + "archive.dir";
@@ -35,6 +37,8 @@ public class SpoolConfiguration {
     public  static final String PROP_FILE_SPOOL_FILE_ROLLOVER_SEC               = PROPERTY_PREFIX_SPOOL + "file.rollover.sec";
     public  static final String PROP_FILE_SPOOL_DEST_RETRY_MS                   = PROPERTY_PREFIX_SPOOL + "destination.retry.ms";
     private static final String PROP_MESSAGE_BATCH_SIZE                         = PROPERTY_PREFIX_SPOOL + "destination.message.batchsize";
+    private static final String PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC           = PROPERTY_PREFIX_SPOOL + "pause.before.send.sec";
+    private static final String PROP_HIVE_METASTORE_NAME                        = PROPERTY_PREFIX_SPOOL + "hivemetastore.name";
 
     private final String messageHandlerName;
     private final int    maxArchivedFilesCount;
@@ -44,6 +48,8 @@ public class SpoolConfiguration {
     private final int    fileSpoolMaxFilesCount;
     private final String spoolDirPath;
     private final String archiveDir;
+    private final int    pauseBeforeSendSec;
+    private final String hiveMetaStoreName;
     private       String sourceName;
 
     public SpoolConfiguration(Configuration cfg, String messageHandlerName) {
@@ -51,10 +57,12 @@ public class SpoolConfiguration {
         this.maxArchivedFilesCount  = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
         this.messageBatchSize       = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT);
         this.retryDestinationMS     = cfg.getInt(PROP_FILE_SPOOL_DEST_RETRY_MS, PROP_RETRY_DESTINATION_MS_DEFAULT);
+        this.pauseBeforeSendSec     = cfg.getInt(PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC, PROP_PAUSE_BEFORE_SEND_MS_DEFAULT);
         this.fileRollOverSec        = cfg.getInt(PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, PROP_FILE_ROLLOVER_SEC_DEFAULT) * 1000;
         this.fileSpoolMaxFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
         this.spoolDirPath           = cfg.getString(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT);
         this.archiveDir             = cfg.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString());
+        this.hiveMetaStoreName      = cfg.getString(PROP_HIVE_METASTORE_NAME, PROP_HIVE_METASTORE_NAME_DEFAULT);
     }
 
     public void setSource(String val) {
@@ -120,4 +128,12 @@ public class SpoolConfiguration {
 
         return new File(getSpoolDir(), fileDoneName);
     }
+
+    public int getPauseBeforeSendSec() {
+        return pauseBeforeSendSec;
+    }
+
+    public boolean isHiveMetaStore() {
+        return this.sourceName.equals(this.hiveMetaStoreName);
+    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
index 2cacaaa..a918e9b 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
@@ -19,14 +19,14 @@ package org.apache.atlas.notification.spool;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.hook.FailedMessagesLogger;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
 import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.NotificationConsumer;
-import org.apache.commons.io.IOUtils;
+import org.apache.atlas.type.AtlasType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.DataOutput;
-import java.io.PrintWriter;
 import java.util.List;
 
 public class Spooler extends AbstractNotification {
@@ -57,8 +57,14 @@ public class Spooler extends AbstractNotification {
 
     @Override
     public void sendInternal(NotificationType type, List<String> messages) {
-        boolean ret = write(messages);
+        for (int i = 0; i < messages.size(); i++) {
+            AtlasNotificationMessage e = AtlasType.fromV1Json(messages.get(i), AtlasNotificationMessage.class);
+            e.setSpooled(true);
+
+            messages.set(i, AtlasType.toV1Json(e));
+        }
 
+        boolean ret = write(messages);
         if (failedMessagesLogger != null && !ret) {
             writeToFailedMessages(messages);
         }
@@ -68,6 +74,11 @@ public class Spooler extends AbstractNotification {
     public void close() {
     }
 
+    @Override
+    public boolean isReady(NotificationType type) {
+        return true;
+    }
+
     @VisibleForTesting
     boolean write(List<String> messages) {
         final boolean ret;
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index d7e4959..8078a6c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -127,5 +127,10 @@ public class AbstractNotificationTest {
         @Override
         public void close() {
         }
+
+        @Override
+        public boolean isReady(NotificationType type) {
+            return true;
+        }
     }
 }
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
index 167efbe..265598e 100644
--- a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
@@ -81,6 +81,11 @@ public class AtlasFileSpoolTest extends BaseTest {
         public void close() {
 
         }
+
+        @Override
+        public boolean isReady(NotificationType type) {
+            return true;
+        }
     }
 
     @Test
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index cc727c6..ddfb008 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -339,6 +339,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
             createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY, UniqueKind.NONE, Integer.class, SINGLE, true, true);
             createCommonVertexIndex(management, CUSTOM_ATTRIBUTES_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
             createCommonVertexIndex(management, LABELS_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
+            createCommonVertexIndex(management, ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, UniqueKind.NONE, Long.class, SINGLE, true, false);
 
             createCommonVertexIndex(management, PATCH_ID_PROPERTY_KEY, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
             createCommonVertexIndex(management, PATCH_DESCRIPTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java
new file mode 100644
index 0000000..4760757
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph;
+
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+@Component
+public class EntityCorrelationStore {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityCorrelationStore.class);
+
+    public EntityCorrelationStore() {
+    }
+
+    @GraphTransaction
+    public void add(String entityGuid, long messageTimestamp) {
+        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(entityGuid);
+        if (v == null) {
+            LOG.warn("Fetching: {} did not yield result!", entityGuid);
+            return;
+        }
+
+        AtlasGraphUtilsV2.setEncodedProperty(v, Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, messageTimestamp);
+        LOG.info("Updating: {}: {}", entityGuid, messageTimestamp);
+    }
+
+    public String findCorrelatedGuid(String qualifiedName, long messageTimestamp) {
+        String guid = AtlasGraphUtilsV2.findFirstDeletedDuringSpooledByQualifiedName(qualifiedName, messageTimestamp);
+
+        LOG.info("findCorrelatedGuid: {} - {} -> {}", qualifiedName, messageTimestamp, guid);
+        return guid;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 0a94708..e73f084 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -367,6 +367,27 @@ public class AtlasGraphUtilsV2 {
         return vertex;
     }
 
+    public static String findFirstDeletedDuringSpooledByQualifiedName(String qualifiedName, long timestamp) {
+        return findFirstDeletedDuringSpooledByQualifiedName(getGraphInstance(), qualifiedName, timestamp);
+    }
+
+    public static String findFirstDeletedDuringSpooledByQualifiedName(AtlasGraph graph, String qualifiedName, long timestamp) {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("findDeletedDuringSpooledByQualifiedName");
+
+        AtlasGraphQuery query = graph.query().has(STATE_PROPERTY_KEY, Status.DELETED.name())
+                                             .has(Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.GREATER_THAN, timestamp)
+                                             .has(Constants.QUALIFIED_NAME, qualifiedName)
+                                             .orderBy(Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, ASC);
+
+        Iterator iterator = query.vertices().iterator();
+
+        String ret = iterator.hasNext() ? GraphHelper.getGuid((AtlasVertex) iterator.next()) : null;
+
+        RequestContext.get().endMetricRecord(metric);
+
+        return ret;
+    }
+
     public static AtlasVertex findByGuid(String guid) {
         return findByGuid(getGraphInstance(), guid);
     }
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java
new file mode 100644
index 0000000..a3be5f4
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.BasicTestSetup;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.utils.TestResourceFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+/**
+ * Exercises EntityCorrelationStore lookups for an entity that was created and then deleted.
+ *
+ * NOTE(review): the expectations assume findCorrelatedGuid() matches only entities whose
+ * registered timestamp is strictly greater than the lookup timestamp - confirm against
+ * EntityCorrelationStore.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class EntityCorrelationStoreTest extends BasicTestSetup {
+    @Inject
+    AtlasGraph graph;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        super.initialize();
+
+        setupTestData();
+    }
+
+    /**
+     * Creates the "db01" test entity, deletes it, registers the deletion with the
+     * correlation store using timestamp 2, and then checks three lookups: an unregistered
+     * qualified name, the deleted entity's qualified name with an earlier timestamp,
+     * and the same qualified name with a timestamp equal to the registered one.
+     */
+    @Test
+    public void verify() throws IOException, AtlasBaseException {
+        final String nonExistentQName = "db01@cm";
+        final String db01QName = "db01x@cm";
+        final EntityCorrelationStore entityCorrelationStore = new EntityCorrelationStore();
+
+        String db01 = TestResourceFileUtils.getJson("entities", "db01");
+
+        AtlasEntity.AtlasEntitiesWithExtInfo db = AtlasType.fromJson(db01, AtlasEntity.AtlasEntitiesWithExtInfo.class);
+        EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(db), false);
+
+        String dbGuid = response.getFirstEntityCreated().getGuid();
+        entityStore.deleteById(dbGuid);
+
+        entityCorrelationStore.add(dbGuid, 2L);
+        graph.commit();
+
+        // expected: nothing is correlated for a qualified name that was never registered
+        String guid = entityCorrelationStore.findCorrelatedGuid(nonExistentQName, 1L);
+        assertNull(guid);
+
+        // lookup timestamp (1) precedes the registered timestamp (2): the deleted entity is returned
+        String fetchedGuid = entityCorrelationStore.findCorrelatedGuid(db01QName, 1L);
+        assertNotNull(fetchedGuid);
+        assertEquals(fetchedGuid, dbGuid);
+
+        // lookup timestamp equals the registered timestamp: no match is expected
+        guid = entityCorrelationStore.findCorrelatedGuid(db01QName, 2L);
+        assertNull(guid);
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java b/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
new file mode 100644
index 0000000..f1d6aff
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Thin facade over an optional EntityCorrelationStore, used while consuming hook
+ * notifications: it records the GUIDs of entities deleted by spooled messages and looks
+ * up a deleted entity's GUID by qualified name for a later spooled message.
+ *
+ * Every method is a no-op (or returns null) when no store was supplied.
+ */
+public class EntityCorrelationManager {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityCorrelationManager.class);
+
+    private final EntityCorrelationStore entityCorrelationStore;
+
+    /**
+     * @param entityCorrelationStore backing store; may be null, which disables correlation
+     */
+    public EntityCorrelationManager(EntityCorrelationStore entityCorrelationStore) {
+        this.entityCorrelationStore = entityCorrelationStore;
+    }
+
+    /**
+     * Registers each entity header's GUID with the store, together with the message timestamp.
+     * Does nothing unless the message was spooled and at least one header is present.
+     *
+     * @param spooled          whether the originating message was spooled
+     * @param spooledTimestamp timestamp handed to the store with every GUID
+     * @param entityHeaders    headers of the affected entities; may be null or empty
+     */
+    public void add(boolean spooled, long spooledTimestamp, List<AtlasEntityHeader> entityHeaders) {
+        if (entityCorrelationStore == null || !spooled || CollectionUtils.isEmpty(entityHeaders)) {
+            return;
+        }
+
+        for (AtlasEntityHeader entityHeader : entityHeaders) {
+            String guid = entityHeader != null ? entityHeader.getGuid() : null;
+
+            if (StringUtils.isNotEmpty(guid)) {
+                entityCorrelationStore.add(guid, spooledTimestamp);
+            }
+        }
+    }
+
+    /**
+     * Looks up the GUID the store correlates with the given qualified name and timestamp.
+     *
+     * @param qualifiedName           qualified name of the entity to look up
+     * @param spooledMessageTimestamp timestamp of the spooled message; non-positive values disable the lookup
+     * @return the correlated GUID, or null when there is no store, the qualified name is empty,
+     *         the timestamp is not positive, or the store returns no match
+     */
+    public String getGuidForDeletedEntityToBeCorrelated(String qualifiedName, long spooledMessageTimestamp) {
+        // an empty qualified name cannot identify an entity, so skip the store lookup
+        if (entityCorrelationStore == null || spooledMessageTimestamp <= 0 || StringUtils.isEmpty(qualifiedName)) {
+            return null;
+        }
+
+        String guid = entityCorrelationStore.findCorrelatedGuid(qualifiedName, spooledMessageTimestamp);
+        LOG.info("{}: spooledTimestamp: {} -> {}", qualifiedName, spooledMessageTimestamp, guid);
+        return guid;
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 84cc8d8..5643af9 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -40,6 +40,7 @@ import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
 import org.apache.atlas.notification.preprocessor.PreprocessorContext;
 import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
+import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
 import org.apache.atlas.util.AtlasMetricsCounter;
 import org.apache.atlas.utils.AtlasJson;
 import org.apache.atlas.utils.LruCache;
@@ -191,6 +192,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private       ExecutorService               executors;
     private       Instant                       nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(Instant.now());
     private final Map<TopicPartition, Long>     lastCommittedPartitionOffset;
+    private final EntityCorrelationManager      entityCorrelationManager;
 
     @VisibleForTesting
     final int consumerRetryInterval;
@@ -201,7 +203,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     @Inject
     public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
                                     ServiceState serviceState, AtlasInstanceConverter instanceConverter,
-                                    AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) throws AtlasException {
+                                    AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil,
+                                    EntityCorrelationStore entityCorrelationStore) throws AtlasException {
         this.notificationInterface = notificationInterface;
         this.atlasEntityStore      = atlasEntityStore;
         this.serviceState          = serviceState;
@@ -308,7 +311,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         hiveTypesRemoveOwnedRefAttrs  = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
         rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
         preprocessEnabled             = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
-
+        entityCorrelationManager      = new EntityCorrelationManager(entityCorrelationStore);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
         LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs);
@@ -688,6 +691,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                     EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
 
                                     stats.updateStats(response);
+                                    entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
                                 } catch (ClassCastException cle) {
                                     LOG.error("Failed to delete entity {}", deleteRequest);
                                 }
@@ -770,6 +774,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                         EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
 
                                         stats.updateStats(response);
+                                        entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
                                     }
                                 } catch (ClassCastException cle) {
                                     LOG.error("Failed to do delete entities {}", entities);
@@ -889,6 +894,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                     RequestContext.get().resetEntityGuidUpdates();
 
+                    entityCorrelationManager.add(context.isSpooledMessage(), context.getMsgCreated(), response.getDeletedEntities());
+
                     RequestContext.get().clearCache();
                 }
             }
@@ -973,7 +980,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         PreprocessorContext context = null;
 
         if (preprocessEnabled) {
-            context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName);
+            context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache,
+                    hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs,
+                    rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName, entityCorrelationManager);
 
             if (context.isHivePreprocessEnabled()) {
                 preprocessHiveTypes(context);
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 89568e2..7f0cafe 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -32,6 +32,8 @@ public abstract class EntityPreprocessor {
     public static final String TYPE_HIVE_PROCESS        = "hive_process";
     public static final String TYPE_HIVE_STORAGEDESC    = "hive_storagedesc";
     public static final String TYPE_HIVE_DB             = "hive_db";
+    public static final String TYPE_HIVE_DB_DDL         = "hive_db_ddl";
+    public static final String TYPE_HIVE_TABLE_DDL      = "hive_table_ddl";
     public static final String TYPE_HIVE_TABLE          = "hive_table";
     public static final String TYPE_RDBMS_INSTANCE      = "rdbms_instance";
     public static final String TYPE_RDBMS_DB            = "rdbms_db";
@@ -71,11 +73,13 @@ public abstract class EntityPreprocessor {
     static {
         EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] {
                                                                     new HivePreprocessor.HiveDbPreprocessor(),
+                                                                    new HiveDbDDLPreprocessor(),
                                                                     new HivePreprocessor.HiveTablePreprocessor(),
                                                                     new HivePreprocessor.HiveColumnPreprocessor(),
                                                                     new HivePreprocessor.HiveProcessPreprocessor(),
                                                                     new HivePreprocessor.HiveColumnLineageProcessPreprocessor(),
-                                                                    new HivePreprocessor.HiveStorageDescPreprocessor()
+                                                                    new HivePreprocessor.HiveStorageDescPreprocessor(),
+                                                                    new HiveTableDDLPreprocessor()
         };
 
         EntityPreprocessor[] rdbmsPreprocessors = new EntityPreprocessor[] {
@@ -158,6 +162,22 @@ public abstract class EntityPreprocessor {
         return ret != null ? ret.toString() : null;
     }
 
+    /**
+     * Writes the given GUID into an object reference. An AtlasObjectId gets it through
+     * setGuid(); a Map gets it under the "guid" key; any other type is left unchanged.
+     */
+    public void setObjectIdWithGuid(Object obj, String guid) {
+        if (obj instanceof AtlasObjectId) {
+            ((AtlasObjectId) obj).setGuid(guid);
+
+            return;
+        }
+
+        if (obj instanceof Map) {
+            ((Map) obj).put("guid", guid);
+        }
+    }
+
     protected boolean isEmpty(Object obj) {
         return obj == null || ((obj instanceof Collection) && ((Collection) obj).isEmpty());
     }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
new file mode 100644
index 0000000..dcff093
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Preprocessor for hive_db_ddl entities carried by spooled messages: when the database the
+ * entity refers to has a correlated deleted-entity GUID, that GUID is written into the reference.
+ */
+public class HiveDbDDLPreprocessor extends EntityPreprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveDbDDLPreprocessor.class);
+
+    protected HiveDbDDLPreprocessor() {
+        super(TYPE_HIVE_DB_DDL);
+    }
+
+    /**
+     * Sets the GUID on the entity's ATTRIBUTE_DB relationship reference when the context returns
+     * a GUID for the referenced qualified name. Non-spooled messages are left untouched.
+     */
+    @Override
+    public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+        if (!context.isSpooledMessage()) {
+            return;
+        }
+
+        Object dbObject = entity.getRelationshipAttribute(ATTRIBUTE_DB);
+        if (dbObject == null) {
+            return;
+        }
+
+        String qualifiedName = getQualifiedName(dbObject);
+        // without a qualified name there is nothing to correlate on, so skip the lookup
+        if (StringUtils.isEmpty(qualifiedName)) {
+            return;
+        }
+
+        String guid = context.getGuidForDeletedEntity(qualifiedName);
+        if (StringUtils.isEmpty(guid)) {
+            return;
+        }
+
+        setObjectIdWithGuid(dbObject, guid);
+        LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid);
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index 86e3384..bf6a623 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -228,6 +228,47 @@ public class HivePreprocessor {
                     }
                 }
             }
+
+            preprocessCheckpoint(entity, context);
+        }
+
+        /**
+         * For spooled messages, rewrites the entity's inputs and outputs relationship references
+         * so that entries with a correlated deleted-entity GUID carry that GUID.
+         */
+        private void preprocessCheckpoint(AtlasEntity entity, PreprocessorContext context) {
+            if (!context.isSpooledMessage()) {
+                return;
+            }
+
+            String[] relationshipNames = new String[]{ATTRIBUTE_INPUTS, ATTRIBUTE_OUTPUTS};
+            for (String relationshipName : relationshipNames) {
+                Object val = entity.getRelationshipAttribute(relationshipName);
+                if (!isEmpty(val) && val instanceof List) {
+                    updateListWithGuids(context, (List) val);
+                }
+            }
+        }
+
+        /**
+         * Sets the GUID on every list element for which the context returns a correlated GUID;
+         * elements without a qualified name or without a correlated GUID are skipped.
+         */
+        private void updateListWithGuids(PreprocessorContext context, List list) {
+            for (Object o : list) {
+                String qn = getQualifiedName(o);
+                // nothing to correlate on when the reference carries no qualified name
+                if (StringUtils.isEmpty(qn)) {
+                    continue;
+                }
+
+                String guid = context.getGuidForDeletedEntity(qn);
+                if (StringUtils.isEmpty(guid)) {
+                    continue;
+                }
+
+                setObjectIdWithGuid(o, guid);
+            }
         }
 
         private int getCollectionSize(Object obj) {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
new file mode 100644
index 0000000..83d4d7c
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Preprocessor for hive_table_ddl entities carried by spooled messages: when the table the
+ * entity refers to has a correlated deleted-entity GUID, that GUID is written into the reference.
+ */
+public class HiveTableDDLPreprocessor extends EntityPreprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveTableDDLPreprocessor.class);
+
+    protected HiveTableDDLPreprocessor() {
+        super(TYPE_HIVE_TABLE_DDL);
+    }
+
+    /**
+     * Sets the GUID on the entity's ATTRIBUTE_TABLE relationship reference when the context
+     * returns a GUID for the referenced qualified name. Non-spooled messages are left untouched.
+     */
+    @Override
+    public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+        if (!context.isSpooledMessage()) {
+            return;
+        }
+
+        Object tableObject = entity.getRelationshipAttribute(ATTRIBUTE_TABLE);
+        if (tableObject == null) {
+            return;
+        }
+
+        String qualifiedName = getQualifiedName(tableObject);
+        // without a qualified name there is nothing to correlate on, so skip the lookup
+        if (StringUtils.isEmpty(qualifiedName)) {
+            return;
+        }
+
+        String guid = context.getGuidForDeletedEntity(qualifiedName);
+        if (StringUtils.isEmpty(guid)) {
+            return;
+        }
+
+        setObjectIdWithGuid(tableObject, guid);
+        LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid);
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 608b4a3..59f6440 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.EntityCorrelationManager;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.collections.CollectionUtils;
@@ -69,9 +70,10 @@ public class PreprocessorContext {
     private final Set<String>                         createdEntities        = new HashSet<>();
     private final Set<String>                         deletedEntities        = new HashSet<>();
     private final Map<String, String>                 guidAssignments        = new HashMap<>();
+    private final EntityCorrelationManager            correlationManager;
     private       List<AtlasEntity>                   postUpdateEntities     = null;
 
-    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName) {
+    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName, EntityCorrelationMana [...]
         this.kafkaMessage                           = kafkaMessage;
         this.typeRegistry                           = typeRegistry;
         this.hiveTablesToIgnore                     = hiveTablesToIgnore;
@@ -101,6 +103,7 @@ public class PreprocessorContext {
         }
 
         this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty() || updateHiveProcessNameWithQualifiedName;
+        this.correlationManager = correlationManager;
     }
 
     public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
@@ -577,4 +580,30 @@ public class PreprocessorContext {
             partialEntity.setAttribute(attrName, attrVal);
         }
     }
+
+    /** Returns the creation timestamp reported by the Kafka message being processed. */
+    public long getMsgCreated() {
+        return kafkaMessage.getMsgCreated();
+    }
+
+    /** Returns whether the Kafka message being processed is flagged as spooled. */
+    public boolean isSpooledMessage() {
+        return kafkaMessage.getSpooled();
+    }
+
+    /**
+     * Looks up the GUID correlated with the given qualified name for this message's creation
+     * timestamp.
+     *
+     * @param qualifiedName qualified name of the referenced entity
+     * @return the correlated GUID, or null when no correlation manager was supplied or it
+     *         returns no match
+     */
+    public String getGuidForDeletedEntity(String qualifiedName) {
+        if (correlationManager == null) {
+            return null;
+        }
+
+        return correlationManager.getGuidForDeletedEntityToBeCorrelated(qualifiedName, getMsgCreated());
+    }
 }
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 65e8b50..fdfc256 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -109,7 +109,7 @@ public class NotificationHookConsumerKafkaTest {
         produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
 
         NotificationConsumer<HookNotification> consumer                 = createNewConsumer(kafkaNotification, false);
-        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
 
         consumeOneMessage(consumer, hookConsumer);
@@ -128,7 +128,7 @@ public class NotificationHookConsumerKafkaTest {
     public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
 
         ExceptionThrowingCommitConsumer        consumer                 = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
-        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
 
         produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
@@ -159,7 +159,7 @@ public class NotificationHookConsumerKafkaTest {
 
         assertNotNull (consumer);
 
-        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
 
         consumeOneMessage(consumer, hookConsumer);
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 15a1900..f440c42 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -98,7 +98,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerCanProceedIfServerIsReady() throws Exception {
-        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
         NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
 
@@ -111,7 +111,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
-        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
         NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
 
@@ -128,7 +128,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
-        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationConsumer                   consumer                 = mock(NotificationConsumer.class);
         NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
         EntityCreateRequest                    message                  = mock(EntityCreateRequest.class);
@@ -145,7 +145,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
-        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationConsumer                  consumer                 = mock(NotificationConsumer.class);
         NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
         EntityCreateRequest                   message                  = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class)));
@@ -159,7 +159,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
-        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
         NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
 
@@ -179,7 +179,7 @@ public class NotificationHookConsumerTest {
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
         notificationHookConsumer.startInternal(configuration, executorService);
 
         verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
@@ -197,7 +197,7 @@ public class NotificationHookConsumerTest {
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
 
         notificationHookConsumer.startInternal(configuration, executorService);
 
@@ -216,7 +216,7 @@ public class NotificationHookConsumerTest {
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
 
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
 
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsActive();
@@ -236,7 +236,7 @@ public class NotificationHookConsumerTest {
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-        final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
 
         doAnswer(new Answer() {
             @Override
@@ -267,7 +267,7 @@ public class NotificationHookConsumerTest {
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-        final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
 
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsPassive();
@@ -332,6 +332,6 @@ public class NotificationHookConsumerTest {
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
         when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-        return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+        return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
     }
 }