You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/10/07 00:53:30 UTC

atlas git commit: ATLAS-2192: notification consumer updates to handle stale split messages

Repository: atlas
Updated Branches:
  refs/heads/master 4fe70de83 -> 9a8c71254


ATLAS-2192: notification consumer updates to handle stale split messages

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/9a8c7125
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/9a8c7125
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/9a8c7125

Branch: refs/heads/master
Commit: 9a8c71254c70a0e70241c8b9c8892603ecb03e7c
Parents: 4fe70de
Author: ashutoshm <am...@hortonworks.com>
Authored: Fri Oct 6 15:44:09 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Oct 6 17:53:16 2017 -0700

----------------------------------------------------------------------
 .../org/apache/atlas/AtlasConfiguration.java    |   2 +
 .../java/org/apache/atlas/hook/AtlasHook.java   |   1 +
 .../notification/AbstractNotification.java      |  39 ++++++-
 .../notification/AtlasNotificationMessage.java  |  39 ++++++-
 .../AtlasNotificationMessageDeserializer.java   | 101 ++++++++++++++++---
 .../AtlasNotificationStringMessage.java         |   6 ++
 .../notification/NotificationInterface.java     |   6 ++
 .../notification/SplitMessageAggregator.java    |  69 +++++++++++++
 .../AbstractNotificationConsumerTest.java       |   2 -
 .../notification/AbstractNotificationTest.java  |  26 +++--
 .../SplitMessageAggregatorTest.java             |  77 ++++++++++++++
 11 files changed, 342 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 451bd9d..bd2bf7f 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -35,6 +35,8 @@ public enum AtlasConfiguration {
 
     NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
     NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
+    NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
+    NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60),
 
     //search configuration
     SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),

http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index a8609e6..4829221 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -85,6 +85,7 @@ public abstract class AtlasHook {
 
         notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
         notificationInterface = NotificationProvider.get();
+        notificationInterface.setCurrentUser(getUser());
 
         LOG.info("Created Atlas Hook");
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 6a70734..4f56bd8 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -31,12 +31,14 @@ import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import java.lang.reflect.Type;
+import java.net.Inet4Address;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -64,6 +66,16 @@ public abstract class AbstractNotification implements NotificationInterface {
 
     public static final int MAX_BYTES_PER_CHAR = 4;  // each char can encode upto 4 bytes in UTF-8
 
+    /**
+     * IP address of the host on which this process was started
+     */
+    private static String localHostAddress = "";
+
+    /**
+     * Name of the user under which this process is running
+     */
+    private static String currentUser = "";
+
     private final boolean embedded;
     private final boolean isHAEnabled;
 
@@ -107,6 +119,11 @@ public abstract class AbstractNotification implements NotificationInterface {
         send(type, Arrays.asList(messages));
     }
 
+    @Override
+    public void setCurrentUser(String user) {
+        currentUser = user;
+    }
+
     // ----- AbstractNotification --------------------------------------------
 
     /**
@@ -146,6 +163,24 @@ public abstract class AbstractNotification implements NotificationInterface {
         return GSON.toJson(notificationMsg);
     }
 
+    private static String getHostAddress() {
+        if (StringUtils.isEmpty(localHostAddress)) {
+            try {
+                localHostAddress =  Inet4Address.getLocalHost().getHostAddress();
+            } catch (UnknownHostException e) {
+                LOG.warn("failed to get local host address", e);
+
+                localHostAddress =  "";
+            }
+        }
+
+        return localHostAddress;
+    }
+
+    private static String getCurrentUser() {
+        return currentUser;
+    }
+
     /**
      * Get the notification message JSON from the given object.
      *
@@ -154,7 +189,7 @@ public abstract class AbstractNotification implements NotificationInterface {
      * @return the message as a JSON string
      */
     public static void createNotificationMessages(Object message, List<String> msgJsonList) {
-        AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);
+        AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser());
         String                      msgJson         = GSON.toJson(notificationMsg);
 
         boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;

http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
index 2f6f9c7..63d93c9 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
@@ -18,10 +18,16 @@
 
 package org.apache.atlas.notification;
 
+import org.joda.time.DateTimeZone;
+import org.joda.time.Instant;
+
 /**
  * Represents a notification message that is associated with a version.
  */
 public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
+    private String msgSourceIP;
+    private String msgCreatedBy;
+    private long   msgCreationTime;
 
     /**
      * The actual message.
@@ -38,11 +44,42 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
      * @param message  the actual message
      */
     public AtlasNotificationMessage(MessageVersion version, T message) {
+        this(version, message, null, null);
+    }
+
+    public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
         super(version);
 
-        this.message = message;
+        this.msgSourceIP     = msgSourceIP;
+        this.msgCreatedBy    = createdBy;
+        this.msgCreationTime = Instant.now().toDateTime(DateTimeZone.UTC).getMillis();
+        this.message         = message;
+    }
+
+
+    public String getMsgSourceIP() {
+        return msgSourceIP;
+    }
+
+    public void setMsgSourceIP(String msgSourceIP) {
+        this.msgSourceIP = msgSourceIP;
     }
 
+    public String getMsgCreatedBy() {
+        return msgCreatedBy;
+    }
+
+    public void setMsgCreatedBy(String msgCreatedBy) {
+        this.msgCreatedBy = msgCreatedBy;
+    }
+
+    public long getMsgCreationTime() {
+        return msgCreationTime;
+    }
+
+    public void setMsgCreationTime(long msgCreationTime) {
+        this.msgCreationTime = msgCreationTime;
+    }
 
     public T getMessage() {
         return message;

http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 3d80284..2a175ba 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas.notification;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
 import org.apache.commons.lang3.StringUtils;
@@ -26,8 +27,14 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS;
+import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS;
 
 /**
  * Deserializer that works with notification messages.  The version of each deserialized message is checked against an
@@ -47,8 +54,12 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
     private final Gson gson;
 
 
-    private final Map<String, AtlasNotificationStringMessage[]> splitMsgBuffer = new HashMap<>();
-
+    private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<>();
+    private final long                                splitMessageBufferPurgeIntervalMs;
+    private final long                                splitMessageSegmentsWaitTimeMs;
+    private long                                      splitMessagesLastPurgeTime    = System.currentTimeMillis();
+    private final AtomicLong                          messageCountTotal             = new AtomicLong(0);
+    private final AtomicLong                          messageCountSinceLastInterval = new AtomicLong(0);
     // ----- Constructors ----------------------------------------------------
 
     /**
@@ -61,11 +72,22 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
      */
     public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
                                                 Gson gson, Logger notificationLogger) {
-        this.notificationMessageType = notificationMessageType;
-        this.messageType             = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0];
-        this.expectedVersion         = expectedVersion;
-        this.gson                    = gson;
-        this.notificationLogger      = notificationLogger;
+        this(notificationMessageType, expectedVersion, gson, notificationLogger,
+             NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000,
+             NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000);
+    }
+
+    public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
+                                                Gson gson, Logger notificationLogger,
+                                                long splitMessageSegmentsWaitTimeMs,
+                                                long splitMessageBufferPurgeIntervalMs) {
+        this.notificationMessageType           = notificationMessageType;
+        this.messageType                       = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0];
+        this.expectedVersion                   = expectedVersion;
+        this.gson                              = gson;
+        this.notificationLogger                = notificationLogger;
+        this.splitMessageSegmentsWaitTimeMs    = splitMessageSegmentsWaitTimeMs;
+        this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs;
     }
 
     // ----- MessageDeserializer ---------------------------------------------
@@ -74,6 +96,9 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
     public T deserialize(String messageJson) {
         final T ret;
 
+        messageCountTotal.incrementAndGet();
+        messageCountSinceLastInterval.incrementAndGet();
+
         AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class);
 
         if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
@@ -96,12 +121,12 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
                     final int splitIdx   = splitMsg.getMsgSplitIdx();
                     final int splitCount = splitMsg.getMsgSplitCount();
 
-                    final AtlasNotificationStringMessage[] splitMsgs;
+                    final SplitMessageAggregator splitMsgs;
 
                     if (splitIdx == 0) {
-                        splitMsgs = new AtlasNotificationStringMessage[splitCount];
+                        splitMsgs = new SplitMessageAggregator(splitMsg);
 
-                        splitMsgBuffer.put(msgId, splitMsgs);
+                        splitMsgBuffer.put(splitMsgs.getMsgId(), splitMsgs);
                     } else {
                         splitMsgs = splitMsgBuffer.get(msgId);
                     }
@@ -110,24 +135,24 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
                         LOG.error("Received msgID={}: {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);
 
                         msg = null;
-                    } else if (splitMsgs.length <= splitIdx) {
+                    } else if (splitMsgs.getTotalSplitCount() <= splitIdx) {
                         LOG.error("Received msgID={}: {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
 
                         msg = null;
                     } else {
                         LOG.info("Received msgID={}: {} of {}", msgId, splitIdx + 1, splitCount);
 
-                        splitMsgs[splitIdx] = splitMsg;
+                        boolean isReady = splitMsgs.add(splitMsg);
 
-                        if (splitIdx == (splitCount - 1)) { // last message
+                        if (isReady) { // last message
                             splitMsgBuffer.remove(msgId);
 
                             boolean isValidMessage = true;
 
                             StringBuilder sb = new StringBuilder();
 
-                            for (int i = 0; i < splitMsgs.length; i++) {
-                                splitMsg = splitMsgs[i];
+                            for (int i = 0; i < splitMsgs.getTotalSplitCount(); i++) {
+                                splitMsg = splitMsgs.get(i);
 
                                 if (splitMsg == null) {
                                     LOG.warn("MsgID={}: message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);
@@ -192,9 +217,55 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
             }
         }
 
+
+        long now                = System.currentTimeMillis();
+        long timeSinceLastPurge = now - splitMessagesLastPurgeTime;
+
+        if(timeSinceLastPurge >= splitMessageBufferPurgeIntervalMs) {
+            purgeStaleMessages(splitMsgBuffer, now, splitMessageSegmentsWaitTimeMs);
+
+            LOG.info("Notification processing stats: total={}, sinceLastStatsReport={}", messageCountTotal.get(), messageCountSinceLastInterval.getAndSet(0));
+
+            splitMessagesLastPurgeTime = now;
+        }
+
         return ret;
     }
 
+    @VisibleForTesting
+    static void purgeStaleMessages(Map<String, SplitMessageAggregator> splitMsgBuffer, long now, long maxWaitTime) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")");
+        }
+
+        List<SplitMessageAggregator> evictionList = null;
+
+        for (SplitMessageAggregator aggregrator : splitMsgBuffer.values()) {
+            long waitTime = now - aggregrator.getFirstSplitTimestamp();
+
+            if (waitTime < maxWaitTime) {
+                continue;
+            }
+
+            if(evictionList == null) {
+                evictionList = new ArrayList<>();
+            }
+
+             evictionList.add(aggregrator);
+        }
+
+        if(evictionList != null) {
+            for (SplitMessageAggregator aggregrator : evictionList) {
+                LOG.error("evicting notification msgID={}, totalSplitCount={}, receivedSplitCount={}", aggregrator.getMsgId(), aggregrator.getTotalSplitCount(), aggregrator.getReceivedSplitCount());
+                splitMsgBuffer.remove(aggregrator.getMsgId());
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")");
+        }
+    }
+
     // ----- helper methods --------------------------------------------------
 
     /**

http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
index 193735c..41485a0 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
@@ -38,6 +38,12 @@ public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage
         this.message = message;
     }
 
+    public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
+
+        this.message = message;
+    }
+
     public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) {
         super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index a787862..8809225 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -99,6 +99,12 @@ public interface NotificationInterface {
     }
 
     /**
+     *
+     * @param user Name of the user under which the process is running
+     */
+    void setCurrentUser(String user);
+
+    /**
      * Create notification consumers for the given notification type.
      *
      * @param notificationType  the notification type (i.e. HOOK, ENTITIES)

http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
new file mode 100644
index 0000000..148b57f
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+
+public class SplitMessageAggregator {
+    private final String                           msgId;
+    private final AtlasNotificationStringMessage[] splitMessagesBuffer;
+    private final long                             firstSplitTimestamp;
+
+    public SplitMessageAggregator(AtlasNotificationStringMessage message) {
+        msgId               = message.getMsgId();
+        splitMessagesBuffer = new AtlasNotificationStringMessage[message.getMsgSplitCount()];
+        firstSplitTimestamp = System.currentTimeMillis();
+
+        add(message);
+    }
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public long getTotalSplitCount() {
+        return splitMessagesBuffer.length;
+    }
+
+    public long getReceivedSplitCount() {
+        long ret = 0;
+
+        for (AtlasNotificationStringMessage split : splitMessagesBuffer) {
+            if (split != null) {
+                ret++;
+            }
+        }
+
+        return ret;
+    }
+
+    public long getFirstSplitTimestamp() {
+        return firstSplitTimestamp;
+    }
+
+    public boolean add(AtlasNotificationStringMessage message) {
+        if (message.getMsgSplitIdx() < splitMessagesBuffer.length) {
+            splitMessagesBuffer[message.getMsgSplitIdx()] = message;
+        }
+
+        return message.getMsgSplitIdx() == (message.getMsgSplitCount() - 1);
+    }
+
+    public AtlasNotificationStringMessage get(int i) {
+        return splitMessagesBuffer[i];
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 12f48d1..f313ddc 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -31,10 +31,8 @@ import java.util.List;
 import java.util.Objects;
 
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import org.apache.kafka.common.TopicPartition;
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 4719324..655252c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.Test;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.*;
@@ -54,9 +55,9 @@ public class AbstractNotificationTest {
 
         assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
         assertEquals(3, notification.messages.size());
-        assertEquals(messageJson.get(0), notification.messages.get(0));
-        assertEquals(messageJson.get(1), notification.messages.get(1));
-        assertEquals(messageJson.get(2), notification.messages.get(2));
+        for (int i = 0; i < notification.messages.size(); i++) {
+            assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i));
+        }
     }
 
     @Test
@@ -81,9 +82,11 @@ public class AbstractNotificationTest {
 
         notification.send(NotificationInterface.NotificationType.HOOK, messages);
 
-        assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
-        assertEquals(messageJson.size(), notification.messages.size());
-        assertEquals(messageJson, notification.messages);
+        assertEquals(notification.type, NotificationInterface.NotificationType.HOOK);
+        assertEquals(notification.messages.size(), messageJson.size());
+        for (int i = 0; i < notification.messages.size(); i++) {
+            assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i));
+        }
     }
 
     public static class TestMessage extends HookNotification.HookNotificationMessage {
@@ -93,6 +96,17 @@ public class AbstractNotificationTest {
         }
     }
 
+    // ignore msgCreationTime in Json
+    private void assertEqualsMessageJson(String msgJsonActual, String msgJsonExpected) {
+        Map<Object, Object> msgActual   = AbstractNotification.GSON.fromJson(msgJsonActual, Map.class);
+        Map<Object, Object> msgExpected = AbstractNotification.GSON.fromJson(msgJsonExpected, Map.class);
+
+        msgActual.remove("msgCreationTime");
+        msgExpected.remove("msgCreationTime");
+
+        assertEquals(msgActual, msgExpected);
+    }
+
     public static class TestNotification extends AbstractNotification {
         private NotificationType type;
         private List<String>     messages;

http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
new file mode 100644
index 0000000..0807221
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SplitMessageAggregatorTest {
+    @Test
+    public void verifyEviction() throws InterruptedException {
+        Map<String, SplitMessageAggregator> map = getStringSplitMessageAggregatorMap();
+
+        Thread.currentThread().sleep(500);
+
+        AtlasNotificationMessageDeserializer.purgeStaleMessages(map, System.currentTimeMillis(), 250);
+
+        Assert.assertEquals(map.size(), 0);
+    }
+
+
+    @Test
+    public void verifyEvictionDoesNotOccur() throws InterruptedException {
+        Map<String, SplitMessageAggregator> map = getStringSplitMessageAggregatorMap();
+
+        int expectedSize = map.size();
+
+        Thread.currentThread().sleep(500);
+
+        AtlasNotificationMessageDeserializer.purgeStaleMessages(map, System.currentTimeMillis(), Long.MAX_VALUE);
+
+        Assert.assertEquals(map.size(), expectedSize);
+    }
+
+    private Map<String, SplitMessageAggregator> getStringSplitMessageAggregatorMap() {
+        Map<String, SplitMessageAggregator> map = new HashMap<>();
+
+        map.put("1", getSplitMessageAggregator("1", 5));
+        map.put("2", getSplitMessageAggregator("2", 10));
+
+        return map;
+    }
+
+    private SplitMessageAggregator getSplitMessageAggregator(String id, int splitCount) {
+        SplitMessageAggregator sma = null;
+
+        for (int i = 0; i < splitCount; i++) {
+            AtlasNotificationStringMessage sm = new AtlasNotificationStringMessage("aaaaa", id, CompressionKind.NONE, i, splitCount);
+
+            if(sma == null) {
+                sma = new SplitMessageAggregator(sm);
+            } else {
+                sma.add(sm);
+            }
+        }
+
+        return sma;
+    }
+}