You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/09/30 11:57:18 UTC
atlas git commit: ATLAS-2075: notification enhancement to handle
large messages, using compression and multi-part messages
Repository: atlas
Updated Branches:
refs/heads/branch-0.8 c61d489cc -> 99243ee8e
ATLAS-2075: notification enhancement to handle large messages, using compression and multi-part messages
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/99243ee8
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/99243ee8
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/99243ee8
Branch: refs/heads/branch-0.8
Commit: 99243ee8e18656acd72601468c99e7781a0b04f7
Parents: c61d489
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Wed Sep 27 20:42:25 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sat Sep 30 04:56:35 2017 -0700
----------------------------------------------------------------------
.../org/apache/atlas/AtlasConfiguration.java | 7 +
.../apache/atlas/kafka/AtlasKafkaConsumer.java | 4 +
.../apache/atlas/kafka/KafkaNotification.java | 4 +-
.../AbstractMessageDeserializer.java | 14 +-
.../notification/AbstractNotification.java | 113 +++++++++-
.../AtlasNotificationBaseMessage.java | 194 ++++++++++++++++
.../notification/AtlasNotificationMessage.java | 50 +++++
.../AtlasNotificationMessageDeserializer.java | 225 +++++++++++++++++++
.../AtlasNotificationStringMessage.java | 60 +++++
.../atlas/notification/MessageVersion.java | 3 +
.../notification/NotificationInterface.java | 4 +-
.../atlas/notification/VersionedMessage.java | 75 -------
.../VersionedMessageDeserializer.java | 105 ---------
.../apache/atlas/kafka/KafkaConsumerTest.java | 11 +-
.../atlas/kafka/KafkaNotificationMockTest.java | 18 +-
.../AbstractNotificationConsumerTest.java | 41 ++--
.../notification/AbstractNotificationTest.java | 33 +--
.../AtlasNotificationMessageTest.java | 57 +++++
.../notification/VersionedMessageTest.java | 57 -----
.../entity/EntityMessageDeserializerTest.java | 16 +-
.../hook/HookMessageDeserializerTest.java | 134 +++++++++--
21 files changed, 895 insertions(+), 330 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 9a9bb76..451bd9d 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -33,6 +33,9 @@ public enum AtlasConfiguration {
QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024),
+ NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
+ NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
+
//search configuration
SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
@@ -63,6 +66,10 @@ public enum AtlasConfiguration {
return APPLICATION_PROPERTIES.getLong(propertyName, Long.valueOf(defaultValue.toString()).longValue());
}
+ public boolean getBoolean() {
+ return APPLICATION_PROPERTIES.getBoolean(propertyName, Boolean.valueOf(defaultValue.toString()).booleanValue());
+ }
+
public String getString() {
return APPLICATION_PROPERTIES.getString(propertyName, defaultValue.toString());
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index d3b4e49..e3bb71c 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -71,6 +71,10 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
T message = deserializer.deserialize(record.value().toString());
+ if (message == null) {
+ continue;
+ }
+
messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition()));
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 38889ef..6bb8d73 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -202,7 +202,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
// ----- AbstractNotification --------------------------------------------
@Override
- public void sendInternal(NotificationType type, String... messages) throws NotificationException {
+ public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
if (producer == null) {
createProducer();
}
@@ -210,7 +210,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
}
@VisibleForTesting
- void sendInternalToProducer(Producer p, NotificationType type, String[] messages) throws NotificationException {
+ void sendInternalToProducer(Producer p, NotificationType type, List<String> messages) throws NotificationException {
String topic = TOPIC_MAP.get(type);
List<MessageContext> messageContexts = new ArrayList<>();
for (String message : messages) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
index ec99372..37a57d1 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
@@ -44,7 +44,7 @@ import java.util.Map;
/**
* Base notification message deserializer.
*/
-public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDeserializer<T> {
+public abstract class AbstractMessageDeserializer<T> extends AtlasNotificationMessageDeserializer<T> {
private static final Map<Type, JsonDeserializer> DESERIALIZER_MAP = new HashMap<>();
@@ -63,16 +63,16 @@ public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDes
/**
* Create a deserializer.
*
- * @param versionedMessageType the type of the versioned message
- * @param expectedVersion the expected message version
- * @param deserializerMap map of individual deserializers used to define this message deserializer
- * @param notificationLogger logger for message version mismatch
+ * @param notificationMessageType the type of the notification message
+ * @param expectedVersion the expected message version
+ * @param deserializerMap map of individual deserializers used to define this message deserializer
+ * @param notificationLogger logger for message version mismatch
*/
- public AbstractMessageDeserializer(Type versionedMessageType,
+ public AbstractMessageDeserializer(Type notificationMessageType,
MessageVersion expectedVersion,
Map<Type, JsonDeserializer> deserializerMap,
Logger notificationLogger) {
- super(versionedMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger);
+ super(notificationMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger);
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index cb44fc6..1f9404d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -26,21 +26,34 @@ import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.reflect.Type;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED;
+import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES;
/**
* Abstract notification interface implementation.
*/
public abstract class AbstractNotification implements NotificationInterface {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNotification.class);
+
+ private static String msgIdPrefix = UUID.randomUUID().toString();
+ private static AtomicInteger msgIdSuffix = new AtomicInteger(0);
/**
* The current expected version for notification messages.
@@ -48,6 +61,9 @@ public abstract class AbstractNotification implements NotificationInterface {
public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");
public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
+
+ public static final int MAX_BYTES_PER_CHAR = 4; // worst-case estimate: a character can take up to 4 bytes in UTF-8
+
private final boolean embedded;
private final boolean isHAEnabled;
@@ -77,10 +93,12 @@ public abstract class AbstractNotification implements NotificationInterface {
@Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
- String[] strMessages = new String[messages.size()];
+ List<String> strMessages = new ArrayList<>(messages.size());
+
for (int index = 0; index < messages.size(); index++) {
- strMessages[index] = getMessageJson(messages.get(index));
+ createNotificationMessages(messages.get(index), strMessages);
}
+
sendInternal(type, strMessages);
}
@@ -117,11 +135,17 @@ public abstract class AbstractNotification implements NotificationInterface {
*
* @throws NotificationException if an error occurs while sending
*/
- protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException;
+ protected abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;
// ----- utility methods -------------------------------------------------
+ /**
+  * Wrap the given object in a notification message stamped with the current
+  * message version and serialize the wrapper to JSON.
+  *
+  * @param message the payload to wrap
+  *
+  * @return the wrapped message as a JSON string
+  */
+ public static String getMessageJson(Object message) {
+     return GSON.toJson(new AtlasNotificationMessage<Object>(CURRENT_MESSAGE_VERSION, message));
+ }
+
/**
* Get the notification message JSON from the given object.
*
@@ -129,10 +153,75 @@ public abstract class AbstractNotification implements NotificationInterface {
*
* @return the message as a JSON string
*/
- public static String getMessageJson(Object message) {
- VersionedMessage<?> versionedMessage = new VersionedMessage<>(CURRENT_MESSAGE_VERSION, message);
+ public static void createNotificationMessages(Object message, List<String> msgJsonList) {
+ AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);
+ String msgJson = GSON.toJson(notificationMsg);
+
+ boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;
+
+ if (msgLengthExceedsLimit) { // get utf-8 bytes for msgJson and check for length limit again
+ byte[] msgBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson);
+
+ msgLengthExceedsLimit = msgBytes.length > MESSAGE_MAX_LENGTH_BYTES;
+
+ if (msgLengthExceedsLimit) {
+ String msgId = getNextMessageId();
+ CompressionKind compressionKind = CompressionKind.NONE;
+
+ if (MESSAGE_COMPRESSION_ENABLED) {
+ byte[] encodedBytes = AtlasNotificationBaseMessage.gzipCompressAndEncodeBase64(msgBytes);
+
+ compressionKind = CompressionKind.GZIP;
+
+ LOG.info("Compressed large message: msgID={}, uncompressed={} bytes, compressed={} bytes", msgId, msgBytes.length, encodedBytes.length);
+
+ msgLengthExceedsLimit = encodedBytes.length > MESSAGE_MAX_LENGTH_BYTES;
- return GSON.toJson(versionedMessage);
+ if (!msgLengthExceedsLimit) { // no need to split
+ AtlasNotificationStringMessage compressedMsg = new AtlasNotificationStringMessage(encodedBytes, msgId, compressionKind);
+
+ msgJson = GSON.toJson(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above
+ msgBytes = null; // not used after this point
+ } else { // encodedBytes will be split
+ msgJson = null; // not used after this point
+ msgBytes = encodedBytes;
+ }
+ }
+
+ if (msgLengthExceedsLimit) {
+ // compressed messages are already base64-encoded
+ byte[] encodedBytes = compressionKind != CompressionKind.NONE ? msgBytes : AtlasNotificationBaseMessage.encodeBase64(msgBytes);
+
+ int splitCount = encodedBytes.length / MESSAGE_MAX_LENGTH_BYTES;
+
+ if ((encodedBytes.length % MESSAGE_MAX_LENGTH_BYTES) != 0) {
+ splitCount++;
+ }
+
+ LOG.info("Splitting large message: msgID={}, length={} bytes, splitCount={}", msgId, encodedBytes.length, splitCount);
+
+ for (int i = 0, offset = 0; i < splitCount; i++) {
+ int length = MESSAGE_MAX_LENGTH_BYTES;
+
+ if ((offset + length) > encodedBytes.length) {
+ length = encodedBytes.length - offset;
+ }
+
+ AtlasNotificationStringMessage splitMsg = new AtlasNotificationStringMessage(encodedBytes, offset, length, msgId, compressionKind, i, splitCount);
+
+ String splitMsgJson = GSON.toJson(splitMsg);
+
+ msgJsonList.add(splitMsgJson);
+
+ offset += length;
+ }
+ }
+ }
+ }
+
+ if (!msgLengthExceedsLimit) {
+ msgJsonList.add(msgJson);
+ }
}
@@ -158,4 +247,16 @@ public abstract class AbstractNotification implements NotificationInterface {
return new JsonParser().parse(src.toString()).getAsJsonArray();
}
}
+
+ /**
+  * Generate the ID used to tag a large (compressed and/or split) message. IDs have the
+  * form {@code <uuid>_<sequence>}.
+  *
+  * Once the sequence reaches Short.MAX_VALUE a fresh UUID prefix is generated and the
+  * sequence restarts from zero.
+  *
+  * The method is synchronized so that reading the prefix, incrementing the sequence and
+  * rolling both over happen as a single step. Without the lock a thread could read the old
+  * prefix, then increment the restarted sequence, and hand out an ID that was already used.
+  *
+  * @return the next message ID
+  */
+ private static synchronized String getNextMessageId() {
+     String nextMsgIdPrefix = msgIdPrefix;
+     int    nextMsgIdSuffix = msgIdSuffix.getAndIncrement();
+
+     if (nextMsgIdSuffix == Short.MAX_VALUE) { // suffix 32,767 was just handed out: switch to a new UUID prefix
+         msgIdPrefix = UUID.randomUUID().toString();
+         msgIdSuffix = new AtomicInteger(0);
+     }
+
+     return nextMsgIdPrefix + "_" + Integer.toString(nextMsgIdSuffix);
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
new file mode 100644
index 0000000..3b377de
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.commons.compress.utils.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+
+public class AtlasNotificationBaseMessage {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationBaseMessage.class);
+
+ public static final int MESSAGE_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES.getInt() - 512; // reserve 512 bytes for the message envelope
+ public static final boolean MESSAGE_COMPRESSION_ENABLED = AtlasConfiguration.NOTIFICATION_MESSAGE_COMPRESSION_ENABLED.getBoolean();
+
+ public enum CompressionKind { NONE, GZIP };
+
+ private MessageVersion version = null;
+ private String msgId = null;
+ private CompressionKind msgCompressionKind = CompressionKind.NONE;
+ private int msgSplitIdx = 1;
+ private int msgSplitCount = 1;
+
+
+ public AtlasNotificationBaseMessage() {
+ }
+
+ public AtlasNotificationBaseMessage(MessageVersion version) {
+ this(version, null, CompressionKind.NONE);
+ }
+
+ public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) {
+ this.version = version;
+ this.msgId = msgId;
+ this.msgCompressionKind = msgCompressionKind;
+ }
+
+ public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) {
+ this.version = version;
+ this.msgId = msgId;
+ this.msgCompressionKind = msgCompressionKind;
+ this.msgSplitIdx = msgSplitIdx;
+ this.msgSplitCount = msgSplitCount;
+ }
+
+ public void setVersion(MessageVersion version) {
+ this.version = version;
+ }
+
+ public MessageVersion getVersion() {
+ return version;
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+
+ public CompressionKind getMsgCompressionKind() {
+ return msgCompressionKind;
+ }
+
+ public void setMsgCompressed(CompressionKind msgCompressionKind) {
+ this.msgCompressionKind = msgCompressionKind;
+ }
+
+ public int getMsgSplitIdx() {
+ return msgSplitIdx;
+ }
+
+ public void setMsgSplitIdx(int msgSplitIdx) {
+ this.msgSplitIdx = msgSplitIdx;
+ }
+
+ public int getMsgSplitCount() {
+ return msgSplitCount;
+ }
+
+ public void setMsgSplitCount(int msgSplitCount) {
+ this.msgSplitCount = msgSplitCount;
+ }
+
+ /**
+ * Compare the version of this message with the given version.
+ *
+ * @param compareToVersion the version to compare to
+ *
+ * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
+ * or greater than the given version.
+ */
+ public int compareVersion(MessageVersion compareToVersion) {
+ return version.compareTo(compareToVersion);
+ }
+
+
+ public static byte[] getBytesUtf8(String str) {
+ return StringUtils.getBytesUtf8(str);
+ }
+
+ public static String getStringUtf8(byte[] bytes) {
+ return StringUtils.newStringUtf8(bytes);
+ }
+
+ public static byte[] encodeBase64(byte[] bytes) {
+ return Base64.encodeBase64(bytes);
+ }
+
+ public static byte[] decodeBase64(byte[] bytes) {
+ return Base64.decodeBase64(bytes);
+ }
+
+ public static byte[] gzipCompressAndEncodeBase64(byte[] bytes) {
+ return encodeBase64(gzipCompress(bytes));
+ }
+
+ public static byte[] decodeBase64AndGzipUncompress(byte[] bytes) {
+ return gzipUncompress(decodeBase64(bytes));
+ }
+
+ /**
+  * GZIP-compress the UTF-8 bytes of the given string and return the compressed
+  * data as Base64 text.
+  *
+  * @param str the text to compress
+  *
+  * @return Base64 text of the GZIP-compressed input
+  */
+ public static String gzipCompress(String str) {
+     return getStringUtf8(gzipCompressAndEncodeBase64(getBytesUtf8(str)));
+ }
+
+ /**
+  * Reverse of {@code gzipCompress(String)}: Base64-decode the given text,
+  * GZIP-uncompress the result and interpret the bytes as UTF-8.
+  *
+  * @param str Base64 text of GZIP-compressed data
+  *
+  * @return the uncompressed text
+  */
+ public static String gzipUncompress(String str) {
+     return getStringUtf8(decodeBase64AndGzipUncompress(getBytesUtf8(str)));
+ }
+
+ /**
+  * GZIP-compress the given bytes.
+  *
+  * @param content the bytes to compress
+  *
+  * @return the compressed bytes
+  *
+  * @throws RuntimeException wrapping the IOException if compression fails
+  */
+ public static byte[] gzipCompress(byte[] content) {
+     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+
+     // try-with-resources closes the GZIP stream even when write() fails; closing it is
+     // also what flushes the remaining compressed data into byteArrayOutputStream
+     try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
+         gzipOutputStream.write(content);
+     } catch (IOException e) {
+         LOG.error("gzipCompress(): error compressing {} bytes", content.length, e);
+
+         throw new RuntimeException(e);
+     }
+
+     return byteArrayOutputStream.toByteArray();
+ }
+
+ /**
+  * GZIP-uncompress the given bytes.
+  *
+  * A failure is reported to the caller instead of being swallowed, so that truncated or
+  * empty data is never returned as if it were the uncompressed content. This matches the
+  * error handling of {@code gzipCompress(byte[])}.
+  *
+  * @param content the GZIP-compressed bytes
+  *
+  * @return the uncompressed bytes
+  *
+  * @throws RuntimeException wrapping the IOException if the content cannot be uncompressed
+  */
+ public static byte[] gzipUncompress(byte[] content) {
+     ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+     // try-with-resources makes sure the GZIP stream is closed on both success and failure
+     try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(content))) {
+         IOUtils.copy(in, out);
+     } catch (IOException e) {
+         LOG.error("gzipUncompress(): error uncompressing {} bytes", content.length, e);
+
+         throw new RuntimeException(e);
+     }
+
+     return out.toByteArray();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
new file mode 100644
index 0000000..2f6f9c7
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+/**
+ * Represents a notification message that is associated with a version.
+ */
+public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
+    /** The payload carried by this notification. */
+    private final T message;
+
+    /**
+     * Create a notification message that carries the given payload.
+     *
+     * @param version the version to stamp on the message
+     * @param message the payload
+     */
+    public AtlasNotificationMessage(MessageVersion version, T message) {
+        super(version);
+
+        this.message = message;
+    }
+
+    /**
+     * @return the payload carried by this notification
+     */
+    public T getMessage() {
+        return message;
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
new file mode 100644
index 0000000..b1ac2fa
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.gson.Gson;
+import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Deserializer that works with notification messages. The version of each deserialized message is checked against an
+ * expected version.
+ */
+public abstract class AtlasNotificationMessageDeserializer<T> implements MessageDeserializer<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationMessageDeserializer.class);
+
+
+ public static final String VERSION_MISMATCH_MSG =
+ "Notification message version mismatch. Expected %s but recieved %s. Message %s";
+
+ private final Type notificationMessageType;
+ private final Type messageType;
+ private final MessageVersion expectedVersion;
+ private final Logger notificationLogger;
+ private final Gson gson;
+
+
+ private final Map<String, AtlasNotificationStringMessage[]> splitMsgBuffer = new HashMap<>();
+
+ // ----- Constructors ----------------------------------------------------
+
+ /**
+ * Create a notification message deserializer.
+ *
+ * @param notificationMessageType the type of the notification message
+ * @param expectedVersion the expected message version
+ * @param gson JSON serialization/deserialization
+ * @param notificationLogger logger for message version mismatch
+ */
+ public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
+ Gson gson, Logger notificationLogger) {
+ this.notificationMessageType = notificationMessageType;
+ this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0];
+ this.expectedVersion = expectedVersion;
+ this.gson = gson;
+ this.notificationLogger = notificationLogger;
+ }
+
+ // ----- MessageDeserializer ---------------------------------------------
+
+ /**
+  * Deserialize a notification message from its JSON form.
+  *
+  * Three kinds of input are handled:
+  *   - older style payloads that carry no version and are parsed directly as the payload type;
+  *   - multi-part messages: parts are buffered by message ID, starting with part 0. When the last
+  *     part arrives the parts are concatenated, Base64-decoded and, if marked GZIP, uncompressed;
+  *   - single messages, uncompressed first when they are marked GZIP.
+  *
+  * @param messageJson the JSON text of the received message
+  *
+  * @return the deserialized payload, or null when the input is one part of a multi-part message
+  *         that is not complete yet, or when a multi-part message had to be ignored
+  */
+ @Override
+ public T deserialize(String messageJson) {
+     final T ret;
+
+     AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class);
+
+     if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
+         ret = gson.fromJson(messageJson, messageType);
+     } else {
+         String msgJson = messageJson;
+
+         if (msg.getMsgSplitCount() > 1) { // multi-part message
+             AtlasNotificationStringMessage splitMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
+
+             checkVersion(splitMsg, msgJson);
+
+             String msgId = splitMsg.getMsgId();
+
+             if (StringUtils.isEmpty(msgId)) {
+                 LOG.error("Received multi-part message with no message ID. Ignoring message");
+
+                 msg = null;
+             } else {
+                 final int splitIdx   = splitMsg.getMsgSplitIdx();
+                 final int splitCount = splitMsg.getMsgSplitCount();
+
+                 final AtlasNotificationStringMessage[] splitMsgs;
+
+                 if (splitIdx == 0) { // first part: start a new buffer for this message ID
+                     splitMsgs = new AtlasNotificationStringMessage[splitCount];
+
+                     splitMsgBuffer.put(msgId, splitMsgs);
+                 } else {
+                     splitMsgs = splitMsgBuffer.get(msgId);
+                 }
+
+                 if (splitMsgs == null) {
+                     LOG.error("Received multi-part message: msgID={}, {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);
+
+                     msg = null;
+                 } else if (splitIdx < 0 || splitMsgs.length <= splitIdx) { // a negative index is out of bounds too
+                     LOG.error("Received multi-part message: msgID={}, {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
+
+                     msg = null;
+                 } else {
+                     LOG.info("Received multi-part message: msgID={}, {} of {}", msgId, splitIdx + 1, splitCount);
+
+                     splitMsgs[splitIdx] = splitMsg;
+
+                     if (splitIdx == (splitCount - 1)) { // last message
+                         splitMsgBuffer.remove(msgId);
+
+                         boolean isValidMessage = true;
+
+                         StringBuilder sb = new StringBuilder();
+
+                         for (int i = 0; i < splitMsgs.length; i++) {
+                             splitMsg = splitMsgs[i];
+
+                             if (splitMsg == null) {
+                                 LOG.warn("Multi-part message: msgID={}, message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);
+
+                                 isValidMessage = false;
+
+                                 break;
+                             }
+
+                             sb.append(splitMsg.getMessage());
+                         }
+
+                         if (isValidMessage) {
+                             // the concatenated parts are Base64 text of either gzipped or plain message JSON
+                             msgJson = sb.toString();
+
+                             if (CompressionKind.GZIP.equals(splitMsg.getMsgCompressionKind())) {
+                                 byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson);
+                                 byte[] bytes        = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);
+
+                                 msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);
+
+                                 LOG.info("Received multi-part, compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length);
+                             } else {
+                                 byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson);
+                                 byte[] bytes        = AtlasNotificationBaseMessage.decodeBase64(encodedBytes);
+
+                                 msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);
+
+                                 LOG.info("Received multi-part message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length);
+                             }
+
+                             msg = gson.fromJson(msgJson, AtlasNotificationBaseMessage.class);
+                         } else {
+                             msg = null;
+                         }
+                     } else { // more messages to arrive
+                         msg = null;
+                     }
+                 }
+             }
+         }
+
+         if (msg != null) {
+             if (CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) {
+                 AtlasNotificationStringMessage compressedMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
+
+                 byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage());
+                 byte[] bytes        = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);
+
+                 msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);
+
+                 LOG.info("Received compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length);
+             }
+
+             AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType);
+
+             checkVersion(atlasNotificationMessage, msgJson);
+
+             ret = atlasNotificationMessage.getMessage();
+         } else {
+             ret = null;
+         }
+     }
+
+     return ret;
+ }
+
+ // ----- helper methods --------------------------------------------------
+
+ /**
+ * Check the message version against the expected version.
+ *
+ * @param notificationMessage the notification message
+ * @param messageJson the notification message json
+ *
+ * @throws IncompatibleVersionException if the message version is incompatable with the expected version
+ */
+ protected void checkVersion(AtlasNotificationBaseMessage notificationMessage, String messageJson) {
+ int comp = notificationMessage.compareVersion(expectedVersion);
+
+ // message has newer version
+ if (comp > 0) {
+ String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson);
+
+ notificationLogger.error(msg);
+
+ throw new IncompatibleVersionException(msg);
+ }
+
+ // message has older version
+ if (comp < 0) {
+ notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
new file mode 100644
index 0000000..193735c
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+
+/**
+ * Notification message whose payload is held as a single string.
+ *
+ * Every constructor stamps the message with {@code AbstractNotification.CURRENT_MESSAGE_VERSION};
+ * the message id, compression kind and split information are handed unchanged to
+ * {@link AtlasNotificationBaseMessage}.
+ */
+public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage {
+    private String message = null;
+
+    /**
+     * Creates a message with no payload (the payload stays {@code null} until {@link #setMessage(String)}).
+     */
+    public AtlasNotificationStringMessage() {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION);
+    }
+
+    /**
+     * Creates a message carrying the given payload.
+     *
+     * @param message the payload text
+     */
+    public AtlasNotificationStringMessage(String message) {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION);
+
+        this.message = message;
+    }
+
+    /**
+     * Creates a message carrying the given payload, tagged with a message id and compression kind.
+     *
+     * @param message         the payload text
+     * @param msgId           the message id passed to the base class
+     * @param compressionKind the compression kind passed to the base class
+     */
+    public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind) {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
+
+        this.message = message;
+    }
+
+    /**
+     * Creates a message whose payload is the text decoded from the whole byte array.
+     *
+     * @param encodedBytes    the bytes to decode into the payload text
+     * @param msgId           the message id passed to the base class
+     * @param compressionKind the compression kind passed to the base class
+     */
+    public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
+
+        this.message = AtlasNotificationBaseMessage.getStringUtf8(encodedBytes);
+    }
+
+    /**
+     * Creates one part of a split message; the payload is the text decoded from a slice of the byte array.
+     *
+     * @param encodedBytes    the bytes holding the complete encoded message
+     * @param offset          index of the first byte of this part
+     * @param length          number of bytes in this part
+     * @param msgId           the message id passed to the base class
+     * @param compressionKind the compression kind passed to the base class
+     * @param msgSplitIdx     the split index passed to the base class
+     * @param msgSplitCount   the split count passed to the base class
+     */
+    public AtlasNotificationStringMessage(byte[] encodedBytes, int offset, int length, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
+        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
+
+        // Decode explicitly as UTF-8 so the result does not depend on the JVM's default charset.
+        // NOTE(review): presumably the same charset getStringUtf8() applies in the sibling
+        // constructor; that helper is defined outside this file - confirm.
+        this.message = new String(encodedBytes, offset, length, java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
index 6ef407a..7f96638 100644
--- a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
+++ b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
@@ -29,6 +29,9 @@ public class MessageVersion implements Comparable<MessageVersion> {
* Used for message with no version (old format).
*/
public static final MessageVersion NO_VERSION = new MessageVersion("0");
+ public static final MessageVersion VERSION_1 = new MessageVersion("1.0.0");
+
+ public static final MessageVersion CURRENT_VERSION = VERSION_1;
private final String version;
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 956c85e..a787862 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -54,9 +54,9 @@ public interface NotificationInterface {
* Versioned notification message class types.
*/
Type HOOK_VERSIONED_MESSAGE_TYPE =
- new TypeToken<VersionedMessage<HookNotification.HookNotificationMessage>>(){}.getType();
+ new TypeToken<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>(){}.getType();
- Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<VersionedMessage<EntityNotification>>(){}.getType();
+ Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType();
/**
* Atlas notification types.
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
deleted file mode 100644
index 1929eb4..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-/**
- * Represents a notification message that is associated with a version.
- */
-public class VersionedMessage<T> {
-
- /**
- * The version of the message.
- */
- private final MessageVersion version;
-
- /**
- * The actual message.
- */
- private final T message;
-
-
- // ----- Constructors ----------------------------------------------------
-
- /**
- * Create a versioned message.
- *
- * @param version the message version
- * @param message the actual message
- */
- public VersionedMessage(MessageVersion version, T message) {
- this.version = version;
- this.message = message;
- }
-
-
- // ----- VersionedMessage ------------------------------------------------
-
- /**
- * Compare the version of this message with the given version.
- *
- * @param compareToVersion the version to compare to
- *
- * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
- * or greater than the given version.
- */
- public int compareVersion(MessageVersion compareToVersion) {
- return version.compareTo(compareToVersion);
- }
-
-
- // ----- accessors -------------------------------------------------------
-
- public MessageVersion getVersion() {
- return version;
- }
-
- public T getMessage() {
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
deleted file mode 100644
index cc2099e..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-import com.google.gson.Gson;
-import org.slf4j.Logger;
-
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-/**
- * Deserializer that works with versioned messages. The version of each deserialized message is checked against an
- * expected version.
- */
-public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> {
-
- public static final String VERSION_MISMATCH_MSG =
- "Notification message version mismatch. Expected %s but recieved %s. Message %s";
-
- private final Type versionedMessageType;
- private final MessageVersion expectedVersion;
- private final Logger notificationLogger;
- private final Gson gson;
-
-
- // ----- Constructors ----------------------------------------------------
-
- /**
- * Create a versioned message deserializer.
- *
- * @param versionedMessageType the type of the versioned message
- * @param expectedVersion the expected message version
- * @param gson JSON serialization/deserialization
- * @param notificationLogger logger for message version mismatch
- */
- public VersionedMessageDeserializer(Type versionedMessageType, MessageVersion expectedVersion,
- Gson gson, Logger notificationLogger) {
- this.versionedMessageType = versionedMessageType;
- this.expectedVersion = expectedVersion;
- this.gson = gson;
- this.notificationLogger = notificationLogger;
- }
-
-
- // ----- MessageDeserializer ---------------------------------------------
-
- @Override
- public T deserialize(String messageJson) {
- VersionedMessage<T> versionedMessage = gson.fromJson(messageJson, versionedMessageType);
-
- // older style messages not wrapped with VersionedMessage
- if (versionedMessage.getVersion() == null) {
- Type t = ((ParameterizedType) versionedMessageType).getActualTypeArguments()[0];
- versionedMessage = new VersionedMessage<>(MessageVersion.NO_VERSION, gson.<T>fromJson(messageJson, t));
- }
- checkVersion(versionedMessage, messageJson);
-
- return versionedMessage.getMessage();
- }
-
-
- // ----- helper methods --------------------------------------------------
-
- /**
- * Check the message version against the expected version.
- *
- * @param versionedMessage the versioned message
- * @param messageJson the notification message json
- *
- * @throws IncompatibleVersionException if the message version is incompatable with the expected version
- */
- protected void checkVersion(VersionedMessage<T> versionedMessage, String messageJson) {
- int comp = versionedMessage.compareVersion(expectedVersion);
-
- // message has newer version
- if (comp > 0) {
- String msg =
- String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(), messageJson);
- notificationLogger.error(msg);
- throw new IncompatibleVersionException(msg);
- }
-
- // message has older version
- if (comp < 0) {
- notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(),
- messageJson));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 9b712f4..08a20bd 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -19,11 +19,8 @@
package org.apache.atlas.kafka;
import kafka.message.MessageAndMetadata;
-import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.MessageVersion;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.IncompatibleVersionException;
-import org.apache.atlas.notification.VersionedMessage;
+import org.apache.atlas.notification.*;
+import org.apache.atlas.notification.AtlasNotificationMessage;
import org.apache.atlas.notification.entity.EntityNotificationImplTest;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.IStruct;
@@ -82,7 +79,7 @@ public class KafkaConsumerTest {
HookNotification.EntityUpdateRequest message =
new HookNotification.EntityUpdateRequest("user1", entity);
- String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
+ String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
List<ConsumerRecord> klist = new ArrayList<>();
@@ -119,7 +116,7 @@ public class KafkaConsumerTest {
HookNotification.EntityUpdateRequest message =
new HookNotification.EntityUpdateRequest("user1", entity);
- String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message));
+ String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message));
kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
List<ConsumerRecord> klist = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index b7474a0..09e2e43 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -17,20 +17,14 @@
*/
package org.apache.atlas.kafka;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.serializer.StringDecoder;
-import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -38,11 +32,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import org.apache.atlas.kafka.AtlasKafkaConsumer;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import scala.actors.threadpool.Arrays;
+
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -90,7 +82,7 @@ public class KafkaNotificationMockTest {
when(producer.send(expectedRecord)).thenReturn(returnValue);
kafkaNotification.sendInternalToProducer(producer,
- NotificationInterface.NotificationType.HOOK, new String[]{message});
+ NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message}));
verify(producer).send(expectedRecord);
}
@@ -112,7 +104,7 @@ public class KafkaNotificationMockTest {
try {
kafkaNotification.sendInternalToProducer(producer,
- NotificationInterface.NotificationType.HOOK, new String[]{message});
+ NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message}));
fail("Should have thrown NotificationException");
} catch (NotificationException e) {
assertEquals(e.getFailedMessages().size(), 1);
@@ -142,7 +134,7 @@ public class KafkaNotificationMockTest {
try {
kafkaNotification.sendInternalToProducer(producer,
- NotificationInterface.NotificationType.HOOK, new String[]{message1, message2});
+ NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message1, message2}));
fail("Should have thrown NotificationException");
} catch (NotificationException e) {
assertEquals(e.getFailedMessages().size(), 2);
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 3b2a093..12f48d1 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -30,7 +30,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
-import static org.mockito.Matchers.endsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
@@ -57,15 +56,15 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>();
- jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
- jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
- jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
- jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
+ jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)));
+ jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2)));
+ jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3)));
+ jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4)));
- Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+ Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+ new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
@@ -91,9 +90,9 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>();
- String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
- String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2));
- String json3 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.5.0"), testMessage3));
+ String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
+ String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2));
+ String json3 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3));
String json4 = GSON.toJson(testMessage4);
jsonList.add(json1);
@@ -101,10 +100,10 @@ public class AbstractNotificationConsumerTest {
jsonList.add(json3);
jsonList.add(json4);
- Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+ Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+ new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
@@ -127,16 +126,16 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>();
- String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
- String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2));
+ String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
+ String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2));
jsonList.add(json1);
jsonList.add(json2);
- Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+ Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+ new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
try {
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
@@ -187,8 +186,8 @@ public class AbstractNotificationConsumerTest {
private final List<T> messageList;
private int index = 0;
- public TestNotificationConsumer(Type versionedMessageType, List<T> messages, Logger logger) {
- super(new TestDeserializer<T>(versionedMessageType, logger));
+ public TestNotificationConsumer(Type notificationMessageType, List<T> messages, Logger logger) {
+ super(new TestDeserializer<T>(notificationMessageType, logger));
this.messageList = messages;
}
@@ -222,10 +221,10 @@ public class AbstractNotificationConsumerTest {
}
}
- private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
+ private static final class TestDeserializer<T> extends AtlasNotificationMessageDeserializer<T> {
- private TestDeserializer(Type versionedMessageType, Logger logger) {
- super(versionedMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
+ private TestDeserializer(Type notificationMessageType, Logger logger) {
+ super(notificationMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
}
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 61107a9..4719324 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -23,6 +23,7 @@ import org.apache.atlas.notification.hook.HookNotification;
import org.apache.commons.configuration.Configuration;
import org.testng.annotations.Test;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -44,17 +45,18 @@ public class AbstractNotificationTest {
TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
- String messageJson1 = AbstractNotification.getMessageJson(message1);
- String messageJson2 = AbstractNotification.getMessageJson(message2);
- String messageJson3 = AbstractNotification.getMessageJson(message3);
+ List<String> messageJson = new ArrayList<>();
+ AbstractNotification.createNotificationMessages(message1, messageJson);
+ AbstractNotification.createNotificationMessages(message2, messageJson);
+ AbstractNotification.createNotificationMessages(message3, messageJson);
notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3);
assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
- assertEquals(3, notification.messages.length);
- assertEquals(messageJson1, notification.messages[0]);
- assertEquals(messageJson2, notification.messages[1]);
- assertEquals(messageJson3, notification.messages[2]);
+ assertEquals(3, notification.messages.size());
+ assertEquals(messageJson.get(0), notification.messages.get(0));
+ assertEquals(messageJson.get(1), notification.messages.get(1));
+ assertEquals(messageJson.get(2), notification.messages.get(2));
}
@Test
@@ -72,17 +74,16 @@ public class AbstractNotificationTest {
messages.add(message2);
messages.add(message3);
- String messageJson1 = AbstractNotification.getMessageJson(message1);
- String messageJson2 = AbstractNotification.getMessageJson(message2);
- String messageJson3 = AbstractNotification.getMessageJson(message3);
+ List<String> messageJson = new ArrayList<>();
+ AbstractNotification.createNotificationMessages(message1, messageJson);
+ AbstractNotification.createNotificationMessages(message2, messageJson);
+ AbstractNotification.createNotificationMessages(message3, messageJson);
notification.send(NotificationInterface.NotificationType.HOOK, messages);
assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
- assertEquals(3, notification.messages.length);
- assertEquals(messageJson1, notification.messages[0]);
- assertEquals(messageJson2, notification.messages[1]);
- assertEquals(messageJson3, notification.messages[2]);
+ assertEquals(messageJson.size(), notification.messages.size());
+ assertEquals(messageJson, notification.messages);
}
public static class TestMessage extends HookNotification.HookNotificationMessage {
@@ -94,14 +95,14 @@ public class AbstractNotificationTest {
public static class TestNotification extends AbstractNotification {
private NotificationType type;
- private String[] messages;
+ private List<String> messages;
public TestNotification(Configuration applicationProperties) throws AtlasException {
super(applicationProperties);
}
@Override
- protected void sendInternal(NotificationType notificationType, String[] notificationMessages)
+ protected void sendInternal(NotificationType notificationType, List<String> notificationMessages)
throws NotificationException {
type = notificationType;
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
new file mode 100644
index 0000000..27b5034
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+/**
+ * Unit tests for the version and payload accessors of AtlasNotificationMessage.
+ */
+public class AtlasNotificationMessageTest {
+
+    @Test
+    public void testGetVersion() throws Exception {
+        MessageVersion expectedVersion = new MessageVersion("1.0.0");
+
+        AtlasNotificationMessage<String> wrapped = new AtlasNotificationMessage<>(expectedVersion, "a");
+
+        // the wrapper must hand back the version it was built with
+        assertEquals(wrapped.getVersion(), expectedVersion);
+    }
+
+    @Test
+    public void testGetMessage() throws Exception {
+        MessageVersion anyVersion = new MessageVersion("1.0.0");
+        String         payload    = "a";
+
+        AtlasNotificationMessage<String> wrapped = new AtlasNotificationMessage<>(anyVersion, payload);
+
+        // the wrapper must hand back the payload it was built with
+        assertEquals(wrapped.getMessage(), payload);
+    }
+
+    @Test
+    public void testCompareVersion() throws Exception {
+        MessageVersion same  = new MessageVersion("1.0.0");
+        MessageVersion newer = new MessageVersion("2.0.0");
+        MessageVersion older = new MessageVersion("0.5.0");
+
+        AtlasNotificationMessage<String> wrapped = new AtlasNotificationMessage<>(same, "a");
+
+        int vsSame  = wrapped.compareVersion(same);
+        int vsNewer = wrapped.compareVersion(newer);
+        int vsOlder = wrapped.compareVersion(older);
+
+        // zero for an equal version, negative against a newer one, positive against an older one
+        assertTrue(vsSame == 0);
+        assertTrue(vsNewer < 0);
+        assertTrue(vsOlder > 0);
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
deleted file mode 100644
index 587b7eb..0000000
--- a/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.*;
-
-/**
- * VersionedMessage tests.
- */
-public class VersionedMessageTest {
-
- @Test
- public void testGetVersion() throws Exception {
- MessageVersion version = new MessageVersion("1.0.0");
- VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, "a");
- assertEquals(versionedMessage.getVersion(), version);
- }
-
- @Test
- public void testGetMessage() throws Exception {
- String message = "a";
- MessageVersion version = new MessageVersion("1.0.0");
- VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, message);
- assertEquals(versionedMessage.getMessage(), message);
- }
-
- @Test
- public void testCompareVersion() throws Exception {
- MessageVersion version1 = new MessageVersion("1.0.0");
- MessageVersion version2 = new MessageVersion("2.0.0");
- MessageVersion version3 = new MessageVersion("0.5.0");
-
- VersionedMessage<String> versionedMessage = new VersionedMessage<>(version1, "a");
-
- assertTrue(versionedMessage.compareVersion(version1) == 0);
- assertTrue(versionedMessage.compareVersion(version2) < 0);
- assertTrue(versionedMessage.compareVersion(version3) > 0);
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
index be32427..7b513da 100644
--- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
@@ -24,6 +24,7 @@ import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.testng.annotations.Test;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -48,9 +49,20 @@ public class EntityMessageDeserializerTest {
EntityNotificationImpl notification =
new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
- String json = AbstractNotification.getMessageJson(notification);
+ List<String> jsonMsgList = new ArrayList<>();
+
+ AbstractNotification.createNotificationMessages(notification, jsonMsgList);
+
+ EntityNotification deserializedNotification = null;
+
+ for (String jsonMsg : jsonMsgList) {
+ deserializedNotification = deserializer.deserialize(jsonMsg);
+
+ if (deserializedNotification != null) {
+ break;
+ }
+ }
- EntityNotification deserializedNotification = deserializer.deserialize(json);
assertEquals(deserializedNotification.getOperationType(), notification.getOperationType());
assertEquals(deserializedNotification.getEntity().getId(), notification.getEntity().getId());
assertEquals(deserializedNotification.getEntity().getTypeName(), notification.getEntity().getTypeName());
http://git-wip-us.apache.org/repos/asf/atlas/blob/99243ee8/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
index 3724fd5..49b877b 100644
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
@@ -20,51 +20,151 @@ package org.apache.atlas.notification.hook;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.entity.EntityNotificationImplTest;
-import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
+import org.apache.commons.lang3.RandomStringUtils;
import org.testng.annotations.Test;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
/**
* HookMessageDeserializer tests.
*/
public class HookMessageDeserializerTest {
+ HookMessageDeserializer deserializer = new HookMessageDeserializer();
+
@Test
public void testDeserialize() throws Exception {
- HookMessageDeserializer deserializer = new HookMessageDeserializer();
+ Referenceable entity = generateEntityWithTrait();
+ EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+
+ List<String> jsonMsgList = new ArrayList<>();
+
+ AbstractNotification.createNotificationMessages(message, jsonMsgList);
+
+ HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
+
+ assertEqualMessage(deserializedMessage, message);
+ }
+
+ // validate deserialization of legacy message, which doesn't use MessageVersion
+ @Test
+ public void testDeserializeLegacyMessage() throws Exception {
+ Referenceable entity = generateEntityWithTrait();
+ EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+
+ String jsonMsg = AbstractNotification.GSON.toJson(message);
+ HookNotificationMessage deserializedMessage = deserializer.deserialize(jsonMsg);
+
+ assertEqualMessage(deserializedMessage, message);
+ }
+
+ @Test
+ public void testDeserializeCompressedMessage() throws Exception {
+ Referenceable entity = generateLargeEntityWithTrait();
+ EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+
+ List<String> jsonMsgList = new ArrayList<>();
+
+ AbstractNotification.createNotificationMessages(message, jsonMsgList);
+
+ assertTrue(jsonMsgList.size() == 1);
+
+ String compressedMsg = jsonMsgList.get(0);
+ String uncompressedMsg = AbstractNotification.GSON.toJson(message);
+
+ assertTrue(compressedMsg.length() < uncompressedMsg.length(), "Compressed message (" + compressedMsg.length() + ") should be shorter than uncompressed message (" + uncompressedMsg.length() + ")");
+
+ HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
+
+ assertEqualMessage(deserializedMessage, message);
+ }
- Referenceable entity = EntityNotificationImplTest.getEntity("id");
- String traitName = "MyTrait";
- List<IStruct> traitInfo = new LinkedList<>();
- IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
- traitInfo.add(trait);
+ @Test
+ public void testDeserializeSplitMessage() throws Exception {
+ Referenceable entity = generateVeryLargeEntityWithTrait();
+ EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+
+ List<String> jsonMsgList = new ArrayList<>();
+
+ AbstractNotification.createNotificationMessages(message, jsonMsgList);
- HookNotification.EntityUpdateRequest message =
- new HookNotification.EntityUpdateRequest("user1", entity);
+ assertTrue(jsonMsgList.size() > 1);
- String json = AbstractNotification.getMessageJson(message);
+ HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
- HookNotification.HookNotificationMessage deserializedMessage = deserializer.deserialize(json);
+ assertEqualMessage(deserializedMessage, message);
+ }
+
+ private Referenceable generateEntityWithTrait() {
+ Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+ return ret;
+ }
+
+ private HookNotificationMessage deserialize(List<String> jsonMsgList) {
+ HookNotificationMessage deserializedMessage = null;
+
+ for (String jsonMsg : jsonMsgList) {
+ deserializedMessage = deserializer.deserialize(jsonMsg);
+
+ if (deserializedMessage != null) {
+ break;
+ }
+ }
+
+ return deserializedMessage;
+ }
+
+ private void assertEqualMessage(HookNotificationMessage deserializedMessage, EntityUpdateRequest message) throws Exception {
+ assertNotNull(deserializedMessage);
assertEquals(deserializedMessage.getType(), message.getType());
assertEquals(deserializedMessage.getUser(), message.getUser());
- assertTrue(deserializedMessage instanceof HookNotification.EntityUpdateRequest);
+ assertTrue(deserializedMessage instanceof EntityUpdateRequest);
- HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest =
- (HookNotification.EntityUpdateRequest) deserializedMessage;
+ EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) deserializedMessage;
+ Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
+ Referenceable entity = message.getEntities().get(0);
+ String traitName = entity.getTraits().get(0);
- Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
assertEquals(deserializedEntity.getId(), entity.getId());
assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
assertEquals(deserializedEntity.getTraits(), entity.getTraits());
- assertEquals(deserializedEntity.getTrait(traitName), entity.getTrait(traitName));
+ assertEquals(deserializedEntity.getTrait(traitName).hashCode(), entity.getTrait(traitName).hashCode());
+
+ }
+
+ private Referenceable generateLargeEntityWithTrait() {
+ Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+ // add 100 attributes, each with value of size 10k
+ // Json Size=1,027,984; GZipped Size=16,387 ==> will compress, but not split
+ String attrValue = RandomStringUtils.randomAlphanumeric(10 * 1024); // use the same value for all attributes - to aid better compression
+ for (int i = 0; i < 100; i++) {
+ ret.set("attr_" + i, attrValue);
+ }
+
+ return ret;
+ }
+
+ private Referenceable generateVeryLargeEntityWithTrait() {
+ Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+ // add 300 attributes, each with value of size 10k
+ // Json Size=3,082,384; GZipped Size=2,313,357 ==> will compress & split
+ for (int i = 0; i < 300; i++) {
+ ret.set("attr_" + i, RandomStringUtils.randomAlphanumeric(10 * 1024));
+ }
+
+ return ret;
}
}