You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/09/30 11:57:18 UTC

atlas git commit: ATLAS-2075: notification enhancement to handle large messages, using compression and multi-part messages

Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 c61d489cc -> 99243ee8e


ATLAS-2075: notification enhancement to handle large messages, using compression and multi-part messages


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/99243ee8
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/99243ee8
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/99243ee8

Branch: refs/heads/branch-0.8
Commit: 99243ee8e18656acd72601468c99e7781a0b04f7
Parents: c61d489
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Wed Sep 27 20:42:25 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sat Sep 30 04:56:35 2017 -0700

----------------------------------------------------------------------
 .../org/apache/atlas/AtlasConfiguration.java    |   7 +
 .../apache/atlas/kafka/AtlasKafkaConsumer.java  |   4 +
 .../apache/atlas/kafka/KafkaNotification.java   |   4 +-
 .../AbstractMessageDeserializer.java            |  14 +-
 .../notification/AbstractNotification.java      | 113 +++++++++-
 .../AtlasNotificationBaseMessage.java           | 194 ++++++++++++++++
 .../notification/AtlasNotificationMessage.java  |  50 +++++
 .../AtlasNotificationMessageDeserializer.java   | 225 +++++++++++++++++++
 .../AtlasNotificationStringMessage.java         |  60 +++++
 .../atlas/notification/MessageVersion.java      |   3 +
 .../notification/NotificationInterface.java     |   4 +-
 .../atlas/notification/VersionedMessage.java    |  75 -------
 .../VersionedMessageDeserializer.java           | 105 ---------
 .../apache/atlas/kafka/KafkaConsumerTest.java   |  11 +-
 .../atlas/kafka/KafkaNotificationMockTest.java  |  18 +-
 .../AbstractNotificationConsumerTest.java       |  41 ++--
 .../notification/AbstractNotificationTest.java  |  33 +--
 .../AtlasNotificationMessageTest.java           |  57 +++++
 .../notification/VersionedMessageTest.java      |  57 -----
 .../entity/EntityMessageDeserializerTest.java   |  16 +-
 .../hook/HookMessageDeserializerTest.java       | 134 +++++++++--
 21 files changed, 895 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 9a9bb76..451bd9d 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -33,6 +33,9 @@ public enum AtlasConfiguration {
 
     QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024),
 
+    NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
+    NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
+
     //search configuration
     SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
     SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
@@ -63,6 +66,10 @@ public enum AtlasConfiguration {
         return APPLICATION_PROPERTIES.getLong(propertyName, Long.valueOf(defaultValue.toString()).longValue());
     }
 
+    public boolean getBoolean() {
+        return APPLICATION_PROPERTIES.getBoolean(propertyName, Boolean.valueOf(defaultValue.toString()).booleanValue());
+    }
+
     public String getString() {
         return APPLICATION_PROPERTIES.getString(propertyName, defaultValue.toString());
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index d3b4e49..e3bb71c 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -71,6 +71,10 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
 
                 T message = deserializer.deserialize(record.value().toString());
 
+                if (message == null) {
+                    continue;
+                }
+
                 messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition()));
             }
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 38889ef..6bb8d73 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -202,7 +202,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
     // ----- AbstractNotification --------------------------------------------
 
     @Override
-    public void sendInternal(NotificationType type, String... messages) throws NotificationException {
+    public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
         if (producer == null) {
             createProducer();
         }
@@ -210,7 +210,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
     }
 
     @VisibleForTesting
-    void sendInternalToProducer(Producer p, NotificationType type, String[] messages) throws NotificationException {
+    void sendInternalToProducer(Producer p, NotificationType type, List<String> messages) throws NotificationException {
         String topic = TOPIC_MAP.get(type);
         List<MessageContext> messageContexts = new ArrayList<>();
         for (String message : messages) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
index ec99372..37a57d1 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
@@ -44,7 +44,7 @@ import java.util.Map;
 /**
  * Base notification message deserializer.
  */
-public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDeserializer<T> {
+public abstract class AbstractMessageDeserializer<T> extends AtlasNotificationMessageDeserializer<T> {
 
     private static final Map<Type, JsonDeserializer> DESERIALIZER_MAP = new HashMap<>();
 
@@ -63,16 +63,16 @@ public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDes
     /**
      * Create a deserializer.
      *
-     * @param versionedMessageType  the type of the versioned message
-     * @param expectedVersion       the expected message version
-     * @param deserializerMap       map of individual deserializers used to define this message deserializer
-     * @param notificationLogger    logger for message version mismatch
+     * @param notificationMessageType the type of the notification message
+     * @param expectedVersion         the expected message version
+     * @param deserializerMap         map of individual deserializers used to define this message deserializer
+     * @param notificationLogger      logger for message version mismatch
      */
-    public AbstractMessageDeserializer(Type versionedMessageType,
+    public AbstractMessageDeserializer(Type notificationMessageType,
                                        MessageVersion expectedVersion,
                                        Map<Type, JsonDeserializer> deserializerMap,
                                        Logger notificationLogger) {
-        super(versionedMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger);
+        super(notificationMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger);
     }
 
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index cb44fc6..1f9404d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -26,21 +26,34 @@ import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.commons.configuration.Configuration;
 import org.codehaus.jettison.json.JSONArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 import java.lang.reflect.Type;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED;
+import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES;
 
 /**
  * Abstract notification interface implementation.
  */
 public abstract class AbstractNotification implements NotificationInterface {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractNotification.class);
+
+    private static String        msgIdPrefix = UUID.randomUUID().toString();
+    private static AtomicInteger msgIdSuffix = new AtomicInteger(0);
 
     /**
      * The current expected version for notification messages.
@@ -48,6 +61,9 @@ public abstract class AbstractNotification implements NotificationInterface {
     public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");
 
     public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
+
+    public static final int MAX_BYTES_PER_CHAR = 4;  // each char can encode up to 4 bytes in UTF-8
+
     private final boolean embedded;
     private final boolean isHAEnabled;
 
@@ -77,10 +93,12 @@ public abstract class AbstractNotification implements NotificationInterface {
 
     @Override
     public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
-        String[] strMessages = new String[messages.size()];
+        List<String> strMessages = new ArrayList<>(messages.size());
+
         for (int index = 0; index < messages.size(); index++) {
-            strMessages[index] = getMessageJson(messages.get(index));
+            createNotificationMessages(messages.get(index), strMessages);
         }
+
         sendInternal(type, strMessages);
     }
 
@@ -117,11 +135,17 @@ public abstract class AbstractNotification implements NotificationInterface {
      *
      * @throws NotificationException if an error occurs while sending
      */
-    protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException;
+    protected abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;
 
 
     // ----- utility methods -------------------------------------------------
 
+    public static String getMessageJson(Object message) {
+        AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);
+
+        return GSON.toJson(notificationMsg);
+    }
+
     /**
      * Get the notification message JSON from the given object.
      *
@@ -129,10 +153,75 @@ public abstract class AbstractNotification implements NotificationInterface {
      *
      * @return the message as a JSON string
      */
-    public static String getMessageJson(Object message) {
-        VersionedMessage<?> versionedMessage = new VersionedMessage<>(CURRENT_MESSAGE_VERSION, message);
+    public static void createNotificationMessages(Object message, List<String> msgJsonList) {
+        AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);
+        String                      msgJson         = GSON.toJson(notificationMsg);
+
+        boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;
+
+        if (msgLengthExceedsLimit) { // get utf-8 bytes for msgJson and check for length limit again
+            byte[] msgBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson);
+
+            msgLengthExceedsLimit = msgBytes.length > MESSAGE_MAX_LENGTH_BYTES;
+
+            if (msgLengthExceedsLimit) {
+                String          msgId           = getNextMessageId();
+                CompressionKind compressionKind = CompressionKind.NONE;
+
+                if (MESSAGE_COMPRESSION_ENABLED) {
+                    byte[] encodedBytes = AtlasNotificationBaseMessage.gzipCompressAndEncodeBase64(msgBytes);
+
+                    compressionKind = CompressionKind.GZIP;
+
+                    LOG.info("Compressed large message: msgID={}, uncompressed={} bytes, compressed={} bytes", msgId, msgBytes.length, encodedBytes.length);
+
+                    msgLengthExceedsLimit = encodedBytes.length > MESSAGE_MAX_LENGTH_BYTES;
 
-        return GSON.toJson(versionedMessage);
+                    if (!msgLengthExceedsLimit) { // no need to split
+                        AtlasNotificationStringMessage compressedMsg = new AtlasNotificationStringMessage(encodedBytes, msgId, compressionKind);
+
+                        msgJson  = GSON.toJson(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above
+                        msgBytes = null; // not used after this point
+                    } else { // encodedBytes will be split
+                        msgJson  = null; // not used after this point
+                        msgBytes = encodedBytes;
+                    }
+                }
+
+                if (msgLengthExceedsLimit) {
+                    // compressed messages are already base64-encoded
+                    byte[] encodedBytes = compressionKind != CompressionKind.NONE ? msgBytes : AtlasNotificationBaseMessage.encodeBase64(msgBytes);
+
+                    int splitCount = encodedBytes.length / MESSAGE_MAX_LENGTH_BYTES;
+
+                    if ((encodedBytes.length % MESSAGE_MAX_LENGTH_BYTES) != 0) {
+                        splitCount++;
+                    }
+
+                    LOG.info("Splitting large message: msgID={}, length={} bytes, splitCount={}", msgId, encodedBytes.length, splitCount);
+
+                    for (int i = 0, offset = 0; i < splitCount; i++) {
+                        int length = MESSAGE_MAX_LENGTH_BYTES;
+
+                        if ((offset + length) > encodedBytes.length) {
+                            length = encodedBytes.length - offset;
+                        }
+
+                        AtlasNotificationStringMessage splitMsg = new AtlasNotificationStringMessage(encodedBytes, offset, length, msgId, compressionKind, i, splitCount);
+
+                        String splitMsgJson = GSON.toJson(splitMsg);
+
+                        msgJsonList.add(splitMsgJson);
+
+                        offset += length;
+                    }
+                }
+            }
+        }
+
+        if (!msgLengthExceedsLimit) {
+            msgJsonList.add(msgJson);
+        }
     }
 
 
@@ -158,4 +247,16 @@ public abstract class AbstractNotification implements NotificationInterface {
             return new JsonParser().parse(src.toString()).getAsJsonArray();
         }
     }
+
+    private static String getNextMessageId() {
+        String nextMsgIdPrefix = msgIdPrefix;
+        int    nextMsgIdSuffix = msgIdSuffix.getAndIncrement();
+
+        if (nextMsgIdSuffix == Short.MAX_VALUE) { // get a new UUID after 32,767 IDs
+            msgIdPrefix = UUID.randomUUID().toString();
+            msgIdSuffix = new AtomicInteger(0);
+        }
+
+        return nextMsgIdPrefix + "_" + Integer.toString(nextMsgIdSuffix);
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
new file mode 100644
index 0000000..3b377de
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.commons.compress.utils.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+
+public class AtlasNotificationBaseMessage {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationBaseMessage.class);
+
+    public static final int     MESSAGE_MAX_LENGTH_BYTES    = AtlasConfiguration.NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES.getInt() - 512; // 512 bytes for envelope
+    public static final boolean MESSAGE_COMPRESSION_ENABLED = AtlasConfiguration.NOTIFICATION_MESSAGE_COMPRESSION_ENABLED.getBoolean();
+
+    public enum CompressionKind { NONE, GZIP };
+
+    private MessageVersion  version            = null;
+    private String          msgId              = null;
+    private CompressionKind msgCompressionKind = CompressionKind.NONE;
+    private int             msgSplitIdx        = 1;
+    private int             msgSplitCount      = 1;
+
+
+    public AtlasNotificationBaseMessage() {
+    }
+
+    public AtlasNotificationBaseMessage(MessageVersion version) {
+        this(version, null, CompressionKind.NONE);
+    }
+
+    public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) {
+        this.version            = version;
+        this.msgId              = msgId;
+        this.msgCompressionKind = msgCompressionKind;
+    }
+
+    public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) {
+        this.version            = version;
+        this.msgId              = msgId;
+        this.msgCompressionKind = msgCompressionKind;
+        this.msgSplitIdx        = msgSplitIdx;
+        this.msgSplitCount      = msgSplitCount;
+    }
+
+    public void setVersion(MessageVersion version) {
+        this.version = version;
+    }
+
+    public MessageVersion getVersion() {
+        return version;
+    }
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public CompressionKind getMsgCompressionKind() {
+        return msgCompressionKind;
+    }
+
+    public void setMsgCompressed(CompressionKind msgCompressionKind) {
+        this.msgCompressionKind = msgCompressionKind;
+    }
+
+    public int getMsgSplitIdx() {
+        return msgSplitIdx;
+    }
+
+    public void setMsgSplitIdx(int msgSplitIdx) {
+        this.msgSplitIdx = msgSplitIdx;
+    }
+
+    public int getMsgSplitCount() {
+        return msgSplitCount;
+    }
+
+    public void setMsgSplitCount(int msgSplitCount) {
+        this.msgSplitCount = msgSplitCount;
+    }
+
+    /**
+     * Compare the version of this message with the given version.
+     *
+     * @param compareToVersion  the version to compare to
+     *
+     * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
+     *         or greater than the given version.
+     */
+    public int compareVersion(MessageVersion compareToVersion) {
+        return version.compareTo(compareToVersion);
+    }
+
+
+    public static byte[] getBytesUtf8(String str) {
+        return StringUtils.getBytesUtf8(str);
+    }
+
+    public static String getStringUtf8(byte[] bytes) {
+        return StringUtils.newStringUtf8(bytes);
+    }
+
+    public static byte[] encodeBase64(byte[] bytes) {
+        return Base64.encodeBase64(bytes);
+    }
+
+    public static byte[] decodeBase64(byte[] bytes) {
+        return Base64.decodeBase64(bytes);
+    }
+
+    public static byte[] gzipCompressAndEncodeBase64(byte[] bytes) {
+        return encodeBase64(gzipCompress(bytes));
+    }
+
+    public static byte[] decodeBase64AndGzipUncompress(byte[] bytes) {
+        return gzipUncompress(decodeBase64(bytes));
+    }
+
+    public static String gzipCompress(String str) {
+        byte[] bytes           = getBytesUtf8(str);
+        byte[] compressedBytes = gzipCompress(bytes);
+        byte[] encodedBytes    = encodeBase64(compressedBytes);
+
+        return getStringUtf8(encodedBytes);
+    }
+
+    public static String gzipUncompress(String str) {
+        byte[] encodedBytes    = getBytesUtf8(str);
+        byte[] compressedBytes = decodeBase64(encodedBytes);
+        byte[] bytes           = gzipUncompress(compressedBytes);
+
+        return getStringUtf8(bytes);
+    }
+
+    public static byte[] gzipCompress(byte[] content) {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+
+        try {
+            GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
+
+            gzipOutputStream.write(content);
+            gzipOutputStream.close();
+        } catch (IOException e) {
+            LOG.error("gzipCompress(): error compressing {} bytes", content.length, e);
+
+            throw new RuntimeException(e);
+        }
+
+        return byteArrayOutputStream.toByteArray();
+    }
+
+    public static byte[] gzipUncompress(byte[] content) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+        try {
+            IOUtils.copy(new GZIPInputStream(new ByteArrayInputStream(content)), out);
+        } catch (IOException e) {
+            LOG.error("gzipUncompress(): error uncompressing {} bytes", content.length, e);
+        }
+
+        return out.toByteArray();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
new file mode 100644
index 0000000..2f6f9c7
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+/**
+ * Represents a notification message that is associated with a version.
+ */
+public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
+
+    /**
+     * The actual message.
+     */
+    private final T message;
+
+
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Create a notification message.
+     *
+     * @param version  the message version
+     * @param message  the actual message
+     */
+    public AtlasNotificationMessage(MessageVersion version, T message) {
+        super(version);
+
+        this.message = message;
+    }
+
+
+    public T getMessage() {
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
new file mode 100644
index 0000000..b1ac2fa
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.gson.Gson;
+import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Deserializer that works with notification messages.  The version of each deserialized message is checked against an
+ * expected version.
+ * <p>
+ * Besides plain wrapped messages, this deserializer handles GZIP-compressed messages and multi-part messages; parts of
+ * a multi-part message are buffered in this instance until the last part arrives.
+ * <p>
+ * NOTE(review): the part buffer is a plain {@link HashMap} and an entry is only dropped when the last part of its
+ * message is seen, so an instance looks unsafe to share across threads and can retain parts of messages that never
+ * complete - confirm against how consumers use this class.
+ */
+public abstract class AtlasNotificationMessageDeserializer<T> implements MessageDeserializer<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationMessageDeserializer.class);
+
+
+    public static final String VERSION_MISMATCH_MSG =
+        "Notification message version mismatch. Expected %s but recieved %s. Message %s";
+
+    private final Type notificationMessageType;
+    private final Type messageType;
+    private final MessageVersion expectedVersion;
+    private final Logger notificationLogger;
+    private final Gson gson;
+
+
+    // parts of multi-part messages received so far, keyed by message ID
+    private final Map<String, AtlasNotificationStringMessage[]> splitMsgBuffer = new HashMap<>();
+
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Create a notification message deserializer.
+     *
+     * @param notificationMessageType the type of the notification message
+     * @param expectedVersion         the expected message version
+     * @param gson                    JSON serialization/deserialization
+     * @param notificationLogger      logger for message version mismatch
+     */
+    public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
+                                                Gson gson, Logger notificationLogger) {
+        this.notificationMessageType = notificationMessageType;
+        this.messageType             = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0];
+        this.expectedVersion         = expectedVersion;
+        this.gson                    = gson;
+        this.notificationLogger      = notificationLogger;
+    }
+
+    // ----- MessageDeserializer ---------------------------------------------
+
+    /**
+     * Deserialize one notification JSON string.
+     *
+     * @param messageJson the JSON received from the notification channel
+     *
+     * @return the payload; null when the JSON holds no message, or when it is a part of a multi-part message that
+     *         is not yet complete or had to be ignored
+     *
+     * @throws IncompatibleVersionException  if the message version is newer than the expected version
+     */
+    @Override
+    public T deserialize(String messageJson) {
+        final T ret;
+
+        AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class);
+
+        if (msg == null) { // Gson yields null for empty input or a JSON null; there is nothing to deserialize
+            LOG.warn("Received empty notification message. Ignoring message");
+
+            ret = null;
+        } else if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
+            ret = gson.fromJson(messageJson, messageType);
+        } else  {
+            String msgJson = messageJson;
+
+            if (msg.getMsgSplitCount() > 1) { // multi-part message
+                msgJson = assembleSplitMessage(messageJson);
+                msg     = (msgJson != null) ? gson.fromJson(msgJson, AtlasNotificationBaseMessage.class) : null;
+            }
+
+            if (msg != null) {
+                if (CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) {
+                    AtlasNotificationStringMessage compressedMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
+
+                    byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage());
+                    byte[] bytes        = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);
+
+                    msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);
+
+                    LOG.info("Received compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length);
+                }
+
+                AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType);
+
+                checkVersion(atlasNotificationMessage, msgJson);
+
+                ret = atlasNotificationMessage.getMessage();
+            } else {
+                ret = null;
+            }
+        }
+
+        return ret;
+    }
+
+    // ----- helper methods --------------------------------------------------
+
+    /**
+     * Buffer one part of a multi-part message and, once the last part has arrived, rebuild the original JSON.
+     *
+     * @param partJson the JSON of a single part
+     *
+     * @return the JSON of the complete message, or null if more parts are expected or the part/message is ignored
+     *
+     * @throws IncompatibleVersionException  if the part's version is newer than the expected version
+     */
+    private String assembleSplitMessage(String partJson) {
+        final AtlasNotificationStringMessage splitMsg = gson.fromJson(partJson, AtlasNotificationStringMessage.class);
+
+        checkVersion(splitMsg, partJson);
+
+        final String msgId = splitMsg.getMsgId();
+
+        if (StringUtils.isEmpty(msgId)) {
+            LOG.error("Received multi-part message with no message ID. Ignoring message");
+
+            return null;
+        }
+
+        final int splitIdx   = splitMsg.getMsgSplitIdx();
+        final int splitCount = splitMsg.getMsgSplitCount();
+
+        final AtlasNotificationStringMessage[] splitMsgs;
+
+        if (splitIdx == 0) { // first part: start a fresh buffer, replacing any earlier one stored under this ID
+            splitMsgs = new AtlasNotificationStringMessage[splitCount];
+
+            splitMsgBuffer.put(msgId, splitMsgs);
+        } else {
+            splitMsgs = splitMsgBuffer.get(msgId);
+        }
+
+        if (splitMsgs == null) {
+            LOG.error("Received multi-part message: msgID={}, {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);
+
+            return null;
+        }
+
+        if (splitIdx < 0 || splitMsgs.length <= splitIdx) { // negative indexes are rejected too, not left to fail on the store
+            LOG.error("Received multi-part message: msgID={}, {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
+
+            return null;
+        }
+
+        LOG.info("Received multi-part message: msgID={}, {} of {}", msgId, splitIdx + 1, splitCount);
+
+        splitMsgs[splitIdx] = splitMsg;
+
+        if (splitIdx != (splitCount - 1)) { // more messages to arrive
+            return null;
+        }
+
+        splitMsgBuffer.remove(msgId);
+
+        StringBuilder sb = new StringBuilder();
+
+        for (int i = 0; i < splitMsgs.length; i++) {
+            if (splitMsgs[i] == null) {
+                LOG.warn("Multi-part message: msgID={}, message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);
+
+                return null;
+            }
+
+            sb.append(splitMsgs[i].getMessage());
+        }
+
+        // compression kind is read from the last buffered part; presumably every part of a message carries the same kind
+        final AtlasNotificationStringMessage lastPart = splitMsgs[splitMsgs.length - 1];
+        final byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(sb.toString());
+        final byte[] bytes;
+
+        if (CompressionKind.GZIP.equals(lastPart.getMsgCompressionKind())) {
+            bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);
+
+            LOG.info("Received multi-part, compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length);
+        } else {
+            bytes = AtlasNotificationBaseMessage.decodeBase64(encodedBytes);
+
+            LOG.info("Received multi-part message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length);
+        }
+
+        return AtlasNotificationBaseMessage.getStringUtf8(bytes);
+    }
+
+    /**
+     * Check the message version against the expected version.
+     *
+     * @param notificationMessage the notification message
+     * @param messageJson         the notification message json
+     *
+     * @throws IncompatibleVersionException  if the message version is incompatible with the expected version
+     */
+    protected void checkVersion(AtlasNotificationBaseMessage notificationMessage, String messageJson) {
+        int comp = notificationMessage.compareVersion(expectedVersion);
+
+        // message has newer version
+        if (comp > 0) {
+            String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson);
+
+            notificationLogger.error(msg);
+
+            throw new IncompatibleVersionException(msg);
+        }
+
+        // message has older version
+        if (comp < 0) {
+            notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
new file mode 100644
index 0000000..193735c
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Notification message whose payload is a string.  AtlasNotificationMessageDeserializer reads compressed messages and
+ * the parts of multi-part messages as this type and Base64-decodes the payload.
+ */
+public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage {
+    private String message = null;
+
+    /**
+     * Create a message with no payload, using AbstractNotification.CURRENT_MESSAGE_VERSION as its version.
+     */
+    public AtlasNotificationStringMessage() {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION);
+    }
+
+    /**
+     * Create a message with the given payload.
+     *
+     * @param message  the payload
+     */
+    public AtlasNotificationStringMessage(String message) {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION);
+
+        this.message = message;
+    }
+
+    /**
+     * Create a message with the given payload, message ID and compression kind.
+     *
+     * @param message          the payload
+     * @param msgId            the message ID
+     * @param compressionKind  the compression kind recorded on the message
+     */
+    public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind) {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
+
+        this.message = message;
+    }
+
+    /**
+     * Create a message whose payload is the given bytes converted with AtlasNotificationBaseMessage.getStringUtf8().
+     *
+     * @param encodedBytes     the payload bytes
+     * @param msgId            the message ID
+     * @param compressionKind  the compression kind recorded on the message
+     */
+    public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
+
+        this.message = AtlasNotificationBaseMessage.getStringUtf8(encodedBytes);
+    }
+
+    /**
+     * Create one part of a multi-part message from a slice of the payload bytes.
+     *
+     * @param encodedBytes     the bytes of the whole payload
+     * @param offset           index of the first byte of this part
+     * @param length           number of bytes in this part
+     * @param msgId            the message ID
+     * @param compressionKind  the compression kind recorded on the message
+     * @param msgSplitIdx      index of this part
+     * @param msgSplitCount    total number of parts
+     */
+    public AtlasNotificationStringMessage(byte[] encodedBytes, int offset, int length, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
+
+        // explicit charset: new String(byte[], int, int) would decode with the platform default charset.
+        // NOTE(review): UTF-8 chosen to match the getStringUtf8() path above - confirm that helper is plain UTF-8
+        this.message = new String(encodedBytes, offset, length, StandardCharsets.UTF_8);
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
index 6ef407a..7f96638 100644
--- a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
+++ b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
@@ -29,6 +29,9 @@ public class MessageVersion implements Comparable<MessageVersion> {
      * Used for message with no version (old format).
      */
     public static final MessageVersion NO_VERSION = new MessageVersion("0");
+    public static final MessageVersion VERSION_1  = new MessageVersion("1.0.0");
+    /** Current message version; at present this is {@link #VERSION_1}. */
+    public static final MessageVersion CURRENT_VERSION = VERSION_1;
 
     private final String version;
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 956c85e..a787862 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -54,9 +54,9 @@ public interface NotificationInterface {
      * Versioned notification message class types.
      */
     Type HOOK_VERSIONED_MESSAGE_TYPE =
-        new TypeToken<VersionedMessage<HookNotification.HookNotificationMessage>>(){}.getType();
+        new TypeToken<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>(){}.getType();
 
-    Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<VersionedMessage<EntityNotification>>(){}.getType();
+    Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType();
 
     /**
      * Atlas notification types.

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
deleted file mode 100644
index 1929eb4..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-/**
- * Represents a notification message that is associated with a version.
- */
-public class VersionedMessage<T> {
-
-    /**
-     * The version of the message.
-     */
-    private final MessageVersion version;
-
-    /**
-     * The actual message.
-     */
-    private final T message;
-
-
-    // ----- Constructors ----------------------------------------------------
-
-    /**
-     * Create a versioned message.
-     *
-     * @param version  the message version
-     * @param message  the actual message
-     */
-    public VersionedMessage(MessageVersion version, T message) {
-        this.version = version;
-        this.message = message;
-    }
-
-
-    // ----- VersionedMessage ------------------------------------------------
-
-    /**
-     * Compare the version of this message with the given version.
-     *
-     * @param compareToVersion  the version to compare to
-     *
-     * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
-     *         or greater than the given version.
-     */
-    public int compareVersion(MessageVersion compareToVersion) {
-        return version.compareTo(compareToVersion);
-    }
-
-
-    // ----- accessors -------------------------------------------------------
-
-    public MessageVersion getVersion() {
-        return version;
-    }
-
-    public T getMessage() {
-        return message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
deleted file mode 100644
index cc2099e..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-import com.google.gson.Gson;
-import org.slf4j.Logger;
-
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-/**
- * Deserializer that works with versioned messages.  The version of each deserialized message is checked against an
- * expected version.
- */
-public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> {
-
-    public static final String VERSION_MISMATCH_MSG =
-        "Notification message version mismatch. Expected %s but recieved %s. Message %s";
-
-    private final Type versionedMessageType;
-    private final MessageVersion expectedVersion;
-    private final Logger notificationLogger;
-    private final Gson gson;
-
-
-    // ----- Constructors ----------------------------------------------------
-
-    /**
-     * Create a versioned message deserializer.
-     *
-     * @param versionedMessageType  the type of the versioned message
-     * @param expectedVersion       the expected message version
-     * @param gson                  JSON serialization/deserialization
-     * @param notificationLogger    logger for message version mismatch
-     */
-    public VersionedMessageDeserializer(Type versionedMessageType, MessageVersion expectedVersion,
-                                        Gson gson, Logger notificationLogger) {
-        this.versionedMessageType = versionedMessageType;
-        this.expectedVersion = expectedVersion;
-        this.gson = gson;
-        this.notificationLogger = notificationLogger;
-    }
-
-
-    // ----- MessageDeserializer ---------------------------------------------
-
-    @Override
-    public T deserialize(String messageJson) {
-        VersionedMessage<T> versionedMessage = gson.fromJson(messageJson, versionedMessageType);
-
-        // older style messages not wrapped with VersionedMessage
-        if (versionedMessage.getVersion() == null) {
-            Type t = ((ParameterizedType) versionedMessageType).getActualTypeArguments()[0];
-            versionedMessage = new VersionedMessage<>(MessageVersion.NO_VERSION, gson.<T>fromJson(messageJson, t));
-        }
-        checkVersion(versionedMessage, messageJson);
-
-        return versionedMessage.getMessage();
-    }
-
-
-    // ----- helper methods --------------------------------------------------
-
-    /**
-     * Check the message version against the expected version.
-     *
-     * @param versionedMessage  the versioned message
-     * @param messageJson       the notification message json
-     *
-     * @throws IncompatibleVersionException  if the message version is incompatable with the expected version
-     */
-    protected void checkVersion(VersionedMessage<T> versionedMessage, String messageJson) {
-        int comp = versionedMessage.compareVersion(expectedVersion);
-
-        // message has newer version
-        if (comp > 0) {
-            String msg =
-                    String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(), messageJson);
-            notificationLogger.error(msg);
-            throw new IncompatibleVersionException(msg);
-        }
-
-        // message has older version
-        if (comp < 0) {
-            notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(),
-                    messageJson));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 9b712f4..08a20bd 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -19,11 +19,8 @@
 package org.apache.atlas.kafka;
 
 import kafka.message.MessageAndMetadata;
-import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.MessageVersion;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.IncompatibleVersionException;
-import org.apache.atlas.notification.VersionedMessage;
+import org.apache.atlas.notification.*;
+import org.apache.atlas.notification.AtlasNotificationMessage;
 import org.apache.atlas.notification.entity.EntityNotificationImplTest;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.IStruct;
@@ -82,7 +79,7 @@ public class KafkaConsumerTest {
         HookNotification.EntityUpdateRequest message =
             new HookNotification.EntityUpdateRequest("user1", entity);
 
-        String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
+        String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
 
         kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
         List<ConsumerRecord> klist = new ArrayList<>();
@@ -119,7 +116,7 @@ public class KafkaConsumerTest {
         HookNotification.EntityUpdateRequest message =
             new HookNotification.EntityUpdateRequest("user1", entity);
 
-        String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message));
+        String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message));
 
         kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
         List<ConsumerRecord> klist = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index b7474a0..09e2e43 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -17,20 +17,14 @@
  */
 package org.apache.atlas.kafka;
 
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.serializer.StringDecoder;
-import org.apache.atlas.notification.MessageDeserializer;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.testng.annotations.Test;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -38,11 +32,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import org.apache.atlas.kafka.AtlasKafkaConsumer;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import java.util.Arrays;
+
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -90,7 +82,7 @@ public class KafkaNotificationMockTest {
         when(producer.send(expectedRecord)).thenReturn(returnValue);
 
         kafkaNotification.sendInternalToProducer(producer,
-                NotificationInterface.NotificationType.HOOK, new String[]{message});
+                NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message}));
 
         verify(producer).send(expectedRecord);
     }
@@ -112,7 +104,7 @@ public class KafkaNotificationMockTest {
 
         try {
             kafkaNotification.sendInternalToProducer(producer,
-                NotificationInterface.NotificationType.HOOK, new String[]{message});
+                NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message}));
             fail("Should have thrown NotificationException");
         } catch (NotificationException e) {
             assertEquals(e.getFailedMessages().size(), 1);
@@ -142,7 +134,7 @@ public class KafkaNotificationMockTest {
 
         try {
             kafkaNotification.sendInternalToProducer(producer,
-                    NotificationInterface.NotificationType.HOOK, new String[]{message1, message2});
+                    NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message1, message2}));
             fail("Should have thrown NotificationException");
         } catch (NotificationException e) {
             assertEquals(e.getFailedMessages().size(), 2);

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 3b2a093..12f48d1 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -30,7 +30,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 
-import static org.mockito.Matchers.endsWith;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
@@ -57,15 +56,15 @@ public class AbstractNotificationConsumerTest {
 
         List jsonList = new LinkedList<>();
 
-        jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
-        jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
-        jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
-        jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
+        jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)));
+        jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2)));
+        jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3)));
+        jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4)));
 
-        Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+        Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
 
         NotificationConsumer<TestMessage> consumer =
-                new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+                new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
 
         List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
@@ -91,9 +90,9 @@ public class AbstractNotificationConsumerTest {
 
         List jsonList = new LinkedList<>();
 
-        String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
-        String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2));
-        String json3 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.5.0"), testMessage3));
+        String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
+        String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2));
+        String json3 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3));
         String json4 = GSON.toJson(testMessage4);
 
         jsonList.add(json1);
@@ -101,10 +100,10 @@ public class AbstractNotificationConsumerTest {
         jsonList.add(json3);
         jsonList.add(json4);
 
-        Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+        Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
 
         NotificationConsumer<TestMessage> consumer =
-            new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+            new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
 
         List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
@@ -127,16 +126,16 @@ public class AbstractNotificationConsumerTest {
 
         List jsonList = new LinkedList<>();
 
-        String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
-        String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2));
+        String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
+        String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2));
 
         jsonList.add(json1);
         jsonList.add(json2);
 
-        Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+        Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
 
         NotificationConsumer<TestMessage> consumer =
-            new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+            new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
         try {
             List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
@@ -187,8 +186,8 @@ public class AbstractNotificationConsumerTest {
         private final List<T> messageList;
         private int index = 0;
 
-        public TestNotificationConsumer(Type versionedMessageType, List<T> messages, Logger logger) {
-            super(new TestDeserializer<T>(versionedMessageType, logger));
+        public TestNotificationConsumer(Type notificationMessageType, List<T> messages, Logger logger) { // messages: canned input kept in messageList
+            super(new TestDeserializer<T>(notificationMessageType, logger)); // the Type and logger are forwarded to the test deserializer
             this.messageList = messages;
         }
 
@@ -222,10 +221,10 @@ public class AbstractNotificationConsumerTest {
         }
     }
 
-    private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
+    private static final class TestDeserializer<T> extends AtlasNotificationMessageDeserializer<T> { // test-only deserializer wired to the shared GSON instance
 
-        private TestDeserializer(Type versionedMessageType, Logger logger) {
-            super(versionedMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
+        private TestDeserializer(Type notificationMessageType, Logger logger) { // notificationMessageType: the Type handed to the base deserializer
+            super(notificationMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger); // expected version is AbstractNotification.CURRENT_MESSAGE_VERSION
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 61107a9..4719324 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -23,6 +23,7 @@ import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.commons.configuration.Configuration;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -44,17 +45,18 @@ public class AbstractNotificationTest {
         TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
         TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
 
-        String messageJson1 = AbstractNotification.getMessageJson(message1);
-        String messageJson2 = AbstractNotification.getMessageJson(message2);
-        String messageJson3 = AbstractNotification.getMessageJson(message3);
+        List<String> messageJson = new ArrayList<>();
+        AbstractNotification.createNotificationMessages(message1, messageJson);
+        AbstractNotification.createNotificationMessages(message2, messageJson);
+        AbstractNotification.createNotificationMessages(message3, messageJson);
 
         notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3);
 
         assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
-        assertEquals(3, notification.messages.length);
-        assertEquals(messageJson1, notification.messages[0]);
-        assertEquals(messageJson2, notification.messages[1]);
-        assertEquals(messageJson3, notification.messages[2]);
+        assertEquals(3, notification.messages.size());
+        assertEquals(messageJson.get(0), notification.messages.get(0));
+        assertEquals(messageJson.get(1), notification.messages.get(1));
+        assertEquals(messageJson.get(2), notification.messages.get(2));
     }
 
     @Test
@@ -72,17 +74,16 @@ public class AbstractNotificationTest {
         messages.add(message2);
         messages.add(message3);
 
-        String messageJson1 = AbstractNotification.getMessageJson(message1);
-        String messageJson2 = AbstractNotification.getMessageJson(message2);
-        String messageJson3 = AbstractNotification.getMessageJson(message3);
+        List<String> messageJson = new ArrayList<>();
+        AbstractNotification.createNotificationMessages(message1, messageJson);
+        AbstractNotification.createNotificationMessages(message2, messageJson);
+        AbstractNotification.createNotificationMessages(message3, messageJson);
 
         notification.send(NotificationInterface.NotificationType.HOOK, messages);
 
         assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
-        assertEquals(3, notification.messages.length);
-        assertEquals(messageJson1, notification.messages[0]);
-        assertEquals(messageJson2, notification.messages[1]);
-        assertEquals(messageJson3, notification.messages[2]);
+        assertEquals(messageJson.size(), notification.messages.size());
+        assertEquals(messageJson, notification.messages);
     }
 
     public static class TestMessage extends HookNotification.HookNotificationMessage {
@@ -94,14 +95,14 @@ public class AbstractNotificationTest {
 
     public static class TestNotification extends AbstractNotification {
         private NotificationType type;
-        private String[] messages;
+        private List<String>     messages;
 
         public TestNotification(Configuration applicationProperties) throws AtlasException {
             super(applicationProperties);
         }
 
         @Override
-        protected void sendInternal(NotificationType notificationType, String[] notificationMessages)
+        protected void sendInternal(NotificationType notificationType, List<String> notificationMessages)
             throws NotificationException {
 
             type = notificationType;

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
new file mode 100644
index 0000000..27b5034
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+/**
+ * Unit tests for AtlasNotificationMessage: the version/message accessors and compareVersion().
+ */
+public class AtlasNotificationMessageTest {
+
+    @Test
+    public void testGetVersion() throws Exception { // getVersion() must return the version given to the constructor
+        MessageVersion version = new MessageVersion("1.0.0");
+        AtlasNotificationMessage<String> atlasNotificationMessage = new AtlasNotificationMessage<>(version, "a");
+        assertEquals(atlasNotificationMessage.getVersion(), version);
+    }
+
+    @Test
+    public void testGetMessage() throws Exception { // getMessage() must return the payload given to the constructor
+        String message = "a";
+        MessageVersion version = new MessageVersion("1.0.0");
+        AtlasNotificationMessage<String> atlasNotificationMessage = new AtlasNotificationMessage<>(version, message);
+        assertEquals(atlasNotificationMessage.getMessage(), message);
+    }
+
+    @Test
+    public void testCompareVersion() throws Exception { // compareVersion() sign: 0 for the same version, negative vs. a higher one, positive vs. a lower one
+        MessageVersion version1 = new MessageVersion("1.0.0");
+        MessageVersion version2 = new MessageVersion("2.0.0");
+        MessageVersion version3 = new MessageVersion("0.5.0");
+
+        AtlasNotificationMessage<String> atlasNotificationMessage = new AtlasNotificationMessage<>(version1, "a"); // message under test carries version 1.0.0
+
+        assertTrue(atlasNotificationMessage.compareVersion(version1) == 0);
+        assertTrue(atlasNotificationMessage.compareVersion(version2) < 0);
+        assertTrue(atlasNotificationMessage.compareVersion(version3) > 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
deleted file mode 100644
index 587b7eb..0000000
--- a/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.*;
-
-/**
- * VersionedMessage tests.
- */
-public class VersionedMessageTest {
-
-    @Test
-    public void testGetVersion() throws Exception {
-        MessageVersion version = new MessageVersion("1.0.0");
-        VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, "a");
-        assertEquals(versionedMessage.getVersion(), version);
-    }
-
-    @Test
-    public void testGetMessage() throws Exception {
-        String message = "a";
-        MessageVersion version = new MessageVersion("1.0.0");
-        VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, message);
-        assertEquals(versionedMessage.getMessage(), message);
-    }
-
-    @Test
-    public void testCompareVersion() throws Exception {
-        MessageVersion version1 = new MessageVersion("1.0.0");
-        MessageVersion version2 = new MessageVersion("2.0.0");
-        MessageVersion version3 = new MessageVersion("0.5.0");
-
-        VersionedMessage<String> versionedMessage = new VersionedMessage<>(version1, "a");
-
-        assertTrue(versionedMessage.compareVersion(version1) == 0);
-        assertTrue(versionedMessage.compareVersion(version2) < 0);
-        assertTrue(versionedMessage.compareVersion(version3) > 0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
index be32427..7b513da 100644
--- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
@@ -24,6 +24,7 @@ import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -48,9 +49,20 @@ public class EntityMessageDeserializerTest {
         EntityNotificationImpl notification =
             new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
 
-        String json = AbstractNotification.getMessageJson(notification);
+        List<String> jsonMsgList = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(notification, jsonMsgList);
+
+        EntityNotification deserializedNotification = null;
+
+        for (String jsonMsg : jsonMsgList) {
+            deserializedNotification = deserializer.deserialize(jsonMsg);
+
+            if (deserializedNotification != null) {
+                break;
+            }
+        }
 
-        EntityNotification deserializedNotification = deserializer.deserialize(json);
         assertEquals(deserializedNotification.getOperationType(), notification.getOperationType());
         assertEquals(deserializedNotification.getEntity().getId(), notification.getEntity().getId());
         assertEquals(deserializedNotification.getEntity().getTypeName(), notification.getEntity().getTypeName());

http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
index 3724fd5..49b877b 100644
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
@@ -20,51 +20,151 @@ package org.apache.atlas.notification.hook;
 
 import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.entity.EntityNotificationImplTest;
-import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 /**
  * HookMessageDeserializer tests.
  */
 public class HookMessageDeserializerTest {
+    HookMessageDeserializer deserializer = new HookMessageDeserializer();
+
     @Test
     public void testDeserialize() throws Exception {
-        HookMessageDeserializer deserializer = new HookMessageDeserializer();
+        Referenceable       entity  = generateEntityWithTrait(); // round-trip test: small entity -> notification JSON -> deserialize -> compare
+        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+
+        List<String> jsonMsgList = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(message, jsonMsgList); // fills jsonMsgList with the JSON string(s) for this message
+
+        HookNotificationMessage deserializedMessage = deserialize(jsonMsgList); // uses the shared deserializer field of this test class
+
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    // Verifies that a legacy message (plain JSON of the request, with no MessageVersion) can still be deserialized.
+    @Test
+    public void testDeserializeLegacyMessage() throws Exception {
+        Referenceable       entity  = generateEntityWithTrait();
+        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+
+        String                  jsonMsg             = AbstractNotification.GSON.toJson(message); // serialized directly with GSON, bypassing createNotificationMessages()
+        HookNotificationMessage deserializedMessage = deserializer.deserialize(jsonMsg);
+
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    @Test
+    public void testDeserializeCompressedMessage() throws Exception { // large, repetitive payload: expected to be compressed into a single message
+        Referenceable       entity  = generateLargeEntityWithTrait();
+        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+
+        List<String> jsonMsgList = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(message, jsonMsgList);
+
+        assertEquals(jsonMsgList.size(), 1, "a compressible message should be sent as a single part"); // assertEquals reports the actual count on failure
+
+        String compressedMsg   = jsonMsgList.get(0);
+        String uncompressedMsg = AbstractNotification.GSON.toJson(message); // plain JSON of the same request, used as the size baseline
+
+        assertTrue(compressedMsg.length() < uncompressedMsg.length(), "Compressed message (" + compressedMsg.length() + ") should be shorter than uncompressed message (" + uncompressedMsg.length() + ")");
+
+        HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
+
+        assertEqualMessage(deserializedMessage, message);
+    }
 
-        Referenceable entity = EntityNotificationImplTest.getEntity("id");
-        String traitName = "MyTrait";
-        List<IStruct> traitInfo = new LinkedList<>();
-        IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
-        traitInfo.add(trait);
+    @Test
+    public void testDeserializeSplitMessage() throws Exception { // very large, poorly compressible payload: expected to be split into several parts
+        Referenceable       entity  = generateVeryLargeEntityWithTrait();
+        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+
+        List<String> jsonMsgList = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(message, jsonMsgList);
 
-        HookNotification.EntityUpdateRequest message =
-            new HookNotification.EntityUpdateRequest("user1", entity);
+        assertTrue(jsonMsgList.size() > 1, "expected a multi-part message, but got " + jsonMsgList.size() + " part(s)");
 
-        String json = AbstractNotification.getMessageJson(message);
+        HookNotificationMessage deserializedMessage = deserialize(jsonMsgList); // feeds every part to the shared deserializer
 
-        HookNotification.HookNotificationMessage deserializedMessage = deserializer.deserialize(json);
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    private Referenceable generateEntityWithTrait() { // small fixture: getEntity("id", ...) with one attribute-less "MyTrait" struct
+        Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+        return ret;
+    }
+
+    private HookNotificationMessage deserialize(List<String> jsonMsgList) { // returns the first non-null result, or null if no part yields one
+        HookNotificationMessage deserializedMessage = null;
+
+        for (String jsonMsg : jsonMsgList) { // parts are passed to the deserializer in list order
+            deserializedMessage = deserializer.deserialize(jsonMsg);
+
+            if (deserializedMessage != null) { // NOTE(review): presumably null means "more parts of a split message still pending" - confirm in the deserializer
+                break;
+            }
+        }
+
+        return deserializedMessage;
+    }
+
+    private void assertEqualMessage(HookNotificationMessage deserializedMessage, EntityUpdateRequest message) throws Exception { // compares a deserialized message against the request it was built from
+        assertNotNull(deserializedMessage); // fail clearly here instead of with a NullPointerException below
         assertEquals(deserializedMessage.getType(), message.getType());
         assertEquals(deserializedMessage.getUser(), message.getUser());
 
-        assertTrue(deserializedMessage instanceof HookNotification.EntityUpdateRequest);
+        assertTrue(deserializedMessage instanceof EntityUpdateRequest);
 
-        HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest =
-            (HookNotification.EntityUpdateRequest) deserializedMessage;
+        EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) deserializedMessage;
+        Referenceable       deserializedEntity              = deserializedEntityUpdateRequest.getEntities().get(0); // only the first entity is compared
+        Referenceable       entity                          = message.getEntities().get(0);
+        String              traitName                       = entity.getTraits().get(0); // only the first trait is compared
 
-        Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
         assertEquals(deserializedEntity.getId(), entity.getId());
         assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
         assertEquals(deserializedEntity.getTraits(), entity.getTraits());
-        assertEquals(deserializedEntity.getTrait(traitName), entity.getTrait(traitName));
+        assertEquals(deserializedEntity.getTrait(traitName).hashCode(), entity.getTrait(traitName).hashCode()); // NOTE(review): hashCode() is compared instead of equals() - reason not visible here, confirm
+
+    }
+
+    private Referenceable generateLargeEntityWithTrait() { // fixture whose JSON is large but highly repetitive
+        Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+        // add 100 attributes, each holding the same 10k-character value
+        // Json Size=1,027,984; GZipped Size=16,387 ==> will compress, but not split
+        String attrValue = RandomStringUtils.randomAlphanumeric(10 * 1024); // one shared value for all attributes, so the payload compresses well
+        for (int i = 0; i < 100; i++) {
+            ret.set("attr_" + i, attrValue);
+        }
+
+        return ret;
+    }
+
+    private Referenceable generateVeryLargeEntityWithTrait() { // fixture whose JSON is large and made of distinct random values
+        Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+        // add 300 attributes, each holding its own random 10k-character value
+        // Json Size=3,082,384; GZipped Size=2,313,357 ==> will compress & split
+        for (int i = 0; i < 300; i++) {
+            ret.set("attr_" + i, RandomStringUtils.randomAlphanumeric(10 * 1024)); // a fresh random value per attribute
+        }
+
+        return ret;
     }
 }