You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/10/07 00:53:30 UTC
atlas git commit: ATLAS-2192: notification consumer updates to handle
stale split messages
Repository: atlas
Updated Branches:
refs/heads/master 4fe70de83 -> 9a8c71254
ATLAS-2192: notification consumer updates to handle stale split messages
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/9a8c7125
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/9a8c7125
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/9a8c7125
Branch: refs/heads/master
Commit: 9a8c71254c70a0e70241c8b9c8892603ecb03e7c
Parents: 4fe70de
Author: ashutoshm <am...@hortonworks.com>
Authored: Fri Oct 6 15:44:09 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Oct 6 17:53:16 2017 -0700
----------------------------------------------------------------------
.../org/apache/atlas/AtlasConfiguration.java | 2 +
.../java/org/apache/atlas/hook/AtlasHook.java | 1 +
.../notification/AbstractNotification.java | 39 ++++++-
.../notification/AtlasNotificationMessage.java | 39 ++++++-
.../AtlasNotificationMessageDeserializer.java | 101 ++++++++++++++++---
.../AtlasNotificationStringMessage.java | 6 ++
.../notification/NotificationInterface.java | 6 ++
.../notification/SplitMessageAggregator.java | 69 +++++++++++++
.../AbstractNotificationConsumerTest.java | 2 -
.../notification/AbstractNotificationTest.java | 26 +++--
.../SplitMessageAggregatorTest.java | 77 ++++++++++++++
11 files changed, 342 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 451bd9d..bd2bf7f 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -35,6 +35,8 @@ public enum AtlasConfiguration {
NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
+ NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
+ NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60),
//search configuration
SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index a8609e6..4829221 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -85,6 +85,7 @@ public abstract class AtlasHook {
notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
notificationInterface = NotificationProvider.get();
+ notificationInterface.setCurrentUser(getUser());
LOG.info("Created Atlas Hook");
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 6a70734..4f56bd8 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -31,12 +31,14 @@ import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.lang.reflect.Type;
+import java.net.Inet4Address;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -64,6 +66,16 @@ public abstract class AbstractNotification implements NotificationInterface {
public static final int MAX_BYTES_PER_CHAR = 4; // each char can encode upto 4 bytes in UTF-8
+ /**
+ * IP address of the host in which this process has started
+ */
+ private static String localHostAddress = "";
+
+ /**
+ *
+ */
+ private static String currentUser = "";
+
private final boolean embedded;
private final boolean isHAEnabled;
@@ -107,6 +119,11 @@ public abstract class AbstractNotification implements NotificationInterface {
send(type, Arrays.asList(messages));
}
+ @Override
+ public void setCurrentUser(String user) {
+ currentUser = user;
+ }
+
// ----- AbstractNotification --------------------------------------------
/**
@@ -146,6 +163,24 @@ public abstract class AbstractNotification implements NotificationInterface {
return GSON.toJson(notificationMsg);
}
+ private static String getHostAddress() {
+ if (StringUtils.isEmpty(localHostAddress)) {
+ try {
+ localHostAddress = Inet4Address.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e) {
+ LOG.warn("failed to get local host address", e);
+
+ localHostAddress = "";
+ }
+ }
+
+ return localHostAddress;
+ }
+
+ private static String getCurrentUser() {
+ return currentUser;
+ }
+
/**
* Get the notification message JSON from the given object.
*
@@ -154,7 +189,7 @@ public abstract class AbstractNotification implements NotificationInterface {
* @return the message as a JSON string
*/
public static void createNotificationMessages(Object message, List<String> msgJsonList) {
- AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);
+ AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser());
String msgJson = GSON.toJson(notificationMsg);
boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
index 2f6f9c7..63d93c9 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
@@ -18,10 +18,16 @@
package org.apache.atlas.notification;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Instant;
+
/**
* Represents a notification message that is associated with a version.
*/
public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
+ private String msgSourceIP;
+ private String msgCreatedBy;
+ private long msgCreationTime;
/**
* The actual message.
@@ -38,11 +44,42 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
* @param message the actual message
*/
public AtlasNotificationMessage(MessageVersion version, T message) {
+ this(version, message, null, null);
+ }
+
+ public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
super(version);
- this.message = message;
+ this.msgSourceIP = msgSourceIP;
+ this.msgCreatedBy = createdBy;
+ this.msgCreationTime = Instant.now().toDateTime(DateTimeZone.UTC).getMillis();
+ this.message = message;
+ }
+
+
+ public String getMsgSourceIP() {
+ return msgSourceIP;
+ }
+
+ public void setMsgSourceIP(String msgSourceIP) {
+ this.msgSourceIP = msgSourceIP;
}
+ public String getMsgCreatedBy() {
+ return msgCreatedBy;
+ }
+
+ public void setMsgCreatedBy(String msgCreatedBy) {
+ this.msgCreatedBy = msgCreatedBy;
+ }
+
+ public long getMsgCreationTime() {
+ return msgCreationTime;
+ }
+
+ public void setMsgCreationTime(long msgCreationTime) {
+ this.msgCreationTime = msgCreationTime;
+ }
public T getMessage() {
return message;
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 3d80284..2a175ba 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.atlas.notification;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
import org.apache.commons.lang3.StringUtils;
@@ -26,8 +27,14 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS;
+import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS;
/**
* Deserializer that works with notification messages. The version of each deserialized message is checked against an
@@ -47,8 +54,12 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
private final Gson gson;
- private final Map<String, AtlasNotificationStringMessage[]> splitMsgBuffer = new HashMap<>();
-
+ private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<>();
+ private final long splitMessageBufferPurgeIntervalMs;
+ private final long splitMessageSegmentsWaitTimeMs;
+ private long splitMessagesLastPurgeTime = System.currentTimeMillis();
+ private final AtomicLong messageCountTotal = new AtomicLong(0);
+ private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0);
// ----- Constructors ----------------------------------------------------
/**
@@ -61,11 +72,22 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
*/
public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
Gson gson, Logger notificationLogger) {
- this.notificationMessageType = notificationMessageType;
- this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0];
- this.expectedVersion = expectedVersion;
- this.gson = gson;
- this.notificationLogger = notificationLogger;
+ this(notificationMessageType, expectedVersion, gson, notificationLogger,
+ NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000,
+ NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000);
+ }
+
+ public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
+ Gson gson, Logger notificationLogger,
+ long splitMessageSegmentsWaitTimeMs,
+ long splitMessageBufferPurgeIntervalMs) {
+ this.notificationMessageType = notificationMessageType;
+ this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0];
+ this.expectedVersion = expectedVersion;
+ this.gson = gson;
+ this.notificationLogger = notificationLogger;
+ this.splitMessageSegmentsWaitTimeMs = splitMessageSegmentsWaitTimeMs;
+ this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs;
}
// ----- MessageDeserializer ---------------------------------------------
@@ -74,6 +96,9 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
public T deserialize(String messageJson) {
final T ret;
+ messageCountTotal.incrementAndGet();
+ messageCountSinceLastInterval.incrementAndGet();
+
AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class);
if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
@@ -96,12 +121,12 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
final int splitIdx = splitMsg.getMsgSplitIdx();
final int splitCount = splitMsg.getMsgSplitCount();
- final AtlasNotificationStringMessage[] splitMsgs;
+ final SplitMessageAggregator splitMsgs;
if (splitIdx == 0) {
- splitMsgs = new AtlasNotificationStringMessage[splitCount];
+ splitMsgs = new SplitMessageAggregator(splitMsg);
- splitMsgBuffer.put(msgId, splitMsgs);
+ splitMsgBuffer.put(splitMsgs.getMsgId(), splitMsgs);
} else {
splitMsgs = splitMsgBuffer.get(msgId);
}
@@ -110,24 +135,24 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
LOG.error("Received msgID={}: {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);
msg = null;
- } else if (splitMsgs.length <= splitIdx) {
+ } else if (splitMsgs.getTotalSplitCount() <= splitIdx) {
LOG.error("Received msgID={}: {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
msg = null;
} else {
LOG.info("Received msgID={}: {} of {}", msgId, splitIdx + 1, splitCount);
- splitMsgs[splitIdx] = splitMsg;
+ boolean isReady = splitMsgs.add(splitMsg);
- if (splitIdx == (splitCount - 1)) { // last message
+ if (isReady) { // last message
splitMsgBuffer.remove(msgId);
boolean isValidMessage = true;
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < splitMsgs.length; i++) {
- splitMsg = splitMsgs[i];
+ for (int i = 0; i < splitMsgs.getTotalSplitCount(); i++) {
+ splitMsg = splitMsgs.get(i);
if (splitMsg == null) {
LOG.warn("MsgID={}: message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);
@@ -192,9 +217,55 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
}
}
+
+ long now = System.currentTimeMillis();
+ long timeSinceLastPurge = now - splitMessagesLastPurgeTime;
+
+ if(timeSinceLastPurge >= splitMessageBufferPurgeIntervalMs) {
+ purgeStaleMessages(splitMsgBuffer, now, splitMessageSegmentsWaitTimeMs);
+
+ LOG.info("Notification processing stats: total={}, sinceLastStatsReport={}", messageCountTotal.get(), messageCountSinceLastInterval.getAndSet(0));
+
+ splitMessagesLastPurgeTime = now;
+ }
+
return ret;
}
+ @VisibleForTesting
+ static void purgeStaleMessages(Map<String, SplitMessageAggregator> splitMsgBuffer, long now, long maxWaitTime) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")");
+ }
+
+ List<SplitMessageAggregator> evictionList = null;
+
+ for (SplitMessageAggregator aggregrator : splitMsgBuffer.values()) {
+ long waitTime = now - aggregrator.getFirstSplitTimestamp();
+
+ if (waitTime < maxWaitTime) {
+ continue;
+ }
+
+ if(evictionList == null) {
+ evictionList = new ArrayList<>();
+ }
+
+ evictionList.add(aggregrator);
+ }
+
+ if(evictionList != null) {
+ for (SplitMessageAggregator aggregrator : evictionList) {
+ LOG.error("evicting notification msgID={}, totalSplitCount={}, receivedSplitCount={}", aggregrator.getMsgId(), aggregrator.getTotalSplitCount(), aggregrator.getReceivedSplitCount());
+ splitMsgBuffer.remove(aggregrator.getMsgId());
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")");
+ }
+ }
+
// ----- helper methods --------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
index 193735c..41485a0 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
@@ -38,6 +38,12 @@ public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage
this.message = message;
}
+ public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
+ super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
+
+ this.message = message;
+ }
+
public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) {
super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index a787862..8809225 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -99,6 +99,12 @@ public interface NotificationInterface {
}
/**
+ *
+ * @param user Name of the user under which the processes is running
+ */
+ void setCurrentUser(String user);
+
+ /**
* Create notification consumers for the given notification type.
*
* @param notificationType the notification type (i.e. HOOK, ENTITIES)
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
new file mode 100644
index 0000000..148b57f
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+
+public class SplitMessageAggregator {
+ private final String msgId;
+ private final AtlasNotificationStringMessage[] splitMessagesBuffer;
+ private final long firstSplitTimestamp;
+
+ public SplitMessageAggregator(AtlasNotificationStringMessage message) {
+ msgId = message.getMsgId();
+ splitMessagesBuffer = new AtlasNotificationStringMessage[message.getMsgSplitCount()];
+ firstSplitTimestamp = System.currentTimeMillis();
+
+ add(message);
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public long getTotalSplitCount() {
+ return splitMessagesBuffer.length;
+ }
+
+ public long getReceivedSplitCount() {
+ long ret = 0;
+
+ for (AtlasNotificationStringMessage split : splitMessagesBuffer) {
+ if (split != null) {
+ ret++;
+ }
+ }
+
+ return ret;
+ }
+
+ public long getFirstSplitTimestamp() {
+ return firstSplitTimestamp;
+ }
+
+ public boolean add(AtlasNotificationStringMessage message) {
+ if (message.getMsgSplitIdx() < splitMessagesBuffer.length) {
+ splitMessagesBuffer[message.getMsgSplitIdx()] = message;
+ }
+
+ return message.getMsgSplitIdx() == (message.getMsgSplitCount() - 1);
+ }
+
+ public AtlasNotificationStringMessage get(int i) {
+ return splitMessagesBuffer[i];
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 12f48d1..f313ddc 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -31,10 +31,8 @@ import java.util.List;
import java.util.Objects;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.kafka.common.TopicPartition;
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 4719324..655252c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.*;
@@ -54,9 +55,9 @@ public class AbstractNotificationTest {
assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
assertEquals(3, notification.messages.size());
- assertEquals(messageJson.get(0), notification.messages.get(0));
- assertEquals(messageJson.get(1), notification.messages.get(1));
- assertEquals(messageJson.get(2), notification.messages.get(2));
+ for (int i = 0; i < notification.messages.size(); i++) {
+ assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i));
+ }
}
@Test
@@ -81,9 +82,11 @@ public class AbstractNotificationTest {
notification.send(NotificationInterface.NotificationType.HOOK, messages);
- assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
- assertEquals(messageJson.size(), notification.messages.size());
- assertEquals(messageJson, notification.messages);
+ assertEquals(notification.type, NotificationInterface.NotificationType.HOOK);
+ assertEquals(notification.messages.size(), messageJson.size());
+ for (int i = 0; i < notification.messages.size(); i++) {
+ assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i));
+ }
}
public static class TestMessage extends HookNotification.HookNotificationMessage {
@@ -93,6 +96,17 @@ public class AbstractNotificationTest {
}
}
+ // ignore msgCreationTime in Json
+ private void assertEqualsMessageJson(String msgJsonActual, String msgJsonExpected) {
+ Map<Object, Object> msgActual = AbstractNotification.GSON.fromJson(msgJsonActual, Map.class);
+ Map<Object, Object> msgExpected = AbstractNotification.GSON.fromJson(msgJsonExpected, Map.class);
+
+ msgActual.remove("msgCreationTime");
+ msgExpected.remove("msgCreationTime");
+
+ assertEquals(msgActual, msgExpected);
+ }
+
public static class TestNotification extends AbstractNotification {
private NotificationType type;
private List<String> messages;
http://git-wip-us.apache.org/repos/asf/atlas/blob/9a8c7125/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
new file mode 100644
index 0000000..0807221
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SplitMessageAggregatorTest {
+ @Test
+ public void verifyEviction() throws InterruptedException {
+ Map<String, SplitMessageAggregator> map = getStringSplitMessageAggregatorMap();
+
+ Thread.currentThread().sleep(500);
+
+ AtlasNotificationMessageDeserializer.purgeStaleMessages(map, System.currentTimeMillis(), 250);
+
+ Assert.assertEquals(map.size(), 0);
+ }
+
+
+ @Test
+ public void verifyEvictionDoesNotOccur() throws InterruptedException {
+ Map<String, SplitMessageAggregator> map = getStringSplitMessageAggregatorMap();
+
+ int expectedSize = map.size();
+
+ Thread.currentThread().sleep(500);
+
+ AtlasNotificationMessageDeserializer.purgeStaleMessages(map, System.currentTimeMillis(), Long.MAX_VALUE);
+
+ Assert.assertEquals(map.size(), expectedSize);
+ }
+
+ private Map<String, SplitMessageAggregator> getStringSplitMessageAggregatorMap() {
+ Map<String, SplitMessageAggregator> map = new HashMap<>();
+
+ map.put("1", getSplitMessageAggregator("1", 5));
+ map.put("2", getSplitMessageAggregator("2", 10));
+
+ return map;
+ }
+
+ private SplitMessageAggregator getSplitMessageAggregator(String id, int splitCount) {
+ SplitMessageAggregator sma = null;
+
+ for (int i = 0; i < splitCount; i++) {
+ AtlasNotificationStringMessage sm = new AtlasNotificationStringMessage("aaaaa", id, CompressionKind.NONE, i, splitCount);
+
+ if(sma == null) {
+ sma = new SplitMessageAggregator(sm);
+ } else {
+ sma.add(sm);
+ }
+ }
+
+ return sma;
+ }
+}