You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by nb...@apache.org on 2021/05/21 11:11:38 UTC
[atlas] 04/04: ATLAS-4152: Entity correlation for deleted entities.
This is an automated email from the ASF dual-hosted git repository.
nbonte pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit c3df22605795b9bddcba547852c04461ee9b8203
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu May 20 15:07:27 2021 -0700
ATLAS-4152: Entity correlation for deleted entities.
Signed-off-by: Nikhil Bonte <nb...@apache.org>
---
.../org/apache/atlas/repository/Constants.java | 2 +
.../notification/AtlasNotificationMessage.java | 23 ++++--
.../org/apache/atlas/kafka/AtlasKafkaConsumer.java | 3 +-
.../org/apache/atlas/kafka/AtlasKafkaMessage.java | 17 ++++-
.../org/apache/atlas/kafka/KafkaNotification.java | 12 ++++
.../AtlasNotificationMessageDeserializer.java | 17 ++++-
.../atlas/notification/NotificationInterface.java | 9 +++
.../atlas/notification/spool/AtlasFileSpool.java | 30 ++++++--
.../apache/atlas/notification/spool/Publisher.java | 40 +++++++++--
.../notification/spool/SpoolConfiguration.java | 16 +++++
.../apache/atlas/notification/spool/Spooler.java | 17 ++++-
.../notification/AbstractNotificationTest.java | 5 ++
.../notification/spool/AtlasFileSpoolTest.java | 5 ++
.../repository/graph/GraphBackedSearchIndexer.java | 1 +
.../store/graph/EntityCorrelationStore.java | 53 ++++++++++++++
.../store/graph/v2/AtlasGraphUtilsV2.java | 21 ++++++
.../store/graph/v2/EntityCorrelationStoreTest.java | 83 ++++++++++++++++++++++
.../notification/EntityCorrelationManager.java | 60 ++++++++++++++++
.../notification/NotificationHookConsumer.java | 15 +++-
.../preprocessor/EntityPreprocessor.java | 16 ++++-
.../preprocessor/HiveDbDDLPreprocessor.java | 52 ++++++++++++++
.../preprocessor/HivePreprocessor.java | 28 ++++++++
.../preprocessor/HiveTableDDLPreprocessor.java | 52 ++++++++++++++
.../preprocessor/PreprocessorContext.java | 17 ++++-
.../NotificationHookConsumerKafkaTest.java | 6 +-
.../notification/NotificationHookConsumerTest.java | 22 +++---
26 files changed, 579 insertions(+), 43 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index ffcec97..aea0c13 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -135,6 +135,8 @@ public final class Constants {
public static final String TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "timestamp");
+ public static final String ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityDeletedTimestamp");
+
public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp");
public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
index 810ba97..5869910 100644
--- a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
+++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
@@ -40,9 +40,10 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
- private String msgSourceIP;
- private String msgCreatedBy;
- private long msgCreationTime;
+ private String msgSourceIP;
+ private String msgCreatedBy;
+ private long msgCreationTime;
+ private boolean spooled;
/**
* The actual message.
@@ -55,18 +56,22 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
}
public AtlasNotificationMessage(MessageVersion version, T message) {
- this(version, message, null, null);
+ this(version, message, null, null, false);
}
- public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
+ public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy, boolean spooled) {
super(version);
this.msgSourceIP = msgSourceIP;
this.msgCreatedBy = createdBy;
this.msgCreationTime = (new Date()).getTime();
this.message = message;
+ this.spooled = spooled;
}
+ public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
+ this(version, message, msgSourceIP, createdBy, false);
+ }
public String getMsgSourceIP() {
return msgSourceIP;
@@ -99,4 +104,12 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
public void setMessage(T message) {
this.message = message;
}
+
+ public void setSpooled(boolean val) {
+ this.spooled = val;
+ }
+
+ public boolean getSpooled() {
+ return spooled;
+ }
}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index f7d9668..96dc585 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -134,7 +134,8 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
continue;
}
- messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition()));
+ messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
+ deserializer.getMsgCreated(), deserializer.getSpooled()));
}
}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
index 22bd79f..af3727d 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
@@ -24,11 +24,19 @@ public class AtlasKafkaMessage<T> {
private final T message;
private final long offset;
private final TopicPartition topicPartition;
+ private final boolean spooled;
+ private final long msgCreated;
- public AtlasKafkaMessage(T message, long offset, String topic, int partition) {
+ public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled) {
this.message = message;
this.offset = offset;
this.topicPartition = new TopicPartition(topic, partition);
+ this.msgCreated = msgCreated;
+ this.spooled = spooled;
+ }
+
+ public AtlasKafkaMessage(T message, long offset, String topic, int partition) {
+ this(message, offset, topic, partition, 0, false);
}
public T getMessage() {
@@ -51,4 +59,11 @@ public class AtlasKafkaMessage<T> {
return topicPartition.partition();
}
+ public boolean getSpooled() {
+ return this.spooled;
+ }
+
+ public long getMsgCreated() {
+ return this.msgCreated;
+ }
}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 3d1b3cc..32f5183 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -172,6 +172,18 @@ public class KafkaNotification extends AbstractNotification implements Service {
// ----- NotificationInterface -------------------------------------------
+ public boolean isReady(NotificationType notificationType) {
+ try {
+ KafkaProducer producer = getOrCreateProducer(notificationType);
+ producer.metrics();
+ return true;
+ }
+ catch (Exception exception) {
+ LOG.error("Error: Connecting... {}", exception.getMessage());
+ return false;
+ }
+ }
+
@Override
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
return createConsumers(notificationType, numConsumers, Boolean.valueOf(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable","false"))));
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 3264e26..b43bc7c 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -62,6 +62,8 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
private long splitMessagesLastPurgeTime = System.currentTimeMillis();
private final AtomicLong messageCountTotal = new AtomicLong(0);
private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0);
+ private long msgCreated;
+ private boolean spooled;
// ----- Constructors ----------------------------------------------------
/**
@@ -101,18 +103,31 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
}
// ----- MessageDeserializer ---------------------------------------------
+ public long getMsgCreated() {
+ return this.msgCreated;
+ }
+
+ public boolean getSpooled() {
+ return this.spooled;
+ }
+
@Override
public T deserialize(String messageJson) {
final T ret;
messageCountTotal.incrementAndGet();
messageCountSinceLastInterval.incrementAndGet();
+ this.msgCreated = 0;
+ this.spooled = false;
- AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class);
+ AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationMessage.class);
if (msg == null || msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
ret = AtlasType.fromV1Json(messageJson, messageType);
} else {
+ this.msgCreated = ((AtlasNotificationMessage) msg).getMsgCreationTime();
+ this.spooled = ((AtlasNotificationMessage) msg).getSpooled();
+
String msgJson = messageJson;
if (msg.getMsgSplitCount() > 1) { // multi-part message
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index edd8ed9..3d8d9cc 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -110,4 +110,13 @@ public interface NotificationInterface {
* Shutdown any notification producers and consumers associated with this interface instance.
*/
void close();
+
+ /**
+ * Check if underlying notification mechanism is ready for use.
+ *
+ * @param type the message type
+ * @return true if available, false otherwise
+ *
+ */
+ boolean isReady(NotificationType type);
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
index 2d7d195..ea31284 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
@@ -27,6 +27,7 @@ import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -51,7 +52,7 @@ public class AtlasFileSpool implements NotificationInterface {
@Override
public void init(String source, Object failedMessagesLogger) {
- LOG.info("==> AtlasFileSpool.init(source={})", source);
+ LOG.debug("==> AtlasFileSpool.init(source={})", source);
if (!isInitDone()) {
try {
@@ -76,10 +77,10 @@ public class AtlasFileSpool implements NotificationInterface {
LOG.error("AtlasFileSpool(source={}): initialization failed, unknown error", this.config.getSourceName(), t);
}
} else {
- LOG.info("AtlasFileSpool.init(): initialization already done. initDone={}", initDone);
+ LOG.debug("AtlasFileSpool.init(): initialization already done. initDone={}", initDone);
}
- LOG.info("<== AtlasFileSpool.init(source={})", source);
+ LOG.debug("<== AtlasFileSpool.init(source={})", source);
}
@Override
@@ -100,29 +101,35 @@ public class AtlasFileSpool implements NotificationInterface {
}
@Override
+ public boolean isReady(NotificationType type) {
+ return true;
+ }
+
+ @Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+ List<String> serializedMessages = getSerializedMessages(messages);
if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) {
if (LOG.isDebugEnabled()) {
LOG.debug("AtlasFileSpool.send(): sending to spooler");
}
- spooler.send(type, messages);
+ spooler.sendInternal(type, serializedMessages);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("AtlasFileSpool.send(): sending to notificationHandler");
}
try {
- notificationHandler.send(type, messages);
+ notificationHandler.sendInternal(type, serializedMessages);
} catch (Exception e) {
if (isInitDone()) {
LOG.info("AtlasFileSpool.send(): failed in sending to notificationHandler. Sending to spool", e);
publisher.setDestinationDown();
- spooler.send(type, messages);
+ spooler.sendInternal(type, serializedMessages);
} else {
- LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not yet initialized", e);
+ LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not initialized.", e);
throw e;
}
@@ -130,6 +137,15 @@ public class AtlasFileSpool implements NotificationInterface {
}
}
+ private <T> List<String> getSerializedMessages(List<T> messages) {
+ List<String> serializedMessages = new ArrayList<>(messages.size());
+ for (int index = 0; index < messages.size(); index++) {
+ notificationHandler.createNotificationMessages(messages.get(index), serializedMessages);
+ }
+
+ return serializedMessages;
+ }
+
@Override
public void close() {
try {
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
index 22242c9..01ead7d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
@@ -65,14 +65,16 @@ public class Publisher implements Runnable {
IndexRecord record = null;
while (true) {
- waitIfDestinationDown();
-
+ checkAndWaitIfDestinationDown();
if (this.isDrain) {
break;
}
- record = fetchNext(record);
+ if (isDestDown) {
+ continue;
+ }
+ record = fetchNext(record);
if (record != null && processAndDispatch(record)) {
indexManagement.removeAsDone(record);
@@ -104,14 +106,14 @@ public class Publisher implements Runnable {
return isDestDown;
}
- private void waitIfDestinationDown() throws InterruptedException {
+ private void checkAndWaitIfDestinationDown() throws InterruptedException {
+ isDestDown = !notificationHandler.isReady(NotificationInterface.NotificationType.HOOK);
if (isDestDown) {
LOG.info("Publisher.waitIfDestinationDown(source={}): {}: Destination is down. Sleeping for: {} ms. Queue: {} items",
this.source, notificationHandlerName, retryDestinationMS, indexManagement.getQueueSize());
Thread.sleep(retryDestinationMS);
}
-
}
private IndexRecord fetchNext(IndexRecord record) {
@@ -147,7 +149,7 @@ public class Publisher implements Runnable {
messages.add(message);
- if ((isDestDown && messages.size() == 1) || messages.size() == messageBatchSize) {
+ if (messages.size() == messageBatchSize) {
dispatch(record, lineInSpoolFile, messages);
}
}
@@ -192,6 +194,8 @@ public class Publisher implements Runnable {
private void dispatch(String filePath, List<String> messages) throws Exception {
try {
+ pauseBeforeSend();
+
notificationHandler.sendInternal(NotificationInterface.NotificationType.HOOK, messages);
if (isDestDown) {
@@ -207,4 +211,28 @@ public class Publisher implements Runnable {
messages.clear();
}
}
+
+ /**
+ * Reason for pauseBeforeSend:
+ * - EntityCorrelation is needed to be able to stitch lineage to the correct entity.
+ * - Background: When messages are added to the Kafka queue directly, the ordering is incidentally guaranteed:
+ * messages from lineage-producing hooks arrive immediately after messages from entity-producing hooks.
+ * - When Spooled messages are posted onto Kafka, this order cannot be guaranteed. The entity correlation logic within Atlas
+ * can attach lineage to the correct entity, provided that the entity participating in the lineage is already present.
+ *
+ * This entity correlation logic works well for the majority of cases, except where lineage entities are created before regular entities.
+ * In this case, shell entities get created in the absence of real entities. The problem is that there is only one shell entity for any number of references.
+ * Circumventing this limitation is not easy.
+ *
+ * pauseBeforeSend delays publishers other than the HiveMetaStore one, so that HiveMetaStore-generated messages reach Kafka before messages from lineage-producing hooks.
+ *
+ * @throws InterruptedException if the thread is interrupted while sleeping before dispatch
+ */
+ private void pauseBeforeSend() throws InterruptedException {
+ if (!configuration.isHiveMetaStore()) {
+ int waitMs = configuration.getPauseBeforeSendSec() * 1000;
+ LOG.info("Waiting before dispatch: {}", waitMs);
+ Thread.sleep(waitMs);
+ }
+ }
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
index a9a3a78..74b8687 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
@@ -25,9 +25,11 @@ public class SpoolConfiguration {
private static final int PROP_RETRY_DESTINATION_MS_DEFAULT = 30000; // Default 30 seconds
private static final int PROP_FILE_ROLLOVER_SEC_DEFAULT = 60; // 60 secs
private static final int PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT = 100;
+ private static final int PROP_PAUSE_BEFORE_SEND_MS_DEFAULT = 60;
private static final String PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT = "archive";
private static final String PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT = "/tmp/spool";
private static final int PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT = 100;
+ private static final String PROP_HIVE_METASTORE_NAME_DEFAULT = "HiveMetastoreHookImpl";
private static final String PROPERTY_PREFIX_SPOOL = "atlas.hook.spool.";
public static final String PROP_FILE_SPOOL_LOCAL_DIR = PROPERTY_PREFIX_SPOOL + "dir";
private static final String PROP_FILE_SPOOL_ARCHIVE_DIR = PROPERTY_PREFIX_SPOOL + "archive.dir";
@@ -35,6 +37,8 @@ public class SpoolConfiguration {
public static final String PROP_FILE_SPOOL_FILE_ROLLOVER_SEC = PROPERTY_PREFIX_SPOOL + "file.rollover.sec";
public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = PROPERTY_PREFIX_SPOOL + "destination.retry.ms";
private static final String PROP_MESSAGE_BATCH_SIZE = PROPERTY_PREFIX_SPOOL + "destination.message.batchsize";
+ private static final String PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC = PROPERTY_PREFIX_SPOOL + "pause.before.send.sec";
+ private static final String PROP_HIVE_METASTORE_NAME = PROPERTY_PREFIX_SPOOL + "hivemetastore.name";
private final String messageHandlerName;
private final int maxArchivedFilesCount;
@@ -44,6 +48,8 @@ public class SpoolConfiguration {
private final int fileSpoolMaxFilesCount;
private final String spoolDirPath;
private final String archiveDir;
+ private final int pauseBeforeSendSec;
+ private final String hiveMetaStoreName;
private String sourceName;
public SpoolConfiguration(Configuration cfg, String messageHandlerName) {
@@ -51,10 +57,12 @@ public class SpoolConfiguration {
this.maxArchivedFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
this.messageBatchSize = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT);
this.retryDestinationMS = cfg.getInt(PROP_FILE_SPOOL_DEST_RETRY_MS, PROP_RETRY_DESTINATION_MS_DEFAULT);
+ this.pauseBeforeSendSec = cfg.getInt(PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC, PROP_PAUSE_BEFORE_SEND_MS_DEFAULT);
this.fileRollOverSec = cfg.getInt(PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, PROP_FILE_ROLLOVER_SEC_DEFAULT) * 1000;
this.fileSpoolMaxFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
this.spoolDirPath = cfg.getString(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT);
this.archiveDir = cfg.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString());
+ this.hiveMetaStoreName = cfg.getString(PROP_HIVE_METASTORE_NAME, PROP_HIVE_METASTORE_NAME_DEFAULT);
}
public void setSource(String val) {
@@ -120,4 +128,12 @@ public class SpoolConfiguration {
return new File(getSpoolDir(), fileDoneName);
}
+
+ public int getPauseBeforeSendSec() {
+ return pauseBeforeSendSec;
+ }
+
+ public boolean isHiveMetaStore() {
+ return this.sourceName.equals(this.hiveMetaStoreName);
+ }
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
index 2cacaaa..a918e9b 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
@@ -19,14 +19,14 @@ package org.apache.atlas.notification.spool;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.hook.FailedMessagesLogger;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
-import org.apache.commons.io.IOUtils;
+import org.apache.atlas.type.AtlasType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutput;
-import java.io.PrintWriter;
import java.util.List;
public class Spooler extends AbstractNotification {
@@ -57,8 +57,14 @@ public class Spooler extends AbstractNotification {
@Override
public void sendInternal(NotificationType type, List<String> messages) {
- boolean ret = write(messages);
+ for (int i = 0; i < messages.size(); i++) {
+ AtlasNotificationMessage e = AtlasType.fromV1Json(messages.get(i), AtlasNotificationMessage.class);
+ e.setSpooled(true);
+
+ messages.set(i, AtlasType.toV1Json(e));
+ }
+ boolean ret = write(messages);
if (failedMessagesLogger != null && !ret) {
writeToFailedMessages(messages);
}
@@ -68,6 +74,11 @@ public class Spooler extends AbstractNotification {
public void close() {
}
+ @Override
+ public boolean isReady(NotificationType type) {
+ return true;
+ }
+
@VisibleForTesting
boolean write(List<String> messages) {
final boolean ret;
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index d7e4959..8078a6c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -127,5 +127,10 @@ public class AbstractNotificationTest {
@Override
public void close() {
}
+
+ @Override
+ public boolean isReady(NotificationType type) {
+ return true;
+ }
}
}
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
index 167efbe..265598e 100644
--- a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
@@ -81,6 +81,11 @@ public class AtlasFileSpoolTest extends BaseTest {
public void close() {
}
+
+ @Override
+ public boolean isReady(NotificationType type) {
+ return true;
+ }
}
@Test
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index cc727c6..ddfb008 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -339,6 +339,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY, UniqueKind.NONE, Integer.class, SINGLE, true, true);
createCommonVertexIndex(management, CUSTOM_ATTRIBUTES_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, LABELS_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
+ createCommonVertexIndex(management, ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, UniqueKind.NONE, Long.class, SINGLE, true, false);
createCommonVertexIndex(management, PATCH_ID_PROPERTY_KEY, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, PATCH_DESCRIPTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java
new file mode 100644
index 0000000..4760757
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph;
+
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+@Component
+public class EntityCorrelationStore {
+ private static final Logger LOG = LoggerFactory.getLogger(EntityCorrelationStore.class);
+
+ public EntityCorrelationStore() {
+ }
+
+ @GraphTransaction
+ public void add(String entityGuid, long messageTimestamp) {
+ AtlasVertex v = AtlasGraphUtilsV2.findByGuid(entityGuid);
+ if (v == null) {
+ LOG.warn("Fetching: {} did not yield result!", entityGuid);
+ return;
+ }
+
+ AtlasGraphUtilsV2.setEncodedProperty(v, Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, messageTimestamp);
+ LOG.info("Updating: {}: {}", entityGuid, messageTimestamp);
+ }
+
+ public String findCorrelatedGuid(String qualifiedName, long messageTimestamp) {
+ String guid = AtlasGraphUtilsV2.findFirstDeletedDuringSpooledByQualifiedName(qualifiedName, messageTimestamp);
+
+ LOG.info("findCorrelatedGuid: {} - {} -> {}", qualifiedName, messageTimestamp, guid);
+ return guid;
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 0a94708..e73f084 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -367,6 +367,27 @@ public class AtlasGraphUtilsV2 {
return vertex;
}
+ public static String findFirstDeletedDuringSpooledByQualifiedName(String qualifiedName, long timestamp) {
+ return findFirstDeletedDuringSpooledByQualifiedName(getGraphInstance(), qualifiedName, timestamp);
+ }
+
+ public static String findFirstDeletedDuringSpooledByQualifiedName(AtlasGraph graph, String qualifiedName, long timestamp) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("findDeletedDuringSpooledByQualifiedName");
+
+ AtlasGraphQuery query = graph.query().has(STATE_PROPERTY_KEY, Status.DELETED.name())
+ .has(Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.GREATER_THAN, timestamp)
+ .has(Constants.QUALIFIED_NAME, qualifiedName)
+ .orderBy(Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, ASC);
+
+ Iterator iterator = query.vertices().iterator();
+
+ String ret = iterator.hasNext() ? GraphHelper.getGuid((AtlasVertex) iterator.next()) : null;
+
+ RequestContext.get().endMetricRecord(metric);
+
+ return ret;
+ }
+
public static AtlasVertex findByGuid(String guid) {
return findByGuid(getGraphInstance(), guid);
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java
new file mode 100644
index 0000000..a3be5f4
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.BasicTestSetup;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.utils.TestResourceFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class EntityCorrelationStoreTest extends BasicTestSetup {
+    @Inject
+    AtlasGraph graph;
+
+    /** Runs the base-class initialization and loads the shared test data before any test. */
+    @BeforeClass
+    public void setup() throws Exception {
+        super.initialize();
+
+        setupTestData();
+    }
+
+    /**
+     * Creates the "db01" entity from the test resources, deletes it, records it in the
+     * correlation store with timestamp 2 and then checks the lookups:
+     * a different qualified name finds nothing, the expected qualified name with
+     * timestamp 1 finds the deleted GUID, and the same name with timestamp 2 finds nothing.
+     */
+    @Test
+    public void verify() throws IOException, AtlasBaseException {
+        final EntityCorrelationStore store                  = new EntityCorrelationStore();
+        final String                 unknownQualifiedName   = "db01@cm";
+        final String                 deletedDbQualifiedName = "db01x@cm";
+
+        String                               dbJson     = TestResourceFileUtils.getJson("entities", "db01");
+        AtlasEntity.AtlasEntitiesWithExtInfo dbEntities = AtlasType.fromJson(dbJson, AtlasEntity.AtlasEntitiesWithExtInfo.class);
+
+        EntityMutationResponse createResponse = entityStore.createOrUpdate(new AtlasEntityStream(dbEntities), false);
+        String                 deletedGuid    = createResponse.getFirstEntityCreated().getGuid();
+
+        entityStore.deleteById(deletedGuid);
+        store.add(deletedGuid, 2L);
+        graph.commit();
+
+        // no entity was deleted under this qualified name
+        assertNull(store.findCorrelatedGuid(unknownQualifiedName, 1));
+
+        // recorded timestamp (2) is after the message timestamp (1): GUID is found
+        String correlatedGuid = store.findCorrelatedGuid(deletedDbQualifiedName, 1L);
+        assertNotNull(correlatedGuid);
+        assertEquals(correlatedGuid, deletedGuid);
+
+        // recorded timestamp equals the message timestamp: nothing is returned
+        assertNull(store.findCorrelatedGuid(deletedDbQualifiedName, 2L));
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java b/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
new file mode 100644
index 0000000..f1d6aff
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Thin facade over {@link EntityCorrelationStore} used while consuming hook notifications.
+ * It records the GUIDs of entities deleted by spooled messages and looks them up again by
+ * qualified name. A {@code null} store is tolerated: every operation then becomes a no-op.
+ */
+public class EntityCorrelationManager {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityCorrelationManager.class);
+
+    private final EntityCorrelationStore entityCorrelationStore;
+
+    /**
+     * @param entityCorrelationStore backing store; may be {@code null}, which disables correlation
+     */
+    public EntityCorrelationManager(EntityCorrelationStore entityCorrelationStore) {
+        this.entityCorrelationStore = entityCorrelationStore;
+    }
+
+    /**
+     * Records each deleted entity's GUID together with the spooled message timestamp.
+     * Does nothing when there is no store, the message was not spooled, or the list is empty.
+     *
+     * @param spooled          whether the originating message was spooled
+     * @param spooledTimestamp timestamp passed to the store for every GUID
+     * @param entityHeaders    headers of the deleted entities; headers without a GUID are skipped
+     */
+    public void add(boolean spooled, long spooledTimestamp, List<AtlasEntityHeader> entityHeaders) {
+        if (this.entityCorrelationStore == null || !spooled || CollectionUtils.isEmpty(entityHeaders)) {
+            return;
+        }
+
+        for (AtlasEntityHeader entityHeader : entityHeaders) {
+            String guid = entityHeader.getGuid();
+            if (StringUtils.isNotEmpty(guid)) {
+                entityCorrelationStore.add(guid, spooledTimestamp);
+            }
+        }
+    }
+
+    /**
+     * Asks the store for the GUID correlated with the given qualified name and message timestamp.
+     *
+     * @param qualifiedName           qualified name to look up
+     * @param spooledMessageTimestamp timestamp of the spooled message; must be positive
+     * @return the GUID returned by the store, or {@code null} when there is no store, the
+     *         timestamp is not positive, or the qualified name is null/empty
+     */
+    public String getGuidForDeletedEntityToBeCorrelated(String qualifiedName, long spooledMessageTimestamp) {
+        // an empty qualified name cannot identify an entity; skip the store lookup entirely
+        if (this.entityCorrelationStore == null || spooledMessageTimestamp <= 0 || StringUtils.isEmpty(qualifiedName)) {
+            return null;
+        }
+
+        String guid = entityCorrelationStore.findCorrelatedGuid(qualifiedName, spooledMessageTimestamp);
+        LOG.info("{}: spooledTimestamp: {} -> {}", qualifiedName, spooledMessageTimestamp, guid);
+        return guid;
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 84cc8d8..5643af9 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -40,6 +40,7 @@ import org.apache.atlas.notification.NotificationInterface.NotificationType;
import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
+import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
import org.apache.atlas.util.AtlasMetricsCounter;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.LruCache;
@@ -191,6 +192,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private ExecutorService executors;
private Instant nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(Instant.now());
private final Map<TopicPartition, Long> lastCommittedPartitionOffset;
+ private final EntityCorrelationManager entityCorrelationManager;
@VisibleForTesting
final int consumerRetryInterval;
@@ -201,7 +203,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Inject
public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
ServiceState serviceState, AtlasInstanceConverter instanceConverter,
- AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) throws AtlasException {
+ AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil,
+ EntityCorrelationStore entityCorrelationStore) throws AtlasException {
this.notificationInterface = notificationInterface;
this.atlasEntityStore = atlasEntityStore;
this.serviceState = serviceState;
@@ -308,7 +311,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
-
+ entityCorrelationManager = new EntityCorrelationManager(entityCorrelationStore);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs);
@@ -688,6 +691,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
stats.updateStats(response);
+ entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
} catch (ClassCastException cle) {
LOG.error("Failed to delete entity {}", deleteRequest);
}
@@ -770,6 +774,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
stats.updateStats(response);
+ entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
}
} catch (ClassCastException cle) {
LOG.error("Failed to do delete entities {}", entities);
@@ -889,6 +894,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
RequestContext.get().resetEntityGuidUpdates();
+ entityCorrelationManager.add(context.isSpooledMessage(), context.getMsgCreated(), response.getDeletedEntities());
+
RequestContext.get().clearCache();
}
}
@@ -973,7 +980,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
PreprocessorContext context = null;
if (preprocessEnabled) {
- context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName);
+ context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache,
+ hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs,
+ rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName, entityCorrelationManager);
if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 89568e2..7f0cafe 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -32,6 +32,8 @@ public abstract class EntityPreprocessor {
public static final String TYPE_HIVE_PROCESS = "hive_process";
public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc";
public static final String TYPE_HIVE_DB = "hive_db";
+ public static final String TYPE_HIVE_DB_DDL = "hive_db_ddl";
+ public static final String TYPE_HIVE_TABLE_DDL = "hive_table_ddl";
public static final String TYPE_HIVE_TABLE = "hive_table";
public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance";
public static final String TYPE_RDBMS_DB = "rdbms_db";
@@ -71,11 +73,13 @@ public abstract class EntityPreprocessor {
static {
EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] {
new HivePreprocessor.HiveDbPreprocessor(),
+ new HiveDbDDLPreprocessor(),
new HivePreprocessor.HiveTablePreprocessor(),
new HivePreprocessor.HiveColumnPreprocessor(),
new HivePreprocessor.HiveProcessPreprocessor(),
new HivePreprocessor.HiveColumnLineageProcessPreprocessor(),
- new HivePreprocessor.HiveStorageDescPreprocessor()
+ new HivePreprocessor.HiveStorageDescPreprocessor(),
+ new HiveTableDDLPreprocessor()
};
EntityPreprocessor[] rdbmsPreprocessors = new EntityPreprocessor[] {
@@ -158,6 +162,16 @@ public abstract class EntityPreprocessor {
return ret != null ? ret.toString() : null;
}
+ /**
+  * Writes {@code guid} into an object reference. An {@link AtlasObjectId} gets it through
+  * its setter; a {@link Map} gets it under the "guid" key. Any other type is left untouched.
+  *
+  * @param obj  object reference to update
+  * @param guid GUID to store on the reference
+  */
+ public void setObjectIdWithGuid(Object obj, String guid) {
+     if (obj instanceof AtlasObjectId) {
+         ((AtlasObjectId) obj).setGuid(guid);
+
+         return;
+     }
+
+     if (obj instanceof Map) {
+         ((Map) obj).put("guid", guid);
+     }
+ }
+
protected boolean isEmpty(Object obj) {
return obj == null || ((obj instanceof Collection) && ((Collection) obj).isEmpty());
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
new file mode 100644
index 0000000..dcff093
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Preprocessor registered for {@code TYPE_HIVE_DB_DDL} entities. For spooled messages it
+ * sets, on the entity's {@code ATTRIBUTE_DB} relationship reference, the GUID the context
+ * returns for a deleted entity with the same qualified name.
+ */
+public class HiveDbDDLPreprocessor extends EntityPreprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveDbDDLPreprocessor.class);
+
+    protected HiveDbDDLPreprocessor() {
+        super(TYPE_HIVE_DB_DDL);
+    }
+
+    /**
+     * Updates the db reference's GUID when the message was spooled, the reference is
+     * present, and the context yields a non-empty GUID; otherwise leaves the entity as is.
+     */
+    @Override
+    public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+        // the relationship is only read for spooled messages
+        Object db = context.isSpooledMessage() ? entity.getRelationshipAttribute(ATTRIBUTE_DB) : null;
+
+        if (db != null) {
+            String dbQualifiedName = getQualifiedName(db);
+            String deletedGuid     = context.getGuidForDeletedEntity(dbQualifiedName);
+
+            if (StringUtils.isNotEmpty(deletedGuid)) {
+                setObjectIdWithGuid(db, deletedGuid);
+                LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), dbQualifiedName, deletedGuid);
+            }
+        }
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index 86e3384..bf6a623 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -228,6 +228,34 @@ public class HivePreprocessor {
}
}
}
+
+ preprocessCheckpoint(entity, context);
+ }
+
+ /**
+  * For spooled messages, walks the {@code ATTRIBUTE_INPUTS} and {@code ATTRIBUTE_OUTPUTS}
+  * relationship attributes and, for each one that is a non-empty list, hands it to
+  * {@code updateListWithGuids}. Non-spooled messages are left untouched.
+  */
+ private void preprocessCheckpoint(AtlasEntity entity, PreprocessorContext context) {
+     if (context.isSpooledMessage()) {
+         for (String attrName : new String[] {ATTRIBUTE_INPUTS, ATTRIBUTE_OUTPUTS}) {
+             Object attrValue = entity.getRelationshipAttribute(attrName);
+
+             if (attrValue instanceof List && !isEmpty(attrValue)) {
+                 updateListWithGuids(context, (List) attrValue);
+             }
+         }
+     }
+ }
+
+ /**
+  * For every element of {@code list}, asks the context for the GUID of a deleted entity
+  * with the element's qualified name and, when one is returned, stores it on the element.
+  */
+ private void updateListWithGuids(PreprocessorContext context, List list) {
+     for (Object item : list) {
+         String deletedGuid = context.getGuidForDeletedEntity(getQualifiedName(item));
+
+         if (StringUtils.isNotEmpty(deletedGuid)) {
+             setObjectIdWithGuid(item, deletedGuid);
+         }
+     }
}
private int getCollectionSize(Object obj) {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
new file mode 100644
index 0000000..83d4d7c
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Preprocessor registered for {@code TYPE_HIVE_TABLE_DDL} entities. For spooled messages it
+ * sets, on the entity's {@code ATTRIBUTE_TABLE} relationship reference, the GUID the context
+ * returns for a deleted entity with the same qualified name.
+ */
+public class HiveTableDDLPreprocessor extends EntityPreprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveTableDDLPreprocessor.class);
+
+    protected HiveTableDDLPreprocessor() {
+        super(TYPE_HIVE_TABLE_DDL);
+    }
+
+    /**
+     * Updates the table reference's GUID when the message was spooled, the reference is
+     * present, and the context yields a non-empty GUID; otherwise leaves the entity as is.
+     */
+    @Override
+    public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+        // the relationship is only read for spooled messages
+        Object table = context.isSpooledMessage() ? entity.getRelationshipAttribute(ATTRIBUTE_TABLE) : null;
+
+        if (table != null) {
+            String tableQualifiedName = getQualifiedName(table);
+            String deletedGuid        = context.getGuidForDeletedEntity(tableQualifiedName);
+
+            if (StringUtils.isNotEmpty(deletedGuid)) {
+                setObjectIdWithGuid(table, deletedGuid);
+                LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), tableQualifiedName, deletedGuid);
+            }
+        }
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 608b4a3..59f6440 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.EntityCorrelationManager;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
@@ -69,9 +70,10 @@ public class PreprocessorContext {
private final Set<String> createdEntities = new HashSet<>();
private final Set<String> deletedEntities = new HashSet<>();
private final Map<String, String> guidAssignments = new HashMap<>();
+ private final EntityCorrelationManager correlationManager;
private List<AtlasEntity> postUpdateEntities = null;
- public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName) {
+ public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName, EntityCorrelationMana [...]
this.kafkaMessage = kafkaMessage;
this.typeRegistry = typeRegistry;
this.hiveTablesToIgnore = hiveTablesToIgnore;
@@ -101,6 +103,7 @@ public class PreprocessorContext {
}
this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty() || updateHiveProcessNameWithQualifiedName;
+ this.correlationManager = correlationManager;
}
public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
@@ -577,4 +580,16 @@ public class PreprocessorContext {
partialEntity.setAttribute(attrName, attrVal);
}
}
+
+ /**
+  * @return the message-created value reported by the Kafka message being processed
+  */
+ public long getMsgCreated() {
+     final long msgCreated = kafkaMessage.getMsgCreated();
+
+     return msgCreated;
+ }
+
+ /**
+  * @return the spooled flag reported by the Kafka message being processed
+  */
+ public boolean isSpooledMessage() {
+     final boolean spooled = kafkaMessage.getSpooled();
+
+     return spooled;
+ }
+
+ /**
+  * Asks the correlation manager for the GUID of a deleted entity with the given qualified
+  * name, passing the current Kafka message's created value as the spooled timestamp.
+  *
+  * @param qualifiedName qualified name of the entity to look up
+  * @return whatever the correlation manager returns for this lookup
+  */
+ public String getGuidForDeletedEntity(String qualifiedName) {
+     final long spooledTimestamp = kafkaMessage.getMsgCreated();
+
+     return this.correlationManager.getGuidForDeletedEntityToBeCorrelated(qualifiedName, spooledTimestamp);
+ }
}
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 65e8b50..fdfc256 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -109,7 +109,7 @@ public class NotificationHookConsumerKafkaTest {
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
@@ -128,7 +128,7 @@ public class NotificationHookConsumerKafkaTest {
public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
@@ -159,7 +159,7 @@ public class NotificationHookConsumerKafkaTest {
assertNotNull (consumer);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 15a1900..f440c42 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -98,7 +98,7 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerCanProceedIfServerIsReady() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
@@ -111,7 +111,7 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
@@ -128,7 +128,7 @@ public class NotificationHookConsumerTest {
@Test
public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
EntityCreateRequest message = mock(EntityCreateRequest.class);
@@ -145,7 +145,7 @@ public class NotificationHookConsumerTest {
@Test
public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class)));
@@ -159,7 +159,7 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
@@ -179,7 +179,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
notificationHookConsumer.startInternal(configuration, executorService);
verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
@@ -197,7 +197,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
notificationHookConsumer.startInternal(configuration, executorService);
@@ -216,7 +216,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsActive();
@@ -236,7 +236,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
doAnswer(new Answer() {
@Override
@@ -267,7 +267,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
@@ -332,6 +332,6 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
+ return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null);
}
}