You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by pi...@apache.org on 2021/11/01 06:15:54 UTC

[atlas] branch branch-2.0 updated: ATLAS-4428 : Enhance Atlas hook messages to capture hook version & message source

This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 57932ca  ATLAS-4428 : Enhance Atlas hook messages to capture hook version & message source
57932ca is described below

commit 57932caa78ca2bf0436b0aa968e2683fd52996eb
Author: chaitali borole <ch...@freestoneinfotech.com>
AuthorDate: Fri Oct 29 13:58:41 2021 +0530

    ATLAS-4428 : Enhance Atlas hook messages to capture hook version & message source
    
    Signed-off-by: Pinal Shah <pi...@freestoneinfotech.com>
---
 .../org/apache/atlas/falcon/hook/FalconHook.java   |  6 ++
 .../apache/atlas/hbase/bridge/HBaseAtlasHook.java  |  8 +-
 .../java/org/apache/atlas/hive/hook/HiveHook.java  |  4 +
 .../atlas/hive/hook/HiveMetastoreHookImpl.java     |  6 ++
 .../atlas/impala/hook/ImpalaLineageHook.java       |  6 ++
 .../org/apache/atlas/sqoop/hook/SqoopHook.java     |  8 ++
 .../apache/atlas/storm/hook/StormAtlasHook.java    |  7 ++
 .../org/apache/atlas/repository/Constants.java     |  9 +++
 .../notification/AtlasNotificationBaseMessage.java | 18 +++++
 .../notification/AtlasNotificationMessage.java     |  8 +-
 .../atlas/model/notification/MessageSource.java    | 90 ++++++++++++++++++++++
 intg/src/main/resources/atlas-buildinfo.properties | 28 +++++++
 .../main/java/org/apache/atlas/hook/AtlasHook.java | 20 +++--
 .../atlas/notification/AbstractNotification.java   | 13 +++-
 .../atlas/notification/NotificationInterface.java  |  3 +
 .../atlas/notification/spool/AtlasFileSpool.java   | 15 +++-
 .../java/org/apache/atlas/hook/AtlasHookTest.java  | 33 ++++----
 .../notification/AbstractNotificationTest.java     | 17 ++--
 .../notification/AtlasNotificationMessageTest.java | 33 +++++++-
 .../entity/EntityNotificationDeserializerTest.java |  4 +-
 .../hook/HookNotificationDeserializerTest.java     |  8 +-
 21 files changed, 302 insertions(+), 42 deletions(-)

diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index 8c09d33..b8a73cb 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import static org.apache.atlas.repository.Constants.FALCON_SOURCE;
 
 /**
  * Falcon hook sends lineage information to the Atlas Service.
@@ -44,6 +45,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
 
     private static ConfigurationStore STORE;
 
+    @Override
+    public String getMessageSource() {
+        return FALCON_SOURCE;
+    }
+
     private enum Operation {
         ADD,
         UPDATE
diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
index 0ab06f2..8e6c57d 100644
--- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
+++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
@@ -49,11 +49,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.atlas.repository.Constants.HBASE_SOURCE;
+
 // This will register Hbase entities into Atlas
 public class HBaseAtlasHook extends AtlasHook {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHook.class);
 
-
+    
     public static final String ATTR_DESCRIPTION           = "description";
     public static final String ATTR_ATLAS_ENDPOINT        = "atlas.rest.address";
     public static final String ATTR_PARAMETERS            = "parameters";
@@ -497,6 +499,10 @@ public class HBaseAtlasHook extends AtlasHook {
         return columnFamily;
     }
 
+    /** Returns the source name (HBASE_SOURCE) that AtlasHook uses to build this hook's MessageSource. */
+    @Override
+    public String getMessageSource() { return HBASE_SOURCE; }
+
     private String getTableName(HBaseOperationContext hbaseOperationContext) {
         final String ret;
 
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 3cc7b3b..6ea4848 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -49,6 +49,7 @@ import java.util.regex.Pattern;
 import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
 import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
 import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;
+import static org.apache.atlas.repository.Constants.HS2_SOURCE;
 
 public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
@@ -176,6 +177,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         super(name);
     }
 
+    /** Returns the source name (HS2_SOURCE) that AtlasHook uses to build this hook's MessageSource. */
+    @Override
+    public String getMessageSource() { return HS2_SOURCE; }
 
     @Override
     public void run(HookContext hookContext) throws Exception {
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
index 6a492c2..33266ce 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.atlas.hive.hook.events.AlterTableRenameCol.findRenamedColumn;
 import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable;
+import static org.apache.atlas.repository.Constants.HMS_SOURCE;
 import static org.apache.hadoop.hive.ql.plan.HiveOperation.*;
 
 public class HiveMetastoreHookImpl extends MetaStoreEventListener {
@@ -106,6 +107,11 @@ public class HiveMetastoreHookImpl extends MetaStoreEventListener {
         public HiveMetastoreHook() {
         }
 
+        @Override
+        public String getMessageSource() {
+            return HMS_SOURCE;
+        }
+
         public void handleEvent(HiveOperationContext operContext) {
             ListenerEvent listenerEvent = operContext.getEvent();
 
diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
index 10ae08f..907f244 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
@@ -36,6 +36,8 @@ import javax.security.auth.Subject;
 import javax.security.auth.kerberos.KerberosPrincipal;
 import java.util.HashSet;
 
+import static org.apache.atlas.repository.Constants.IMPALA_SOURCE;
+
 public class ImpalaLineageHook extends AtlasHook {
     private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class);
     public static final String ATLAS_ENDPOINT                      = "atlas.rest.address";
@@ -65,6 +67,10 @@ public class ImpalaLineageHook extends AtlasHook {
 
     }
 
+    /** Returns the source name (IMPALA_SOURCE) that AtlasHook uses to build this hook's MessageSource. */
+    @Override
+    public String getMessageSource() { return IMPALA_SOURCE; }
+
     public void process(String impalaQueryString) throws Exception {
         if (StringUtils.isEmpty(impalaQueryString)) {
             LOG.warn("==> ImpalaLineageHook.process skips because the impalaQueryString is empty <==");
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 4785960..0a8cb96 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -47,6 +47,9 @@ import java.util.HashMap;
 import java.util.Properties;
 import java.util.List;
 import java.util.Date;
+
+import static org.apache.atlas.repository.Constants.SQOOP_SOURCE;
+
 /**
  * AtlasHook sends lineage information to the AtlasSever.
  */
@@ -243,6 +246,11 @@ public class SqoopHook extends SqoopJobDataPublisher {
     }
 
     private static class AtlasHookImpl extends AtlasHook {
+
+        /** Returns the source name (SQOOP_SOURCE) that AtlasHook uses to build this hook's MessageSource. */
+        @Override
+        public String getMessageSource() { return SQOOP_SOURCE; }
+
         public void sendNotification(HookNotification notification) {
             super.notifyEntities(Collections.singletonList(notification), null);
         }
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 779c5cb..69d58d5 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -53,6 +53,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Date;
 
+import static org.apache.atlas.repository.Constants.STORM_SOURCE;
+
 /**
  * StormAtlasHook sends storm topology metadata information to Atlas
  * via a Kafka Broker for durability.
@@ -406,4 +408,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
 
         return clusterName;
     }
+
+    @Override
+    public String getMessageSource() {
+        return STORM_SOURCE;
+    }
 }
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 2669c8a..7cd67a0 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -236,6 +236,15 @@ public final class Constants {
     public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
     public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME  = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
 
+    public static final String SQOOP_SOURCE       = "sqoop";
+    public static final String FALCON_SOURCE      = "falcon";
+    public static final String HBASE_SOURCE       = "hbase";
+    public static final String HS2_SOURCE         = "hive_server2";
+    public static final String HMS_SOURCE         = "hive_metastore";
+    public static final String IMPALA_SOURCE      = "impala";
+    public static final String STORM_SOURCE       = "storm";
+    public static final String FILE_SPOOL_SOURCE  = "file_spool";
+
     /*
      * All supported file-format extensions for Bulk Imports through file upload
      */
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java
index ff45d57..97fcbac 100644
--- a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java
+++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java
@@ -55,6 +55,7 @@ public class AtlasNotificationBaseMessage {
 
     public enum CompressionKind { NONE, GZIP };
 
+    private MessageSource   source             = null;
     private MessageVersion  version            = null;
     private String          msgId              = null;
     private CompressionKind msgCompressionKind = CompressionKind.NONE;
@@ -70,9 +71,18 @@ public class AtlasNotificationBaseMessage {
     }
 
     public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) {
+        this (version, msgId, msgCompressionKind, null);
+    }
+
+    public AtlasNotificationBaseMessage(MessageVersion version, MessageSource source) {
+        this (version, null, CompressionKind.NONE, source);
+    }
+
+    public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, MessageSource source) {
         this.version            = version;
         this.msgId              = msgId;
         this.msgCompressionKind = msgCompressionKind;
+        this.source             = source;
     }
 
     public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) {
@@ -91,6 +101,14 @@ public class AtlasNotificationBaseMessage {
         return version;
     }
 
+    public void setSource(MessageSource source) {
+        this.source = source;
+    }
+
+    public MessageSource getSource() {
+        return source;
+    }
+
     public String getMsgId() {
         return msgId;
     }
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
index 5869910..42032b4 100644
--- a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
+++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
@@ -60,7 +60,11 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
     }
 
     public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy, boolean spooled) {
-        super(version);
+        this(version, message, msgSourceIP, createdBy, spooled, null);
+    }
+
+    public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy, boolean spooled, MessageSource source) {
+        super(version, source);
 
         this.msgSourceIP     = msgSourceIP;
         this.msgCreatedBy    = createdBy;
@@ -70,7 +74,7 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
     }
 
     public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
-        this(version, message, msgSourceIP, createdBy, false);
+        this(version, message, msgSourceIP, createdBy, false, null);
     }
 
     public String getMsgSourceIP() {
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/MessageSource.java b/intg/src/main/java/org/apache/atlas/model/notification/MessageSource.java
new file mode 100644
index 0000000..b5c3cdc
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/notification/MessageSource.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.notification;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Properties;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Identifies the producer of a notification message by source name and build version.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class MessageSource {
+    private static final Logger LOG = LoggerFactory.getLogger(MessageSource.class);
+    private static final String BUILDINFO_PROPERTIES        = "/atlas-buildinfo.properties";
+    private static final String BUILD_VERSION_PROPERTY_KEY  = "build.version";
+    private static final String BUILD_VERSION_DEFAULT       = "UNKNOWN";
+
+    // Build version, resolved once at class load; declared after LOG so the loader can log failures.
+    private static final String STORED_VERSION = fetchBuildVersion();
+
+    private String name;
+    private String version;
+
+    /** Creates an empty source; name and version remain null. */
+    public MessageSource() {
+    }
+
+    /** Creates a source with the given name and the build version resolved at class load. */
+    public MessageSource(String name) {
+        this.name    = name;
+        this.version = STORED_VERSION;
+    }
+
+    public String getSource() { return name; }
+
+    public void setSource(String name) { this.name = name; }
+
+    public String getVersion() { return version; }
+
+    /** Reads build.version from the classpath resource; falls back to the default if unavailable. */
+    private static String fetchBuildVersion() {
+        Properties properties = new Properties();
+        // try-with-resources closes the stream; getResourceAsStream returns null for a missing resource
+        try (InputStream inputStream = MessageSource.class.getResourceAsStream(BUILDINFO_PROPERTIES)) {
+            if (inputStream != null) {
+                properties.load(inputStream);
+            } else {
+                LOG.warn("{} not found on classpath. Will use default version.", BUILDINFO_PROPERTIES);
+            }
+        } catch (IOException e) {
+            LOG.error("Failed to load atlas-buildinfo properties. Will use default version.", e);
+        }
+
+        return properties.getProperty(BUILD_VERSION_PROPERTY_KEY, BUILD_VERSION_DEFAULT);
+    }
+}
diff --git a/intg/src/main/resources/atlas-buildinfo.properties b/intg/src/main/resources/atlas-buildinfo.properties
new file mode 100644
index 0000000..2404f8f
--- /dev/null
+++ b/intg/src/main/resources/atlas-buildinfo.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######################
+project.name=${pom.parent.name}
+project.description=${pom.parent.description}
+build.user=${user.name}
+build.epoch=${timestamp}
+project.version=${pom.version}
+build.version=${pom.version}
+vc.revision=${buildNumber}
+vc.source.url=${scm.connection}
+######################
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 9162ac1..24ea6ea 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -28,6 +28,7 @@ import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.utils.AtlasConfigurationUtil;
 import org.apache.commons.configuration.Configuration;
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ShutdownHookManager;
@@ -65,6 +66,7 @@ public abstract class AtlasHook {
 
     protected static Configuration         atlasProperties;
     protected static NotificationInterface notificationInterface;
+    protected MessageSource source;
 
     private static final String               metadataNamespace;
     private static final int                  SHUTDOWN_HOOK_WAIT_TIME_MS = 3000;
@@ -143,14 +145,18 @@ public abstract class AtlasHook {
     }
 
     public AtlasHook() {
+        source = new MessageSource(getMessageSource());
         notificationInterface.init(this.getClass().getSimpleName(), failedMessagesLogger);
     }
 
     public AtlasHook(String name) {
+        source = new MessageSource(getMessageSource());
         LOG.info("AtlasHook: Spool name: Passed from caller.: {}", name);
         notificationInterface.init(name, failedMessagesLogger);
     }
 
+    public abstract String getMessageSource();
+
     /**
      * Notify atlas of the entity through message. The entity can be a
      * complex entity with reference to other entities.
@@ -160,14 +166,14 @@ public abstract class AtlasHook {
      * @param messages   hook notification messages
      * @param maxRetries maximum number of retries while sending message to messaging system
      */
-    public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries) {
+    public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries, MessageSource source) {
         if (executor == null) { // send synchronously
-            notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
+            notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source);
         } else {
             executor.submit(new Runnable() {
                 @Override
                 public void run() {
-                    notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
+                    notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source);
                 }
             });
         }
@@ -176,7 +182,7 @@ public abstract class AtlasHook {
     @VisibleForTesting
     static void notifyEntitiesInternal(List<HookNotification> messages, int maxRetries, UserGroupInformation ugi,
                                        NotificationInterface notificationInterface,
-                                       boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
+                                       boolean shouldLogFailedMessages, FailedMessagesLogger logger, MessageSource source) {
         if (messages == null || messages.isEmpty()) {
             return;
         }
@@ -199,12 +205,12 @@ public abstract class AtlasHook {
 
             try {
                 if (ugi == null) {
-                    notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
+                    notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages, source);
                 } else {
                     PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() {
                         @Override
                         public Object run() throws Exception {
-                            notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
+                            notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages, source);
                             return messages;
                         }
                     };
@@ -244,7 +250,7 @@ public abstract class AtlasHook {
      * @param messages hook notification messages
      */
     protected void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi) {
-        notifyEntities(messages, ugi, notificationMaxRetries);
+        notifyEntities(messages, ugi, notificationMaxRetries, source);
     }
 
     /**
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index c45a1da..cca4cb8 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
 import org.apache.atlas.model.notification.AtlasNotificationMessage;
 import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
 import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.model.notification.MessageVersion;
 import org.apache.commons.configuration.Configuration;
@@ -84,10 +85,15 @@ public abstract class AbstractNotification implements NotificationInterface {
 
     @Override
     public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+        send(type, messages, new MessageSource());
+    }
+
+    @Override
+    public <T> void send(NotificationType type, List<T> messages, MessageSource source) throws NotificationException {
         List<String> strMessages = new ArrayList<>(messages.size());
 
         for (int index = 0; index < messages.size(); index++) {
-            createNotificationMessages(messages.get(index), strMessages);
+            createNotificationMessages(messages.get(index), strMessages, source);
         }
 
         sendInternal(type, strMessages);
@@ -146,10 +152,11 @@ public abstract class AbstractNotification implements NotificationInterface {
      *
      * @param message  the message in object form
      *
+     * @param source   the message source (name and version) recorded in each generated notification message
      * @return the message as a JSON string
      */
-    public static void createNotificationMessages(Object message, List<String> msgJsonList) {
-        AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser());
+    public static void createNotificationMessages(Object message, List<String> msgJsonList, MessageSource source) {
+        AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser(), false, source);
         String                      msgJson         = AtlasType.toV1Json(notificationMsg);
 
         boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 3d8d9cc..a9cd4a6 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.notification;
 
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.atlas.notification.entity.EntityMessageDeserializer;
 import org.apache.atlas.notification.hook.HookMessageDeserializer;
 
@@ -109,6 +110,18 @@ public interface NotificationInterface {
+    /**
+     * Send the given messages, associating each one with the supplied message source.
+     *
+     * @param type      the message type
+     * @param messages  the list of messages to send
+     * @param source    the source (name and version) of the component producing the messages
+     * @param <T>       the message type
+     *
+     * @throws NotificationException if an error occurs while sending
+     */
+    <T> void send(NotificationType type, List<T> messages, MessageSource source) throws NotificationException;
+
     /**
      * Shutdown any notification producers and consumers associated with this interface instance.
      */
     void close();
 
     /**
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
index 0c92c30..57c0c7d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
@@ -19,6 +19,7 @@ package org.apache.atlas.notification.spool;
 
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.hook.FailedMessagesLogger;
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationException;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import static org.apache.atlas.repository.Constants.FILE_SPOOL_SOURCE;
 
 public class AtlasFileSpool implements NotificationInterface {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasFileSpool.class);
@@ -103,13 +105,18 @@ public class AtlasFileSpool implements NotificationInterface {
     }
 
     @Override
+    public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+        send(type, messages, new MessageSource(FILE_SPOOL_SOURCE));
+    }
+
+    @Override
     public boolean isReady(NotificationType type) {
         return true;
     }
 
     @Override
-    public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
-        List<String> serializedMessages = getSerializedMessages(messages);
+    public <T> void send(NotificationType type, List<T> messages, MessageSource source) throws NotificationException {
+        List<String> serializedMessages = getSerializedMessages(messages, source);
         if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("AtlasFileSpool.send(): sending to spooler");
@@ -139,10 +146,10 @@ public class AtlasFileSpool implements NotificationInterface {
         }
     }
 
-    private <T> List<String> getSerializedMessages(List<T> messages) {
+    private <T> List<String> getSerializedMessages(List<T> messages, MessageSource source) {
         List<String> serializedMessages = new ArrayList<>(messages.size());
         for (int index = 0; index < messages.size(); index++) {
-            notificationHandler.createNotificationMessages(messages.get(index), serializedMessages);
+            notificationHandler.createNotificationMessages(messages.get(index), serializedMessages, source);
         }
 
         return serializedMessages;
diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
index 1ae7c27..b094247 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas.hook;
 
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
@@ -52,32 +53,35 @@ public class AtlasHookTest {
 
     @Test (timeOut = 10000)
     public void testNotifyEntitiesDoesNotHangOnException() throws Exception {
+        MessageSource source                     = new MessageSource(this.getClass().getSimpleName());
         List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new NotificationException(new Exception())).when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
         AtlasHook.notifyEntitiesInternal(hookNotifications, 0, null, notificationInterface, false,
-                failedMessagesLogger);
+                failedMessagesLogger, source);
         // if we've reached here, the method finished OK.
     }
 
     @Test
     public void testNotifyEntitiesRetriesOnException() throws NotificationException {
+        MessageSource source                     = new MessageSource(this.getClass().getSimpleName());
         List<HookNotification> hookNotifications =
                 new ArrayList<HookNotification>() {{
                     add(new EntityCreateRequest("user"));
                 }
             };
         doThrow(new NotificationException(new Exception())).when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
         AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, false,
-                failedMessagesLogger);
+                failedMessagesLogger, source);
 
         verify(notificationInterface, times(2)).
-                send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+                send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
     }
 
     @Test
     public void testFailedMessageIsLoggedIfRequired() throws NotificationException {
+        MessageSource source                     = new MessageSource(this.getClass().getSimpleName());
         List<HookNotification> hookNotifications =
                 new ArrayList<HookNotification>() {{
                     add(new EntityCreateRequest("user"));
@@ -85,27 +89,29 @@ public class AtlasHookTest {
             };
         doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
                 .when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
         AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true,
-                failedMessagesLogger);
+                failedMessagesLogger, source);
 
         verify(failedMessagesLogger, times(1)).log("test message");
     }
 
     @Test
     public void testFailedMessageIsNotLoggedIfNotRequired() throws NotificationException {
+        MessageSource source                     = new MessageSource(this.getClass().getSimpleName());
         List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
                 .when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
         AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, false,
-                failedMessagesLogger);
+                failedMessagesLogger, source);
 
         verifyZeroInteractions(failedMessagesLogger);
     }
 
     @Test
     public void testAllFailedMessagesAreLogged() throws NotificationException {
+        MessageSource source                     = new MessageSource(this.getClass().getSimpleName());
         List<HookNotification> hookNotifications =
                 new ArrayList<HookNotification>() {{
                     add(new EntityCreateRequest("user"));
@@ -113,9 +119,9 @@ public class AtlasHookTest {
             };
         doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2")))
                 .when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
         AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true,
-                failedMessagesLogger);
+                failedMessagesLogger, source);
 
         verify(failedMessagesLogger, times(1)).log("test message1");
         verify(failedMessagesLogger, times(1)).log("test message2");
@@ -123,11 +129,12 @@ public class AtlasHookTest {
 
     @Test
     public void testFailedMessageIsNotLoggedIfNotANotificationException() throws Exception {
+        MessageSource source                     = new MessageSource(this.getClass().getSimpleName());
         List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new RuntimeException("test message")).when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications, source);
         AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true,
-                failedMessagesLogger);
+                failedMessagesLogger, source);
 
         verifyZeroInteractions(failedMessagesLogger);
     }
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 8078a6c..4e1c094 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.notification;
 
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
 import org.apache.atlas.notification.NotificationInterface.NotificationType;
@@ -40,6 +41,7 @@ public class AbstractNotificationTest {
 
     @org.testng.annotations.Test
     public void testSend() throws Exception {
+        MessageSource    source        = new MessageSource();
         Configuration    configuration = mock(Configuration.class);
         TestNotification notification  = new TestNotification(configuration);
         Test             message1      = new Test(HookNotificationType.ENTITY_CREATE, "user1");
@@ -47,9 +49,9 @@ public class AbstractNotificationTest {
         Test             message3      = new Test(HookNotificationType.ENTITY_FULL_UPDATE, "user1");
         List<String>     messageJson   = new ArrayList<>();
 
-        AbstractNotification.createNotificationMessages(message1, messageJson);
-        AbstractNotification.createNotificationMessages(message2, messageJson);
-        AbstractNotification.createNotificationMessages(message3, messageJson);
+        AbstractNotification.createNotificationMessages(message1, messageJson, source);
+        AbstractNotification.createNotificationMessages(message2, messageJson, source);
+        AbstractNotification.createNotificationMessages(message3, messageJson, source);
 
         notification.send(NotificationType.HOOK, message1, message2, message3);
 
@@ -63,6 +65,7 @@ public class AbstractNotificationTest {
 
     @org.testng.annotations.Test
     public void testSend2() throws Exception {
+        MessageSource    source        = new MessageSource();
         Configuration    configuration = mock(Configuration.class);
         TestNotification notification  = new TestNotification(configuration);
         Test             message1      = new Test(HookNotificationType.ENTITY_CREATE, "user1");
@@ -71,11 +74,11 @@ public class AbstractNotificationTest {
         List<Test>       messages      = Arrays.asList(message1, message2, message3);
         List<String>     messageJson   = new ArrayList<>();
 
-        AbstractNotification.createNotificationMessages(message1, messageJson);
-        AbstractNotification.createNotificationMessages(message2, messageJson);
-        AbstractNotification.createNotificationMessages(message3, messageJson);
+        AbstractNotification.createNotificationMessages(message1, messageJson, source);
+        AbstractNotification.createNotificationMessages(message2, messageJson, source);
+        AbstractNotification.createNotificationMessages(message3, messageJson, source);
 
-        notification.send(NotificationInterface.NotificationType.HOOK, messages);
+        notification.send(NotificationInterface.NotificationType.HOOK, messages, source);
 
         assertEquals(notification.type, NotificationType.HOOK);
         assertEquals(notification.messages.size(), messageJson.size());
diff --git a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
index 91a195d..463797c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
@@ -19,16 +19,26 @@
 package org.apache.atlas.notification;
 
 import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.atlas.model.notification.MessageVersion;
+import org.apache.atlas.notification.entity.EntityNotificationTest;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
 import org.testng.annotations.Test;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Collections;
+
 import static org.testng.Assert.*;
 
 /**
  * AtlasNotificationMessage tests.
  */
 public class AtlasNotificationMessageTest {
-
+    
     @Test
     public void testGetVersion() throws Exception {
         MessageVersion version = new MessageVersion("1.0.0");
@@ -56,4 +66,25 @@ public class AtlasNotificationMessageTest {
         assertTrue(atlasNotificationMessage.compareVersion(version2) < 0);
         assertTrue(atlasNotificationMessage.compareVersion(version3) > 0);
     }
+
+    @Test
+    public void testMessageSource() throws Exception {
+        Referenceable entity                           = generateEntityWithTrait();
+        HookNotificationV1.EntityUpdateRequest message = new HookNotificationV1.EntityUpdateRequest("user1", entity);
+        MessageSource source                           = new MessageSource(this.getClass().getSimpleName());
+        List<String> jsonList                          = new LinkedList<>();
+
+        AbstractNotification.createNotificationMessages(message, jsonList, source);
+        for(Object json :  jsonList) {
+            AtlasNotificationMessage atlasNotificationMessage = AtlasType.fromV1Json((String) json, AtlasNotificationMessage.class);
+            assertEquals("\"" + source.getSource() + "\"" ,AtlasType.toV1Json(atlasNotificationMessage.getSource().getSource()));
+        }
+    }
+
+    private Referenceable generateEntityWithTrait() {
+        Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+        return ret;
+    }
+
 }
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
index 13eafb6..2953b63 100644
--- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.notification.entity;
 
 import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.notification.AbstractNotification;
@@ -37,6 +38,7 @@ import static org.testng.Assert.assertTrue;
  */
 public class EntityNotificationDeserializerTest {
     private EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
+    MessageSource source = new MessageSource(this.getClass().getSimpleName());
 
     @Test
     public void testDeserialize() throws Exception {
@@ -46,7 +48,7 @@ public class EntityNotificationDeserializerTest {
         EntityNotificationV1 notification = new EntityNotificationV1(entity, EntityNotificationV1.OperationType.TRAIT_ADD, traits);
         List<String>         jsonMsgList  = new ArrayList<>();
 
-        AbstractNotification.createNotificationMessages(notification, jsonMsgList);
+        AbstractNotification.createNotificationMessages(notification, jsonMsgList, source);
 
         EntityNotification deserializedNotification = null;
 
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
index d048170..bfc9b53 100644
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas.notification.hook;
 
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.notification.entity.EntityNotificationTest;
 import org.apache.atlas.v1.model.instance.Referenceable;
@@ -41,6 +42,7 @@ import static org.testng.Assert.assertTrue;
  */
 public class HookNotificationDeserializerTest {
     private HookMessageDeserializer deserializer = new HookMessageDeserializer();
+    MessageSource source = new MessageSource(this.getClass().getSimpleName());
 
     @Test
     public void testDeserialize() throws Exception {
@@ -48,7 +50,7 @@ public class HookNotificationDeserializerTest {
         EntityUpdateRequest message     = new EntityUpdateRequest("user1", entity);
         List<String>        jsonMsgList = new ArrayList<>();
 
-        AbstractNotification.createNotificationMessages(message, jsonMsgList);
+        AbstractNotification.createNotificationMessages(message, jsonMsgList, source);
 
         HookNotification deserializedMessage = deserialize(jsonMsgList);
 
@@ -72,7 +74,7 @@ public class HookNotificationDeserializerTest {
         EntityUpdateRequest message    = new EntityUpdateRequest("user1", entity);
         List<String>       jsonMsgList = new ArrayList<>();
 
-        AbstractNotification.createNotificationMessages(message, jsonMsgList);
+        AbstractNotification.createNotificationMessages(message, jsonMsgList, source);
 
         assertTrue(jsonMsgList.size() == 1);
 
@@ -92,7 +94,7 @@ public class HookNotificationDeserializerTest {
         EntityUpdateRequest message     = new EntityUpdateRequest("user1", entity);
         List<String>        jsonMsgList = new ArrayList<>();
 
-        AbstractNotification.createNotificationMessages(message, jsonMsgList);
+        AbstractNotification.createNotificationMessages(message, jsonMsgList, source);
 
         assertTrue(jsonMsgList.size() > 1);