You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by pi...@apache.org on 2021/11/01 06:15:54 UTC
[atlas] branch branch-2.0 updated: ATLAS-4428 : Enhance Atlas hook
messages to capture hook version & message source
This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 57932ca ATLAS-4428 : Enhance Atlas hook messages to capture hook version & message source
57932ca is described below
commit 57932caa78ca2bf0436b0aa968e2683fd52996eb
Author: chaitali borole <ch...@freestoneinfotech.com>
AuthorDate: Fri Oct 29 13:58:41 2021 +0530
ATLAS-4428 : Enhance Atlas hook messages to capture hook version & message source
Signed-off-by: Pinal Shah <pi...@freestoneinfotech.com>
---
.../org/apache/atlas/falcon/hook/FalconHook.java | 6 ++
.../apache/atlas/hbase/bridge/HBaseAtlasHook.java | 8 +-
.../java/org/apache/atlas/hive/hook/HiveHook.java | 4 +
.../atlas/hive/hook/HiveMetastoreHookImpl.java | 6 ++
.../atlas/impala/hook/ImpalaLineageHook.java | 6 ++
.../org/apache/atlas/sqoop/hook/SqoopHook.java | 8 ++
.../apache/atlas/storm/hook/StormAtlasHook.java | 7 ++
.../org/apache/atlas/repository/Constants.java | 9 +++
.../notification/AtlasNotificationBaseMessage.java | 18 +++++
.../notification/AtlasNotificationMessage.java | 8 +-
.../atlas/model/notification/MessageSource.java | 90 ++++++++++++++++++++++
intg/src/main/resources/atlas-buildinfo.properties | 28 +++++++
.../main/java/org/apache/atlas/hook/AtlasHook.java | 20 +++--
.../atlas/notification/AbstractNotification.java | 13 +++-
.../atlas/notification/NotificationInterface.java | 3 +
.../atlas/notification/spool/AtlasFileSpool.java | 15 +++-
.../java/org/apache/atlas/hook/AtlasHookTest.java | 33 ++++----
.../notification/AbstractNotificationTest.java | 17 ++--
.../notification/AtlasNotificationMessageTest.java | 33 +++++++-
.../entity/EntityNotificationDeserializerTest.java | 4 +-
.../hook/HookNotificationDeserializerTest.java | 8 +-
21 files changed, 302 insertions(+), 42 deletions(-)
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index 8c09d33..b8a73cb 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.atlas.repository.Constants.FALCON_SOURCE;
/**
* Falcon hook sends lineage information to the Atlas Service.
@@ -44,6 +45,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
private static ConfigurationStore STORE;
+ @Override
+ public String getMessageSource() {
+ return FALCON_SOURCE;
+ }
+
private enum Operation {
ADD,
UPDATE
diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
index 0ab06f2..8e6c57d 100644
--- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
+++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
@@ -49,11 +49,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.atlas.repository.Constants.HBASE_SOURCE;
+
// This will register Hbase entities into Atlas
public class HBaseAtlasHook extends AtlasHook {
private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHook.class);
-
+
public static final String ATTR_DESCRIPTION = "description";
public static final String ATTR_ATLAS_ENDPOINT = "atlas.rest.address";
public static final String ATTR_PARAMETERS = "parameters";
@@ -497,6 +499,10 @@ public class HBaseAtlasHook extends AtlasHook {
return columnFamily;
}
+ public String getMessageSource() {
+ return HBASE_SOURCE;
+ }
+
private String getTableName(HBaseOperationContext hbaseOperationContext) {
final String ret;
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 3cc7b3b..6ea4848 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -49,6 +49,7 @@ import java.util.regex.Pattern;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;
+import static org.apache.atlas.repository.Constants.HS2_SOURCE;
public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
@@ -176,6 +177,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
super(name);
}
+ public String getMessageSource() {
+ return HS2_SOURCE;
+ }
@Override
public void run(HookContext hookContext) throws Exception {
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
index 6a492c2..33266ce 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.atlas.hive.hook.events.AlterTableRenameCol.findRenamedColumn;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable;
+import static org.apache.atlas.repository.Constants.HMS_SOURCE;
import static org.apache.hadoop.hive.ql.plan.HiveOperation.*;
public class HiveMetastoreHookImpl extends MetaStoreEventListener {
@@ -106,6 +107,11 @@ public class HiveMetastoreHookImpl extends MetaStoreEventListener {
public HiveMetastoreHook() {
}
+ @Override
+ public String getMessageSource() {
+ return HMS_SOURCE;
+ }
+
public void handleEvent(HiveOperationContext operContext) {
ListenerEvent listenerEvent = operContext.getEvent();
diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
index 10ae08f..907f244 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
@@ -36,6 +36,8 @@ import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import java.util.HashSet;
+import static org.apache.atlas.repository.Constants.IMPALA_SOURCE;
+
public class ImpalaLineageHook extends AtlasHook {
private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class);
public static final String ATLAS_ENDPOINT = "atlas.rest.address";
@@ -65,6 +67,10 @@ public class ImpalaLineageHook extends AtlasHook {
}
+ public String getMessageSource() {
+ return IMPALA_SOURCE;
+ }
+
public void process(String impalaQueryString) throws Exception {
if (StringUtils.isEmpty(impalaQueryString)) {
LOG.warn("==> ImpalaLineageHook.process skips because the impalaQueryString is empty <==");
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 4785960..0a8cb96 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -47,6 +47,9 @@ import java.util.HashMap;
import java.util.Properties;
import java.util.List;
import java.util.Date;
+
+import static org.apache.atlas.repository.Constants.SQOOP_SOURCE;
+
/**
* AtlasHook sends lineage information to the AtlasSever.
*/
@@ -243,6 +246,11 @@ public class SqoopHook extends SqoopJobDataPublisher {
}
private static class AtlasHookImpl extends AtlasHook {
+
+ public String getMessageSource() {
+ return SQOOP_SOURCE;
+ }
+
public void sendNotification(HookNotification notification) {
super.notifyEntities(Collections.singletonList(notification), null);
}
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 779c5cb..69d58d5 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -53,6 +53,8 @@ import java.util.Map;
import java.util.Set;
import java.util.Date;
+import static org.apache.atlas.repository.Constants.STORM_SOURCE;
+
/**
* StormAtlasHook sends storm topology metadata information to Atlas
* via a Kafka Broker for durability.
@@ -406,4 +408,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
return clusterName;
}
+
+ @Override
+ public String getMessageSource() {
+ return STORM_SOURCE;
+ }
}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 2669c8a..7cd67a0 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -236,6 +236,15 @@ public final class Constants {
public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
+ public static final String SQOOP_SOURCE = "sqoop";
+ public static final String FALCON_SOURCE = "falcon";
+ public static final String HBASE_SOURCE = "hbase";
+ public static final String HS2_SOURCE = "hive_server2";
+ public static final String HMS_SOURCE = "hive_metastore";
+ public static final String IMPALA_SOURCE = "impala";
+ public static final String STORM_SOURCE = "storm";
+ public static final String FILE_SPOOL_SOURCE = "file_spool";
+
/*
* All supported file-format extensions for Bulk Imports through file upload
*/
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java
index ff45d57..97fcbac 100644
--- a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java
+++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java
@@ -55,6 +55,7 @@ public class AtlasNotificationBaseMessage {
public enum CompressionKind { NONE, GZIP };
+ private MessageSource source = null;
private MessageVersion version = null;
private String msgId = null;
private CompressionKind msgCompressionKind = CompressionKind.NONE;
@@ -70,9 +71,18 @@ public class AtlasNotificationBaseMessage {
}
public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) {
+ this (version, msgId, msgCompressionKind, null);
+ }
+
+ public AtlasNotificationBaseMessage(MessageVersion version, MessageSource source) {
+ this (version, null, CompressionKind.NONE, source);
+ }
+
+ public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, MessageSource source) {
this.version = version;
this.msgId = msgId;
this.msgCompressionKind = msgCompressionKind;
+ this.source = source;
}
public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) {
@@ -91,6 +101,14 @@ public class AtlasNotificationBaseMessage {
return version;
}
+ public void setSource(MessageSource source) {
+ this.source = source;
+ }
+
+ public MessageSource getSource() {
+ return source;
+ }
+
public String getMsgId() {
return msgId;
}
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
index 5869910..42032b4 100644
--- a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
+++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
@@ -60,7 +60,11 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
}
public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy, boolean spooled) {
- super(version);
+ this(version, message, msgSourceIP, createdBy, spooled, null);
+ }
+
+ public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy, boolean spooled, MessageSource source) {
+ super(version, source);
this.msgSourceIP = msgSourceIP;
this.msgCreatedBy = createdBy;
@@ -70,7 +74,7 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
}
public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
- this(version, message, msgSourceIP, createdBy, false);
+ this(version, message, msgSourceIP, createdBy, false, null);
}
public String getMsgSourceIP() {
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/MessageSource.java b/intg/src/main/java/org/apache/atlas/model/notification/MessageSource.java
new file mode 100644
index 0000000..b5c3cdc
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/notification/MessageSource.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.notification;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Properties;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Base class of hook information.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class MessageSource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MessageSource.class);
+ private static final String BUILDINFO_PROPERTIES = "/atlas-buildinfo.properties";
+ private static final String BUILD_VERSION_PROPERTY_KEY = "build.version";
+ private static final String BUILD_VERSION_DEFAULT = "UNKNOWN";
+
+ private static String storedVersion;
+ private String name;
+ private String version;
+
+
+ static {
+ storedVersion = fetchBuildVersion();
+ }
+
+ public MessageSource() {
+ }
+
+ public MessageSource(String name) {
+ this.version = storedVersion;
+ this.name = name;
+ }
+
+ public String getSource () { return name; }
+
+ public void setSource(String name) { this.name = name; }
+
+ public String getVersion () {
+ return version;
+ }
+
+ private static String fetchBuildVersion() {
+ Properties properties = new java.util.Properties();
+ InputStream inputStream = MessageSource.class.getResourceAsStream(BUILDINFO_PROPERTIES);
+ InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
+ try {
+ properties.load(inputStreamReader);
+ } catch (IOException e) {
+ LOG.error("Failed to load atlas-buildinfo properties. Will use default version.", e);
+ }
+
+ return properties.getProperty(BUILD_VERSION_PROPERTY_KEY, BUILD_VERSION_DEFAULT);
+ }
+}
diff --git a/intg/src/main/resources/atlas-buildinfo.properties b/intg/src/main/resources/atlas-buildinfo.properties
new file mode 100644
index 0000000..2404f8f
--- /dev/null
+++ b/intg/src/main/resources/atlas-buildinfo.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######################
+project.name=${pom.parent.name}
+project.description=${pom.parent.description}
+build.user=${user.name}
+build.epoch=${timestamp}
+project.version=${pom.version}
+build.version=${pom.version}
+vc.revision=${buildNumber}
+vc.source.url=${scm.connection}
+######################
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 9162ac1..24ea6ea 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -28,6 +28,7 @@ import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.utils.AtlasConfigurationUtil;
import org.apache.commons.configuration.Configuration;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ShutdownHookManager;
@@ -65,6 +66,7 @@ public abstract class AtlasHook {
protected static Configuration atlasProperties;
protected static NotificationInterface notificationInterface;
+ protected MessageSource source;
private static final String metadataNamespace;
private static final int SHUTDOWN_HOOK_WAIT_TIME_MS = 3000;
@@ -143,14 +145,18 @@ public abstract class AtlasHook {
}
public AtlasHook() {
+ source = new MessageSource(getMessageSource());
notificationInterface.init(this.getClass().getSimpleName(), failedMessagesLogger);
}
public AtlasHook(String name) {
+ source = new MessageSource(getMessageSource());
LOG.info("AtlasHook: Spool name: Passed from caller.: {}", name);
notificationInterface.init(name, failedMessagesLogger);
}
+ public abstract String getMessageSource();
+
/**
* Notify atlas of the entity through message. The entity can be a
* complex entity with reference to other entities.
@@ -160,14 +166,14 @@ public abstract class AtlasHook {
* @param messages hook notification messages
* @param maxRetries maximum number of retries while sending message to messaging system
*/
- public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries) {
+ public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries, MessageSource source) {
if (executor == null) { // send synchronously
- notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
+ notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source);
} else {
executor.submit(new Runnable() {
@Override
public void run() {
- notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
+ notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source);
}
});
}
@@ -176,7 +182,7 @@ public abstract class AtlasHook {
@VisibleForTesting
static void notifyEntitiesInternal(List<HookNotification> messages, int maxRetries, UserGroupInformation ugi,
NotificationInterface notificationInterface,
- boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
+ boolean shouldLogFailedMessages, FailedMessagesLogger logger, MessageSource source) {
if (messages == null || messages.isEmpty()) {
return;
}
@@ -199,12 +205,12 @@ public abstract class AtlasHook {
try {
if (ugi == null) {
- notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
+ notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages, source);
} else {
PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
+ notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages, source);
return messages;
}
};
@@ -244,7 +250,7 @@ public abstract class AtlasHook {
* @param messages hook notification messages
*/
protected void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi) {
- notifyEntities(messages, ugi, notificationMaxRetries);
+ notifyEntities(messages, ugi, notificationMaxRetries, source);
}
/**
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index c45a1da..cca4cb8 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.model.notification.MessageVersion;
import org.apache.commons.configuration.Configuration;
@@ -84,10 +85,15 @@ public abstract class AbstractNotification implements NotificationInterface {
@Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+ send(type, messages, new MessageSource());
+ }
+
+ @Override
+ public <T> void send(NotificationType type, List<T> messages, MessageSource source) throws NotificationException {
List<String> strMessages = new ArrayList<>(messages.size());
for (int index = 0; index < messages.size(); index++) {
- createNotificationMessages(messages.get(index), strMessages);
+ createNotificationMessages(messages.get(index), strMessages, source);
}
sendInternal(type, strMessages);
@@ -146,10 +152,11 @@ public abstract class AbstractNotification implements NotificationInterface {
*
* @param message the message in object form
*
+ * @param source
* @return the message as a JSON string
*/
- public static void createNotificationMessages(Object message, List<String> msgJsonList) {
- AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser());
+ public static void createNotificationMessages(Object message, List<String> msgJsonList, MessageSource source) {
+ AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser(), false, source);
String msgJson = AtlasType.toV1Json(notificationMsg);
boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 3d8d9cc..a9cd4a6 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -17,6 +17,7 @@
*/
package org.apache.atlas.notification;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.notification.entity.EntityMessageDeserializer;
import org.apache.atlas.notification.hook.HookMessageDeserializer;
@@ -109,6 +110,8 @@ public interface NotificationInterface {
/**
* Shutdown any notification producers and consumers associated with this interface instance.
*/
+ <T> void send(NotificationType type, List<T> messages, MessageSource source) throws NotificationException;
+
void close();
/**
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
index 0c92c30..57c0c7d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
@@ -19,6 +19,7 @@ package org.apache.atlas.notification.spool;
import org.apache.atlas.AtlasException;
import org.apache.atlas.hook.FailedMessagesLogger;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.atlas.repository.Constants.FILE_SPOOL_SOURCE;
public class AtlasFileSpool implements NotificationInterface {
private static final Logger LOG = LoggerFactory.getLogger(AtlasFileSpool.class);
@@ -103,13 +105,18 @@ public class AtlasFileSpool implements NotificationInterface {
}
@Override
+ public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+ send(type, messages, new MessageSource(FILE_SPOOL_SOURCE));
+ }
+
+ @Override
public boolean isReady(NotificationType type) {
return true;
}
@Override
- public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
- List<String> serializedMessages = getSerializedMessages(messages);
+ public <T> void send(NotificationType type, List<T> messages, MessageSource source) throws NotificationException {
+ List<String> serializedMessages = getSerializedMessages(messages, source);
if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) {
if (LOG.isDebugEnabled()) {
LOG.debug("AtlasFileSpool.send(): sending to spooler");
@@ -139,10 +146,10 @@ public class AtlasFileSpool implements NotificationInterface {
}
}
- private <T> List<String> getSerializedMessages(List<T> messages) {
+ private <T> List<String> getSerializedMessages(List<T> messages, MessageSource source) {
List<String> serializedMessages = new ArrayList<>(messages.size());
for (int index = 0; index < messages.size(); index++) {
- notificationHandler.createNotificationMessages(messages.get(index), serializedMessages);
+ notificationHandler.createNotificationMessages(messages.get(index), serializedMessages, source);
}
return serializedMessages;
diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
index 1ae7c27..b094247 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
@@ -18,6 +18,7 @@
package org.apache.atlas.hook;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
@@ -52,32 +53,35 @@ public class AtlasHookTest {
@Test (timeOut = 10000)
public void testNotifyEntitiesDoesNotHangOnException() throws Exception {
+ MessageSource source = new MessageSource(this.getClass().getSimpleName());
List<HookNotification> hookNotifications = new ArrayList<>();
doThrow(new NotificationException(new Exception())).when(notificationInterface)
- .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+ .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
AtlasHook.notifyEntitiesInternal(hookNotifications, 0, null, notificationInterface, false,
- failedMessagesLogger);
+ failedMessagesLogger, source);
// if we've reached here, the method finished OK.
}
@Test
public void testNotifyEntitiesRetriesOnException() throws NotificationException {
+ MessageSource source = new MessageSource(this.getClass().getSimpleName());
List<HookNotification> hookNotifications =
new ArrayList<HookNotification>() {{
add(new EntityCreateRequest("user"));
}
};
doThrow(new NotificationException(new Exception())).when(notificationInterface)
- .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+ .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, false,
- failedMessagesLogger);
+ failedMessagesLogger, source);
verify(notificationInterface, times(2)).
- send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+ send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
}
@Test
public void testFailedMessageIsLoggedIfRequired() throws NotificationException {
+ MessageSource source = new MessageSource(this.getClass().getSimpleName());
List<HookNotification> hookNotifications =
new ArrayList<HookNotification>() {{
add(new EntityCreateRequest("user"));
@@ -85,27 +89,29 @@ public class AtlasHookTest {
};
doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
.when(notificationInterface)
- .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+ .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true,
- failedMessagesLogger);
+ failedMessagesLogger, source);
verify(failedMessagesLogger, times(1)).log("test message");
}
@Test
public void testFailedMessageIsNotLoggedIfNotRequired() throws NotificationException {
+ MessageSource source = new MessageSource(this.getClass().getSimpleName());
List<HookNotification> hookNotifications = new ArrayList<>();
doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
.when(notificationInterface)
- .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+ .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, false,
- failedMessagesLogger);
+ failedMessagesLogger, source);
verifyZeroInteractions(failedMessagesLogger);
}
@Test
public void testAllFailedMessagesAreLogged() throws NotificationException {
+ MessageSource source = new MessageSource(this.getClass().getSimpleName());
List<HookNotification> hookNotifications =
new ArrayList<HookNotification>() {{
add(new EntityCreateRequest("user"));
@@ -113,9 +119,9 @@ public class AtlasHookTest {
};
doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2")))
.when(notificationInterface)
- .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+ .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true,
- failedMessagesLogger);
+ failedMessagesLogger, source);
verify(failedMessagesLogger, times(1)).log("test message1");
verify(failedMessagesLogger, times(1)).log("test message2");
@@ -123,11 +129,12 @@ public class AtlasHookTest {
@Test
public void testFailedMessageIsNotLoggedIfNotANotificationException() throws Exception {
+ MessageSource source = new MessageSource(this.getClass().getSimpleName());
List<HookNotification> hookNotifications = new ArrayList<>();
doThrow(new RuntimeException("test message")).when(notificationInterface)
- .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+ .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true,
- failedMessagesLogger);
+ failedMessagesLogger, source);
verifyZeroInteractions(failedMessagesLogger);
}
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 8078a6c..4e1c094 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -19,6 +19,7 @@
package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
import org.apache.atlas.notification.NotificationInterface.NotificationType;
@@ -40,6 +41,7 @@ public class AbstractNotificationTest {
@org.testng.annotations.Test
public void testSend() throws Exception {
+ MessageSource source = new MessageSource();
Configuration configuration = mock(Configuration.class);
TestNotification notification = new TestNotification(configuration);
Test message1 = new Test(HookNotificationType.ENTITY_CREATE, "user1");
@@ -47,9 +49,9 @@ public class AbstractNotificationTest {
Test message3 = new Test(HookNotificationType.ENTITY_FULL_UPDATE, "user1");
List<String> messageJson = new ArrayList<>();
- AbstractNotification.createNotificationMessages(message1, messageJson);
- AbstractNotification.createNotificationMessages(message2, messageJson);
- AbstractNotification.createNotificationMessages(message3, messageJson);
+ AbstractNotification.createNotificationMessages(message1, messageJson, source);
+ AbstractNotification.createNotificationMessages(message2, messageJson, source);
+ AbstractNotification.createNotificationMessages(message3, messageJson, source);
notification.send(NotificationType.HOOK, message1, message2, message3);
@@ -63,6 +65,7 @@ public class AbstractNotificationTest {
@org.testng.annotations.Test
public void testSend2() throws Exception {
+ MessageSource source = new MessageSource();
Configuration configuration = mock(Configuration.class);
TestNotification notification = new TestNotification(configuration);
Test message1 = new Test(HookNotificationType.ENTITY_CREATE, "user1");
@@ -71,11 +74,11 @@ public class AbstractNotificationTest {
List<Test> messages = Arrays.asList(message1, message2, message3);
List<String> messageJson = new ArrayList<>();
- AbstractNotification.createNotificationMessages(message1, messageJson);
- AbstractNotification.createNotificationMessages(message2, messageJson);
- AbstractNotification.createNotificationMessages(message3, messageJson);
+ AbstractNotification.createNotificationMessages(message1, messageJson, source);
+ AbstractNotification.createNotificationMessages(message2, messageJson, source);
+ AbstractNotification.createNotificationMessages(message3, messageJson, source);
- notification.send(NotificationInterface.NotificationType.HOOK, messages);
+ notification.send(NotificationInterface.NotificationType.HOOK, messages, source);
assertEquals(notification.type, NotificationType.HOOK);
assertEquals(notification.messages.size(), messageJson.size());
diff --git a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
index 91a195d..463797c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
@@ -19,16 +19,26 @@
package org.apache.atlas.notification;
import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.model.notification.MessageVersion;
+import org.apache.atlas.notification.entity.EntityNotificationTest;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.testng.annotations.Test;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Collections;
+
import static org.testng.Assert.*;
/**
* AtlasNotificationMessage tests.
*/
public class AtlasNotificationMessageTest {
-
+
@Test
public void testGetVersion() throws Exception {
MessageVersion version = new MessageVersion("1.0.0");
@@ -56,4 +66,25 @@ public class AtlasNotificationMessageTest {
assertTrue(atlasNotificationMessage.compareVersion(version2) < 0);
assertTrue(atlasNotificationMessage.compareVersion(version3) > 0);
}
+
+ @Test
+ public void testMessageSource() throws Exception {
+ Referenceable entity = generateEntityWithTrait();
+ HookNotificationV1.EntityUpdateRequest message = new HookNotificationV1.EntityUpdateRequest("user1", entity);
+ MessageSource source = new MessageSource(this.getClass().getSimpleName());
+ List<String> jsonList = new LinkedList<>();
+
+ AbstractNotification.createNotificationMessages(message, jsonList, source);
+ for(Object json : jsonList) {
+ AtlasNotificationMessage atlasNotificationMessage = AtlasType.fromV1Json((String) json, AtlasNotificationMessage.class);
+ assertEquals("\"" + source.getSource() + "\"" ,AtlasType.toV1Json(atlasNotificationMessage.getSource().getSource()));
+ }
+ }
+
+ private Referenceable generateEntityWithTrait() {
+ Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+ return ret;
+ }
+
}
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
index 13eafb6..2953b63 100644
--- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
@@ -19,6 +19,7 @@
package org.apache.atlas.notification.entity;
import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.notification.AbstractNotification;
@@ -37,6 +38,7 @@ import static org.testng.Assert.assertTrue;
*/
public class EntityNotificationDeserializerTest {
private EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
+ MessageSource source = new MessageSource(this.getClass().getSimpleName());
@Test
public void testDeserialize() throws Exception {
@@ -46,7 +48,7 @@ public class EntityNotificationDeserializerTest {
EntityNotificationV1 notification = new EntityNotificationV1(entity, EntityNotificationV1.OperationType.TRAIT_ADD, traits);
List<String> jsonMsgList = new ArrayList<>();
- AbstractNotification.createNotificationMessages(notification, jsonMsgList);
+ AbstractNotification.createNotificationMessages(notification, jsonMsgList, source);
EntityNotification deserializedNotification = null;
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
index d048170..bfc9b53 100644
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
@@ -18,6 +18,7 @@
package org.apache.atlas.notification.hook;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.entity.EntityNotificationTest;
import org.apache.atlas.v1.model.instance.Referenceable;
@@ -41,6 +42,7 @@ import static org.testng.Assert.assertTrue;
*/
public class HookNotificationDeserializerTest {
private HookMessageDeserializer deserializer = new HookMessageDeserializer();
+ MessageSource source = new MessageSource(this.getClass().getSimpleName());
@Test
public void testDeserialize() throws Exception {
@@ -48,7 +50,7 @@ public class HookNotificationDeserializerTest {
EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
List<String> jsonMsgList = new ArrayList<>();
- AbstractNotification.createNotificationMessages(message, jsonMsgList);
+ AbstractNotification.createNotificationMessages(message, jsonMsgList, source);
HookNotification deserializedMessage = deserialize(jsonMsgList);
@@ -72,7 +74,7 @@ public class HookNotificationDeserializerTest {
EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
List<String> jsonMsgList = new ArrayList<>();
- AbstractNotification.createNotificationMessages(message, jsonMsgList);
+ AbstractNotification.createNotificationMessages(message, jsonMsgList, source);
assertTrue(jsonMsgList.size() == 1);
@@ -92,7 +94,7 @@ public class HookNotificationDeserializerTest {
EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
List<String> jsonMsgList = new ArrayList<>();
- AbstractNotification.createNotificationMessages(message, jsonMsgList);
+ AbstractNotification.createNotificationMessages(message, jsonMsgList, source);
assertTrue(jsonMsgList.size() > 1);