You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/11/06 22:50:06 UTC
[2/3] atlas git commit: ATLAS-2251: notification module updates
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 988d98a..8bc7cb4 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -18,17 +18,19 @@
package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jettison.json.JSONArray;
@@ -44,8 +46,8 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED;
-import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES;
+import static org.apache.atlas.model.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED;
+import static org.apache.atlas.model.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES;
/**
* Abstract notification interface implementation.
@@ -78,14 +80,6 @@ public abstract class AbstractNotification implements NotificationInterface {
private final boolean embedded;
private final boolean isHAEnabled;
- /**
- * Used for message serialization.
- */
- public static final Gson GSON = new GsonBuilder().
- registerTypeAdapter(Referenceable.class, new ReferenceableSerializer()).
- registerTypeAdapter(JSONArray.class, new JSONArraySerializer()).
- create();
-
// ----- Constructors ----------------------------------------------------
public AbstractNotification(Configuration applicationProperties) throws AtlasException {
@@ -158,7 +152,7 @@ public abstract class AbstractNotification implements NotificationInterface {
public static String getMessageJson(Object message) {
AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);
- return GSON.toJson(notificationMsg);
+ return AtlasType.toV1Json(notificationMsg);
}
private static String getHostAddress() {
@@ -188,7 +182,7 @@ public abstract class AbstractNotification implements NotificationInterface {
*/
public static void createNotificationMessages(Object message, List<String> msgJsonList) {
AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser());
- String msgJson = GSON.toJson(notificationMsg);
+ String msgJson = AtlasType.toV1Json(notificationMsg);
boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;
@@ -213,7 +207,7 @@ public abstract class AbstractNotification implements NotificationInterface {
if (!msgLengthExceedsLimit) { // no need to split
AtlasNotificationStringMessage compressedMsg = new AtlasNotificationStringMessage(encodedBytes, msgId, compressionKind);
- msgJson = GSON.toJson(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above
+ msgJson = AtlasType.toV1Json(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above
msgBytes = null; // not used after this point
} else { // encodedBytes will be split
msgJson = null; // not used after this point
@@ -239,7 +233,7 @@ public abstract class AbstractNotification implements NotificationInterface {
AtlasNotificationStringMessage splitMsg = new AtlasNotificationStringMessage(encodedBytes, offset, length, msgId, compressionKind, i, splitCount);
- String splitMsgJson = GSON.toJson(splitMsg);
+ String splitMsgJson = AtlasType.toV1Json(splitMsg);
msgJsonList.add(splitMsgJson);
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index 8cf1e8e..c3940ce 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -16,30 +16,19 @@
* limitations under the License.
*/
package org.apache.atlas.notification;
+
import org.apache.kafka.common.TopicPartition;
+
/**
* Abstract notification consumer.
*/
public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> {
+ protected final AtlasNotificationMessageDeserializer<T> deserializer;
- /**
- * Deserializer used to deserialize notification messages for this consumer.
- */
- protected final MessageDeserializer<T> deserializer;
-
-
-
- /**
- * Construct an AbstractNotificationConsumer.
- *
- * @param deserializer the message deserializer used by this consumer
- */
- public AbstractNotificationConsumer(MessageDeserializer<T> deserializer) {
+ protected AbstractNotificationConsumer(AtlasNotificationMessageDeserializer<T> deserializer) {
this.deserializer = deserializer;
}
-
-
public abstract void commit(TopicPartition partition, long offset);
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
deleted file mode 100644
index 3b377de..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-
-import org.apache.atlas.AtlasConfiguration;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.codec.binary.StringUtils;
-import org.apache.commons.compress.utils.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-
-public class AtlasNotificationBaseMessage {
- private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationBaseMessage.class);
-
- public static final int MESSAGE_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES.getInt() - 512; // 512 bytes for envelop;
- public static final boolean MESSAGE_COMPRESSION_ENABLED = AtlasConfiguration.NOTIFICATION_MESSAGE_COMPRESSION_ENABLED.getBoolean();
-
- public enum CompressionKind { NONE, GZIP };
-
- private MessageVersion version = null;
- private String msgId = null;
- private CompressionKind msgCompressionKind = CompressionKind.NONE;
- private int msgSplitIdx = 1;
- private int msgSplitCount = 1;
-
-
- public AtlasNotificationBaseMessage() {
- }
-
- public AtlasNotificationBaseMessage(MessageVersion version) {
- this(version, null, CompressionKind.NONE);
- }
-
- public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) {
- this.version = version;
- this.msgId = msgId;
- this.msgCompressionKind = msgCompressionKind;
- }
-
- public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) {
- this.version = version;
- this.msgId = msgId;
- this.msgCompressionKind = msgCompressionKind;
- this.msgSplitIdx = msgSplitIdx;
- this.msgSplitCount = msgSplitCount;
- }
-
- public void setVersion(MessageVersion version) {
- this.version = version;
- }
-
- public MessageVersion getVersion() {
- return version;
- }
-
- public String getMsgId() {
- return msgId;
- }
-
- public void setMsgId(String msgId) {
- this.msgId = msgId;
- }
-
- public CompressionKind getMsgCompressionKind() {
- return msgCompressionKind;
- }
-
- public void setMsgCompressed(CompressionKind msgCompressionKind) {
- this.msgCompressionKind = msgCompressionKind;
- }
-
- public int getMsgSplitIdx() {
- return msgSplitIdx;
- }
-
- public void setMsgSplitIdx(int msgSplitIdx) {
- this.msgSplitIdx = msgSplitIdx;
- }
-
- public int getMsgSplitCount() {
- return msgSplitCount;
- }
-
- public void setMsgSplitCount(int msgSplitCount) {
- this.msgSplitCount = msgSplitCount;
- }
-
- /**
- * Compare the version of this message with the given version.
- *
- * @param compareToVersion the version to compare to
- *
- * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
- * or greater than the given version.
- */
- public int compareVersion(MessageVersion compareToVersion) {
- return version.compareTo(compareToVersion);
- }
-
-
- public static byte[] getBytesUtf8(String str) {
- return StringUtils.getBytesUtf8(str);
- }
-
- public static String getStringUtf8(byte[] bytes) {
- return StringUtils.newStringUtf8(bytes);
- }
-
- public static byte[] encodeBase64(byte[] bytes) {
- return Base64.encodeBase64(bytes);
- }
-
- public static byte[] decodeBase64(byte[] bytes) {
- return Base64.decodeBase64(bytes);
- }
-
- public static byte[] gzipCompressAndEncodeBase64(byte[] bytes) {
- return encodeBase64(gzipCompress(bytes));
- }
-
- public static byte[] decodeBase64AndGzipUncompress(byte[] bytes) {
- return gzipUncompress(decodeBase64(bytes));
- }
-
- public static String gzipCompress(String str) {
- byte[] bytes = getBytesUtf8(str);
- byte[] compressedBytes = gzipCompress(bytes);
- byte[] encodedBytes = encodeBase64(compressedBytes);
-
- return getStringUtf8(encodedBytes);
- }
-
- public static String gzipUncompress(String str) {
- byte[] encodedBytes = getBytesUtf8(str);
- byte[] compressedBytes = decodeBase64(encodedBytes);
- byte[] bytes = gzipUncompress(compressedBytes);
-
- return getStringUtf8(bytes);
- }
-
- public static byte[] gzipCompress(byte[] content) {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-
- try {
- GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
-
- gzipOutputStream.write(content);
- gzipOutputStream.close();
- } catch (IOException e) {
- LOG.error("gzipCompress(): error compressing {} bytes", content.length, e);
-
- throw new RuntimeException(e);
- }
-
- return byteArrayOutputStream.toByteArray();
- }
-
- public static byte[] gzipUncompress(byte[] content) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
-
- try {
- IOUtils.copy(new GZIPInputStream(new ByteArrayInputStream(content)), out);
- } catch (IOException e) {
- LOG.error("gzipUncompress(): error uncompressing {} bytes", content.length, e);
- }
-
- return out.toByteArray();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
deleted file mode 100644
index 63d93c9..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-import org.joda.time.DateTimeZone;
-import org.joda.time.Instant;
-
-/**
- * Represents a notification message that is associated with a version.
- */
-public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
- private String msgSourceIP;
- private String msgCreatedBy;
- private long msgCreationTime;
-
- /**
- * The actual message.
- */
- private final T message;
-
-
- // ----- Constructors ----------------------------------------------------
-
- /**
- * Create a notification message.
- *
- * @param version the message version
- * @param message the actual message
- */
- public AtlasNotificationMessage(MessageVersion version, T message) {
- this(version, message, null, null);
- }
-
- public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
- super(version);
-
- this.msgSourceIP = msgSourceIP;
- this.msgCreatedBy = createdBy;
- this.msgCreationTime = Instant.now().toDateTime(DateTimeZone.UTC).getMillis();
- this.message = message;
- }
-
-
- public String getMsgSourceIP() {
- return msgSourceIP;
- }
-
- public void setMsgSourceIP(String msgSourceIP) {
- this.msgSourceIP = msgSourceIP;
- }
-
- public String getMsgCreatedBy() {
- return msgCreatedBy;
- }
-
- public void setMsgCreatedBy(String msgCreatedBy) {
- this.msgCreatedBy = msgCreatedBy;
- }
-
- public long getMsgCreationTime() {
- return msgCreationTime;
- }
-
- public void setMsgCreationTime(long msgCreationTime) {
- this.msgCreationTime = msgCreationTime;
- }
-
- public T getMessage() {
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 2a175ba..d6e6878 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -19,14 +19,17 @@
package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
import org.apache.commons.lang3.StringUtils;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -47,11 +50,10 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
public static final String VERSION_MISMATCH_MSG =
"Notification message version mismatch. Expected %s but recieved %s. Message %s";
- private final Type notificationMessageType;
- private final Type messageType;
- private final MessageVersion expectedVersion;
- private final Logger notificationLogger;
- private final Gson gson;
+ private final TypeReference<T> messageType;
+ private final TypeReference<AtlasNotificationMessage<T>> notificationMessageType;
+ private final MessageVersion expectedVersion;
+ private final Logger notificationLogger;
private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<>();
@@ -65,33 +67,40 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
/**
* Create a notification message deserializer.
*
- * @param notificationMessageType the type of the notification message
* @param expectedVersion the expected message version
- * @param gson JSON serialization/deserialization
* @param notificationLogger logger for message version mismatch
*/
- public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
- Gson gson, Logger notificationLogger) {
- this(notificationMessageType, expectedVersion, gson, notificationLogger,
+ public AtlasNotificationMessageDeserializer(TypeReference<T> messageType,
+ TypeReference<AtlasNotificationMessage<T>> notificationMessageType,
+ MessageVersion expectedVersion, Logger notificationLogger) {
+ this(messageType, notificationMessageType, expectedVersion, notificationLogger,
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000,
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000);
}
- public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
- Gson gson, Logger notificationLogger,
+ public AtlasNotificationMessageDeserializer(TypeReference<T> messageType,
+ TypeReference<AtlasNotificationMessage<T>> notificationMessageType,
+ MessageVersion expectedVersion,
+ Logger notificationLogger,
long splitMessageSegmentsWaitTimeMs,
long splitMessageBufferPurgeIntervalMs) {
+ this.messageType = messageType;
this.notificationMessageType = notificationMessageType;
- this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0];
this.expectedVersion = expectedVersion;
- this.gson = gson;
this.notificationLogger = notificationLogger;
this.splitMessageSegmentsWaitTimeMs = splitMessageSegmentsWaitTimeMs;
this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs;
}
- // ----- MessageDeserializer ---------------------------------------------
+ public TypeReference<T> getMessageType() {
+ return messageType;
+ }
+ public TypeReference<AtlasNotificationMessage<T>> getNotificationMessageType() {
+ return notificationMessageType;
+ }
+
+ // ----- MessageDeserializer ---------------------------------------------
@Override
public T deserialize(String messageJson) {
final T ret;
@@ -99,15 +108,15 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
messageCountTotal.incrementAndGet();
messageCountSinceLastInterval.incrementAndGet();
- AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class);
+ AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class);
if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
- ret = gson.fromJson(messageJson, messageType);
+ ret = AtlasType.fromV1Json(messageJson, messageType);
} else {
String msgJson = messageJson;
if (msg.getMsgSplitCount() > 1) { // multi-part message
- AtlasNotificationStringMessage splitMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
+ AtlasNotificationStringMessage splitMsg = AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class);
checkVersion(splitMsg, msgJson);
@@ -184,7 +193,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
LOG.info("Received msgID={}: splitCount={}, length={} bytes", msgId, splitCount, bytes.length);
}
- msg = gson.fromJson(msgJson, AtlasNotificationBaseMessage.class);
+ msg = AtlasType.fromV1Json(msgJson, AtlasNotificationBaseMessage.class);
} else {
msg = null;
}
@@ -197,7 +206,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
if (msg != null) {
if (CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) {
- AtlasNotificationStringMessage compressedMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
+ AtlasNotificationStringMessage compressedMsg = AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class);
byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage());
byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);
@@ -207,7 +216,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
LOG.info("Received msgID={}: compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length);
}
- AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType);
+ AtlasNotificationMessage<T> atlasNotificationMessage = AtlasType.fromV1Json(msgJson, notificationMessageType);
checkVersion(atlasNotificationMessage, msgJson);
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
deleted file mode 100644
index 41485a0..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-
-public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage {
- private String message = null;
-
- public AtlasNotificationStringMessage() {
- super(AbstractNotification.CURRENT_MESSAGE_VERSION);
- }
-
- public AtlasNotificationStringMessage(String message) {
- super(AbstractNotification.CURRENT_MESSAGE_VERSION);
-
- this.message = message;
- }
-
- public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind) {
- super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
-
- this.message = message;
- }
-
- public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
- super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
-
- this.message = message;
- }
-
- public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) {
- super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
-
- this.message = AtlasNotificationBaseMessage.getStringUtf8(encodedBytes);
- }
-
- public AtlasNotificationStringMessage(byte[] encodedBytes, int offset, int length, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
- super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
-
- this.message = new String(encodedBytes, offset, length);
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
-
- public String getMessage() {
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
deleted file mode 100644
index 7f96638..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-
-/**
- * Represents the version of a notification message.
- */
-public class MessageVersion implements Comparable<MessageVersion> {
- /**
- * Used for message with no version (old format).
- */
- public static final MessageVersion NO_VERSION = new MessageVersion("0");
- public static final MessageVersion VERSION_1 = new MessageVersion("1.0.0");
-
- public static final MessageVersion CURRENT_VERSION = VERSION_1;
-
- private final String version;
-
-
- // ----- Constructors ----------------------------------------------------
-
- /**
- * Create a message version.
- *
- * @param version the version string
- */
- public MessageVersion(String version) {
- this.version = version;
-
- try {
- getVersionParts();
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException(String.format("Invalid version string : %s.", version), e);
- }
- }
-
-
- // ----- Comparable ------------------------------------------------------
-
- @Override
- public int compareTo(MessageVersion that) {
- if (that == null) {
- return 1;
- }
-
- Integer[] thisParts = getVersionParts();
- Integer[] thatParts = that.getVersionParts();
-
- int length = Math.max(thisParts.length, thatParts.length);
-
- for (int i = 0; i < length; i++) {
-
- int comp = getVersionPart(thisParts, i) - getVersionPart(thatParts, i);
-
- if (comp != 0) {
- return comp;
- }
- }
- return 0;
- }
-
-
- // ----- Object overrides ------------------------------------------------
-
- @Override
- public boolean equals(Object that) {
- if (this == that){
- return true;
- }
-
- if (that == null || getClass() != that.getClass()) {
- return false;
- }
-
- return compareTo((MessageVersion) that) == 0;
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(getVersionParts());
- }
-
-
- @Override
- public String toString() {
- return "MessageVersion[version=" + version + "]";
- }
-
- // ----- helper methods --------------------------------------------------
-
- /**
- * Get the version parts array by splitting the version string.
- * Strip the trailing zeros (i.e. '1.0.0' equals '1').
- *
- * @return the version parts array
- */
- protected Integer[] getVersionParts() {
-
- String[] sParts = version.split("\\.");
- ArrayList<Integer> iParts = new ArrayList<>();
- int trailingZeros = 0;
-
- for (String sPart : sParts) {
- Integer iPart = new Integer(sPart);
-
- if (iPart == 0) {
- ++trailingZeros;
- } else {
- for (int i = 0; i < trailingZeros; ++i) {
- iParts.add(0);
- }
- trailingZeros = 0;
- iParts.add(iPart);
- }
- }
- return iParts.toArray(new Integer[iParts.size()]);
- }
-
- private Integer getVersionPart(Integer[] versionParts, int i) {
- return i < versionParts.length ? versionParts[i] : 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 8809225..975967d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -18,10 +18,13 @@
package org.apache.atlas.notification;
import com.google.gson.reflect.TypeToken;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.notification.entity.EntityMessageDeserializer;
-import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.hook.HookMessageDeserializer;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
+import org.codehaus.jackson.type.TypeReference;
+import scala.reflect.internal.Types;
import java.lang.reflect.Type;
import java.util.List;
@@ -53,47 +56,25 @@ public interface NotificationInterface {
/**
* Versioned notification message class types.
*/
- Type HOOK_VERSIONED_MESSAGE_TYPE =
- new TypeToken<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>(){}.getType();
-
Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType();
/**
* Atlas notification types.
*/
enum NotificationType {
-
// Notifications from the Atlas integration hooks.
- HOOK(HOOK_NOTIFICATION_CLASS, new HookMessageDeserializer()),
+ HOOK(new HookMessageDeserializer()),
// Notifications to entity change consumers.
- ENTITIES(ENTITY_NOTIFICATION_CLASS, new EntityMessageDeserializer());
-
-
- /**
- * The notification class associated with this type.
- */
- private final Class classType;
-
- /**
- * The message deserializer for this type.
- */
- private final MessageDeserializer deserializer;
+ ENTITIES(new EntityMessageDeserializer());
+ private final AtlasNotificationMessageDeserializer deserializer;
- NotificationType(Class classType, MessageDeserializer<?> deserializer) {
- this.classType = classType;
+ NotificationType(AtlasNotificationMessageDeserializer deserializer) {
this.deserializer = deserializer;
}
-
- // ----- accessors ---------------------------------------------------
-
- public Class getClassType() {
- return classType;
- }
-
- public MessageDeserializer getDeserializer() {
+ public AtlasNotificationMessageDeserializer getDeserializer() {
return deserializer;
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
index 148b57f..10df121 100644
--- a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
+++ b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
@@ -18,6 +18,8 @@
package org.apache.atlas.notification;
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
+
public class SplitMessageAggregator {
private final String msgId;
private final AtlasNotificationStringMessage[] splitMessagesBuffer;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
index a6f7e64..526aa93 100644
--- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
@@ -18,19 +18,14 @@
package org.apache.atlas.notification.entity;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.notification.AbstractMessageDeserializer;
import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.v1.model.notification.EntityNotification;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.Map;
-
/**
* Entity notification message deserializer.
*/
@@ -48,29 +43,8 @@ public class EntityMessageDeserializer extends AbstractMessageDeserializer<Entit
* Create an entity notification message deserializer.
*/
public EntityMessageDeserializer() {
- super(NotificationInterface.ENTITY_VERSIONED_MESSAGE_TYPE,
- AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER);
- }
-
-
- // ----- helper methods --------------------------------------------------
-
- private static Map<Type, JsonDeserializer> getDeserializerMap() {
- return Collections.<Type, JsonDeserializer>singletonMap(
- NotificationInterface.ENTITY_NOTIFICATION_CLASS, new EntityNotificationDeserializer());
- }
-
-
- // ----- deserializer classes --------------------------------------------
-
- /**
- * Deserializer for EntityNotification.
- */
- protected static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
- @Override
- public EntityNotification deserialize(final JsonElement json, final Type type,
- final JsonDeserializationContext context) {
- return context.deserialize(json, EntityNotificationImpl.class);
- }
+ super(new TypeReference<EntityNotification>() {},
+ new TypeReference<AtlasNotificationMessage<EntityNotification>>() {},
+ AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER);
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
deleted file mode 100644
index 96e2e2f..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification.entity;
-
-
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
-
-import java.util.List;
-
-/**
- * Notification of entity changes.
- */
-public interface EntityNotification {
-
- /**
- * Operations that result in an entity notification.
- */
- enum OperationType {
- ENTITY_CREATE,
- ENTITY_UPDATE,
- ENTITY_DELETE,
- TRAIT_ADD,
- TRAIT_DELETE,
- TRAIT_UPDATE
- }
-
-
- // ----- EntityNotification ------------------------------------------------
-
- /**
- * Get the entity that is associated with this notification.
- *
- * @return the associated entity
- */
- Referenceable getEntity();
-
- /**
- * Get flattened list of traits that are associated with this entity (includes super traits).
- *
- * @return the list of all traits
- */
- List<Struct> getAllTraits();
-
- /**
- * Get the type of operation that triggered this notification.
- *
- * @return the operation type
- */
- OperationType getOperationType();
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
deleted file mode 100644
index ab8e4c8..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification.entity;
-
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.type.AtlasClassificationType;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * Entity notification implementation.
- */
-public class EntityNotificationImpl implements EntityNotification {
-
- private final Referenceable entity;
- private final OperationType operationType;
- private final List<Struct> traits;
-
-
- // ----- Constructors ------------------------------------------------------
-
- /**
- * No-arg constructor for serialization.
- */
- @SuppressWarnings("unused")
- private EntityNotificationImpl() throws AtlasException {
- this(null, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList());
- }
-
- /**
- * Construct an EntityNotification.
- *
- * @param entity the entity subject of the notification
- * @param operationType the type of operation that caused the notification
- * @param traits the traits for the given entity
- *
- * @throws AtlasException if the entity notification can not be created
- */
- public EntityNotificationImpl(Referenceable entity, OperationType operationType, List<Struct> traits)
- throws AtlasException {
- this.entity = entity;
- this.operationType = operationType;
- this.traits = traits;
- }
-
- /**
- * Construct an EntityNotification.
- *
- * @param entity the entity subject of the notification
- * @param operationType the type of operation that caused the notification
- * @param typeRegistry the Atlas type system
- *
- * @throws AtlasException if the entity notification can not be created
- */
- public EntityNotificationImpl(Referenceable entity, OperationType operationType, AtlasTypeRegistry typeRegistry)
- throws AtlasException {
- this(entity, operationType, getAllTraits(entity, typeRegistry));
- }
-
-
- // ----- EntityNotification ------------------------------------------------
-
- @Override
- public Referenceable getEntity() {
- return entity;
- }
-
- @Override
- public List<Struct> getAllTraits() {
- return traits;
- }
-
- @Override
- public OperationType getOperationType() {
- return operationType;
- }
-
-
- // ----- Object overrides --------------------------------------------------
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- EntityNotificationImpl that = (EntityNotificationImpl) o;
- return Objects.equals(entity, that.entity) &&
- operationType == that.operationType &&
- Objects.equals(traits, that.traits);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(entity, operationType, traits);
- }
-
-
- // ----- helper methods ----------------------------------------------------
-
- private static List<Struct> getAllTraits(Referenceable entityDefinition, AtlasTypeRegistry typeRegistry) throws AtlasException {
- List<Struct> ret = new LinkedList<>();
-
- for (String traitName : entityDefinition.getTraitNames()) {
- Struct trait = entityDefinition.getTrait(traitName);
- AtlasClassificationType traitType = typeRegistry.getClassificationTypeByName(traitName);
- Set<String> superTypeNames = traitType != null ? traitType.getAllSuperTypes() : null;
-
- ret.add(trait);
-
- if (CollectionUtils.isNotEmpty(superTypeNames)) {
- for (String superTypeName : superTypeNames) {
- Struct superTypeTrait = new Struct(superTypeName);
-
- if (MapUtils.isNotEmpty(trait.getValues())) {
- AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName);
-
- if (superType != null && MapUtils.isNotEmpty(superType.getAllAttributes())) {
- Map<String, Object> attributes = new HashMap<>();
-
- // TODO: add superTypeTrait attributes
-
- superTypeTrait.setValues(attributes);
- }
- }
-
- ret.add(superTypeTrait);
- }
- }
- }
-
- return ret;
- }}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
index 8337de0..1b337d4 100644
--- a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
@@ -18,16 +18,15 @@
package org.apache.atlas.notification.hook;
-import com.google.gson.JsonDeserializer;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.notification.AbstractMessageDeserializer;
import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.v1.model.notification.HookNotification;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.Map;
+
/**
* Hook notification message deserializer.
@@ -46,15 +45,11 @@ public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNot
* Create a hook notification message deserializer.
*/
public HookMessageDeserializer() {
- super(NotificationInterface.HOOK_VERSIONED_MESSAGE_TYPE,
- AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER);
+ super(new TypeReference<HookNotification.HookNotificationMessage>() {},
+ new TypeReference<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>() {},
+ AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER);
}
// ----- helper methods --------------------------------------------------
-
- private static Map<Type, JsonDeserializer> getDeserializerMap() {
- return Collections.<Type, JsonDeserializer>singletonMap(
- NotificationInterface.HOOK_NOTIFICATION_CLASS, new HookNotification());
- }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
deleted file mode 100644
index ca596ea..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification.hook;
-
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.typedef.TypesDef;
-import org.apache.atlas.type.AtlasType;
-import org.apache.commons.lang.StringUtils;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Contains the structure of messages transferred from hooks to atlas.
- */
-public class HookNotification implements JsonDeserializer<HookNotification.HookNotificationMessage> {
-
- @Override
- public HookNotificationMessage deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) {
- HookNotificationType type =
- context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class);
- switch (type) {
- case ENTITY_CREATE:
- return context.deserialize(json, EntityCreateRequest.class);
-
- case ENTITY_FULL_UPDATE:
- return context.deserialize(json, EntityUpdateRequest.class);
-
- case ENTITY_PARTIAL_UPDATE:
- return context.deserialize(json, EntityPartialUpdateRequest.class);
-
- case ENTITY_DELETE:
- return context.deserialize(json, EntityDeleteRequest.class);
-
- case TYPE_CREATE:
- case TYPE_UPDATE:
- return context.deserialize(json, TypeRequest.class);
-
- default:
- throw new IllegalStateException("Unhandled type " + type);
- }
- }
-
- /**
- * Type of the hook message.
- */
- public enum HookNotificationType {
- TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE
- }
-
- /**
- * Base type of hook message.
- */
- public static class HookNotificationMessage {
- public static final String UNKNOW_USER = "UNKNOWN";
- protected HookNotificationType type;
- protected String user;
-
- private HookNotificationMessage() {
- }
-
- public HookNotificationMessage(HookNotificationType type, String user) {
- this.type = type;
- this.user = user;
- }
-
- public HookNotificationType getType() {
- return type;
- }
-
- public String getUser() {
- if (StringUtils.isEmpty(user)) {
- return UNKNOW_USER;
- }
- return user;
- }
-
-
- }
-
- /**
- * Hook message for creating or updating type definitions.
- */
- public static class TypeRequest extends HookNotificationMessage {
- private TypesDef typesDef;
-
- private TypeRequest() {
- }
-
- public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) {
- super(type, user);
- this.typesDef = typesDef;
- }
-
- public TypesDef getTypesDef() {
- return typesDef;
- }
- }
-
- /**
- * Hook message for creating new entities.
- */
- public static class EntityCreateRequest extends HookNotificationMessage {
- private List<Referenceable> entities;
-
- private EntityCreateRequest() {
- }
-
- public EntityCreateRequest(String user, Referenceable... entities) {
- this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user);
- }
-
- public EntityCreateRequest(String user, List<Referenceable> entities) {
- this(HookNotificationType.ENTITY_CREATE, entities, user);
- }
-
- protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) {
- super(type, user);
- this.entities = entities;
- }
-
- public EntityCreateRequest(String user, JSONArray jsonArray) {
- super(HookNotificationType.ENTITY_CREATE, user);
- entities = new ArrayList<>();
- for (int index = 0; index < jsonArray.length(); index++) {
- try {
- entities.add(AtlasType.fromV1Json(jsonArray.getString(index), Referenceable.class));
- } catch (JSONException e) {
- throw new JsonParseException(e);
- }
- }
- }
-
- public List<Referenceable> getEntities() {
- return entities;
- }
-
- @Override
- public String toString() {
- return entities.toString();
- }
- }
-
- /**
- * Hook message for updating entities(full update).
- */
- public static class EntityUpdateRequest extends EntityCreateRequest {
- public EntityUpdateRequest(String user, Referenceable... entities) {
- this(user, Arrays.asList(entities));
- }
-
- public EntityUpdateRequest(String user, List<Referenceable> entities) {
- super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user);
- }
- }
-
- /**
- * Hook message for updating entities(partial update).
- */
- public static class EntityPartialUpdateRequest extends HookNotificationMessage {
- private String typeName;
- private String attribute;
- private Referenceable entity;
- private String attributeValue;
-
- private EntityPartialUpdateRequest() {
- }
-
- public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue,
- Referenceable entity) {
- super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user);
- this.typeName = typeName;
- this.attribute = attribute;
- this.attributeValue = attributeValue;
- this.entity = entity;
- }
-
- public String getTypeName() {
- return typeName;
- }
-
- public String getAttribute() {
- return attribute;
- }
-
- public Referenceable getEntity() {
- return entity;
- }
-
- public String getAttributeValue() {
- return attributeValue;
- }
-
- @Override
- public String toString() {
- return "{"
- + "entityType='" + typeName + '\''
- + ", attribute=" + attribute
- + ", value=" + attributeValue
- + ", entity=" + entity
- + '}';
- }
- }
-
- /**
- * Hook message for deleting entities.
- */
- public static class EntityDeleteRequest extends HookNotificationMessage {
-
- private String typeName;
- private String attribute;
- private String attributeValue;
-
- private EntityDeleteRequest() {
- }
-
- public EntityDeleteRequest(String user, String typeName, String attribute, String attributeValue) {
- this(HookNotificationType.ENTITY_DELETE, user, typeName, attribute, attributeValue);
- }
-
- protected EntityDeleteRequest(HookNotificationType type,
- String user, String typeName, String attribute, String attributeValue) {
- super(type, user);
- this.typeName = typeName;
- this.attribute = attribute;
- this.attributeValue = attributeValue;
- }
-
- public String getTypeName() {
- return typeName;
- }
-
- public String getAttribute() {
- return attribute;
- }
-
- public String getAttributeValue() {
- return attributeValue;
- }
-
- @Override
- public String toString() {
- return "{"
- + "entityType='" + typeName + '\''
- + ", attribute=" + attribute
- + ", value=" + attributeValue
- + '}';
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
index d59cb1c..9ce2a50 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
@@ -20,7 +20,7 @@ package org.apache.atlas.hook;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 071a725..f1fc741 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -22,10 +22,11 @@ import kafka.message.MessageAndMetadata;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.notification.*;
-import org.apache.atlas.notification.AtlasNotificationMessage;
-import org.apache.atlas.notification.entity.EntityNotificationImplTest;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.notification.entity.EntityNotificationTest;
+import org.apache.atlas.v1.model.notification.HookNotification;
import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -96,7 +97,7 @@ public class KafkaConsumerTest {
when(messageAndMetadata.message()).thenReturn(json);
- AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L);
+ AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, false, 100L);
List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
assertTrue(messageList.size() > 0);
@@ -131,7 +132,7 @@ public class KafkaConsumerTest {
when(kafkaConsumer.poll(100L)).thenReturn(records);
when(messageAndMetadata.message()).thenReturn(json);
- AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false, 100L);
+ AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer ,false, 100L);
try {
List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
assertTrue(messageList.size() > 0);
@@ -151,7 +152,7 @@ public class KafkaConsumerTest {
TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
- AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L);
+ AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, false, 100L);
consumer.commit(tp, 1);
@@ -163,7 +164,7 @@ public class KafkaConsumerTest {
TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
- AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true , 100L);
+ AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, true , 100L);
consumer.commit(tp, 1);
@@ -171,7 +172,7 @@ public class KafkaConsumerTest {
}
private Referenceable getEntity(String traitName) {
- Referenceable entity = EntityNotificationImplTest.getEntity("id");
+ Referenceable entity = EntityNotificationTest.getEntity("id");
List<Struct> traitInfo = new LinkedList<>();
Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
traitInfo.add(trait);
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 5e3cf41..fe019e1 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -22,13 +22,13 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import static org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index f313ddc..caa72ce 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -18,13 +18,15 @@
package org.apache.atlas.notification;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
-import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -41,8 +43,6 @@ import org.apache.kafka.common.TopicPartition;
*/
public class AbstractNotificationConsumerTest {
- private static final Gson GSON = new Gson();
-
@Test
public void testReceive() throws Exception {
Logger logger = mock(Logger.class);
@@ -54,27 +54,24 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>();
- jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)));
- jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2)));
- jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3)));
- jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4)));
-
- Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
+ jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)));
+ jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2)));
+ jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3)));
+ jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4)));
- NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
+ NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger);
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
assertFalse(messageList.isEmpty());
- assertEquals(testMessage1, messageList.get(0).getMessage());
+ assertEquals(messageList.get(0).getMessage(), testMessage1);
- assertEquals(testMessage2, messageList.get(1).getMessage());
+ assertEquals(messageList.get(1).getMessage(), testMessage2);
- assertEquals(testMessage3, messageList.get(2).getMessage());
+ assertEquals(messageList.get(2).getMessage(), testMessage3);
- assertEquals(testMessage4, messageList.get(3).getMessage());
+ assertEquals(messageList.get(3).getMessage(), testMessage4);
}
@Test
@@ -88,20 +85,17 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>();
- String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
- String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2));
- String json3 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3));
- String json4 = GSON.toJson(testMessage4);
+ String json1 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
+ String json2 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2));
+ String json3 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3));
+ String json4 = AtlasType.toV1Json(testMessage4);
jsonList.add(json1);
jsonList.add(json2);
jsonList.add(json3);
jsonList.add(json4);
- Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
-
- NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
+ NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger);
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
@@ -124,16 +118,13 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>();
- String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
- String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2));
+ String json1 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
+ String json2 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2));
jsonList.add(json1);
jsonList.add(json2);
- Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
-
- NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
+ NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger);
try {
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
@@ -150,7 +141,10 @@ public class AbstractNotificationConsumerTest {
private static class TestMessage {
private String s;
- private int i;
+ private int i;
+
+ public TestMessage() {
+ }
public TestMessage(String s, int i) {
this.s = s;
@@ -165,6 +159,14 @@ public class AbstractNotificationConsumerTest {
this.s = s;
}
+ public int getI() {
+ return i;
+ }
+
+ public void setI(int i) {
+ this.i = i;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -180,12 +182,14 @@ public class AbstractNotificationConsumerTest {
}
}
- private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
- private final List<T> messageList;
- private int index = 0;
+ private static class TestNotificationConsumer extends AbstractNotificationConsumer<TestMessage> {
+ private final List<TestMessage> messageList;
+ private int index = 0;
+
+
+ public TestNotificationConsumer(List<TestMessage> messages, Logger logger) {
+ super(new TestMessageDeserializer());
- public TestNotificationConsumer(Type notificationMessageType, List<T> messages, Logger logger) {
- super(new TestDeserializer<T>(notificationMessageType, logger));
this.messageList = messages;
}
@@ -205,24 +209,35 @@ public class AbstractNotificationConsumerTest {
}
@Override
- public List<AtlasKafkaMessage<T>> receive() {
+ public List<AtlasKafkaMessage<TestMessage>> receive() {
return receive(1000L);
}
@Override
- public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
- List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
+ public List<AtlasKafkaMessage<TestMessage>> receive(long timeoutMilliSeconds) {
+ List<AtlasKafkaMessage<TestMessage>> tempMessageList = new ArrayList();
for(Object json : messageList) {
- tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
+ tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String) json), -1, -1));
}
return tempMessageList;
}
}
- private static final class TestDeserializer<T> extends AtlasNotificationMessageDeserializer<T> {
+ public static class TestMessageDeserializer extends AbstractMessageDeserializer<TestMessage> {
+ /**
+ * Logger used by this test message deserializer.
+ */
+ private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(TestMessageDeserializer.class);
+
+
+ // ----- Constructors ----------------------------------------------------
- private TestDeserializer(Type notificationMessageType, Logger logger) {
- super(notificationMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
+ /**
+ * Create a test message deserializer.
+ */
+ public TestMessageDeserializer() {
+ super(new TypeReference<TestMessage>() {}, new TypeReference<AtlasNotificationMessage<TestMessage>>() {},
+ AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER);
}
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 655252c..98d7d2c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -19,7 +19,8 @@
package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.v1.model.notification.HookNotification;
import org.apache.commons.configuration.Configuration;
import org.testng.annotations.Test;
@@ -98,8 +99,8 @@ public class AbstractNotificationTest {
// ignore msgCreationTime in Json
private void assertEqualsMessageJson(String msgJsonActual, String msgJsonExpected) {
- Map<Object, Object> msgActual = AbstractNotification.GSON.fromJson(msgJsonActual, Map.class);
- Map<Object, Object> msgExpected = AbstractNotification.GSON.fromJson(msgJsonExpected, Map.class);
+ Map<Object, Object> msgActual = AtlasType.fromV1Json(msgJsonActual, Map.class);
+ Map<Object, Object> msgExpected = AtlasType.fromV1Json(msgJsonExpected, Map.class);
msgActual.remove("msgCreationTime");
msgExpected.remove("msgCreationTime");
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
index 27b5034..91a195d 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
@@ -18,6 +18,8 @@
package org.apache.atlas.notification;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.MessageVersion;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
index d1af4b0..d8b3b34 100644
--- a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
@@ -18,6 +18,7 @@
package org.apache.atlas.notification;
+import org.apache.atlas.model.notification.MessageVersion;
import org.testng.annotations.Test;
import java.util.Arrays;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
index 0807221..b79735a 100644
--- a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
@@ -17,7 +17,8 @@
*/
package org.apache.atlas.notification;
-import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
import org.testng.Assert;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
index faafb87..ddb63b5 100644
--- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
@@ -21,6 +21,7 @@ package org.apache.atlas.notification.entity;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.v1.model.notification.EntityNotification;
import org.testng.annotations.Test;
import java.util.ArrayList;
@@ -34,19 +35,18 @@ import static org.testng.Assert.assertEquals;
* EntityMessageDeserializer tests.
*/
public class EntityMessageDeserializerTest {
+ private EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
@Test
public void testDeserialize() throws Exception {
- EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
-
- Referenceable entity = EntityNotificationImplTest.getEntity("id");
+ Referenceable entity = EntityNotificationTest.getEntity("id");
String traitName = "MyTrait";
List<Struct> traitInfo = new LinkedList<>();
Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
traitInfo.add(trait);
- EntityNotificationImpl notification =
- new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
+ EntityNotification notification =
+ new EntityNotification(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
List<String> jsonMsgList = new ArrayList<>();
@@ -55,7 +55,7 @@ public class EntityMessageDeserializerTest {
EntityNotification deserializedNotification = null;
for (String jsonMsg : jsonMsgList) {
- deserializedNotification = deserializer.deserialize(jsonMsg);
+ deserializedNotification = deserializer.deserialize(jsonMsg);
if (deserializedNotification != null) {
break;