You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/05/04 08:08:18 UTC
[2/2] incubator-atlas git commit: ATLAS-631 Introduce Versioning to
Atlas Notification Payload (tbeerbower via shwethags)
ATLAS-631 Introduce Versioning to Atlas Notification Payload (tbeerbower via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b2ae1371
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b2ae1371
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b2ae1371
Branch: refs/heads/master
Commit: b2ae1371be24cfcb13f12dcb4ebad6920b9bfd80
Parents: 73640cc
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Wed May 4 11:38:06 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Wed May 4 11:38:06 2016 +0530
----------------------------------------------------------------------
.../org/apache/atlas/kafka/KafkaConsumer.java | 16 +-
.../apache/atlas/kafka/KafkaNotification.java | 51 ++--
.../AbstractMessageDeserializer.java | 165 ++++++++++++
.../notification/AbstractNotification.java | 107 +++++++-
.../AbstractNotificationConsumer.java | 154 +----------
.../IncompatibleVersionException.java | 32 +++
.../atlas/notification/MessageDeserializer.java | 33 +++
.../atlas/notification/MessageVersion.java | 133 ++++++++++
.../notification/NotificationInterface.java | 44 +++-
.../atlas/notification/VersionedMessage.java | 75 ++++++
.../VersionedMessageDeserializer.java | 107 ++++++++
.../entity/EntityMessageDeserializer.java | 76 ++++++
.../hook/HookMessageDeserializer.java | 60 +++++
.../apache/atlas/kafka/KafkaConsumerTest.java | 176 +++++++++++++
.../atlas/kafka/KafkaNotificationTest.java | 7 +-
.../AbstractNotificationConsumerTest.java | 264 +++++++++++++++++++
.../notification/AbstractNotificationTest.java | 120 +++++++++
.../atlas/notification/MessageVersionTest.java | 125 +++++++++
.../notification/VersionedMessageTest.java | 57 ++++
.../entity/EntityMessageDeserializerTest.java | 61 +++++
.../entity/EntityNotificationImplTest.java | 2 +-
.../hook/HookMessageDeserializerTest.java | 70 +++++
.../notification/hook/HookNotificationTest.java | 20 +-
release-log.txt | 1 +
24 files changed, 1758 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
index 029a072..f1c9742 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -21,6 +21,7 @@ import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.AbstractNotificationConsumer;
+import org.apache.atlas.notification.MessageDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,13 +42,16 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
/**
* Create a Kafka consumer.
*
- * @param type the notification type returned by this consumer
- * @param stream the underlying Kafka stream
- * @param consumerId an id value for this consumer
+ * @param type the notification type returned by this consumer
+ * @param deserializer the message deserializer used for this consumer
+ * @param stream the underlying Kafka stream
+ * @param consumerId an id value for this consumer
*/
- public KafkaConsumer(Class<T> type, KafkaStream<String, String> stream, int consumerId) {
- super(type);
- this.iterator = stream.iterator();
+ public KafkaConsumer(Class<T> type,
+ MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId) {
+ super(deserializer);
+
+ this.iterator = stream.iterator();
this.consumerId = consumerId;
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 889af11..cfffec4 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -28,6 +28,7 @@ import kafka.utils.Time;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.service.Service;
@@ -172,7 +173,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers);
int consumerId = 0;
for (KafkaStream stream : kafkaConsumers) {
- consumers.add(createKafkaConsumer(notificationType.getClassType(), stream, consumerId++));
+ KafkaConsumer<T> kafkaConsumer =
+ createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(),
+ stream, consumerId++);
+ consumers.add(kafkaConsumer);
}
consumerConnectors.add(consumerConnector);
@@ -180,6 +184,22 @@ public class KafkaNotification extends AbstractNotification implements Service {
}
@Override
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ producer = null;
+ }
+
+ for (ConsumerConnector consumerConnector : consumerConnectors) {
+ consumerConnector.shutdown();
+ }
+ consumerConnectors.clear();
+ }
+
+
+ // ----- AbstractNotification --------------------------------------------
+
+ @Override
public void sendInternal(NotificationType type, String... messages) throws NotificationException {
if (producer == null) {
createProducer();
@@ -197,27 +217,13 @@ public class KafkaNotification extends AbstractNotification implements Service {
try {
RecordMetadata response = future.get();
LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
- response.partition(), response.offset());
+ response.partition(), response.offset());
} catch (Exception e) {
throw new NotificationException(e);
}
}
}
- @Override
- public void close() {
- if (producer != null) {
- producer.close();
- producer = null;
- }
-
- for (ConsumerConnector consumerConnector : consumerConnectors) {
- consumerConnector.shutdown();
- }
- consumerConnectors.clear();
- }
-
-
// ----- helper methods --------------------------------------------------
/**
@@ -234,14 +240,17 @@ public class KafkaNotification extends AbstractNotification implements Service {
/**
* Create a Kafka consumer from the given Kafka stream.
*
- * @param stream the Kafka stream
- * @param consumerId the id for the new consumer
+ * @param type the notification type to be returned by the consumer
+ * @param deserializer the deserializer for the created consumers
+ * @param stream the Kafka stream
+ * @param consumerId the id for the new consumer
*
* @return a new Kafka consumer
*/
- protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream,
- int consumerId) {
- return new org.apache.atlas.kafka.KafkaConsumer<T>(type, stream, consumerId);
+ protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type,
+ MessageDeserializer<T> deserializer, KafkaStream stream,
+ int consumerId) {
+ return new org.apache.atlas.kafka.KafkaConsumer<T>(type, deserializer, stream, consumerId);
}
// Get properties for consumer request
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
new file mode 100644
index 0000000..9585827
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import com.google.gson.reflect.TypeToken;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base notification message deserializer.
+ */
+public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDeserializer<T> {
+
+ private static final Map<Type, JsonDeserializer> DESERIALIZER_MAP = new HashMap<>();
+
+ static {
+ DESERIALIZER_MAP.put(ImmutableList.class, new ImmutableListDeserializer());
+ DESERIALIZER_MAP.put(ImmutableMap.class, new ImmutableMapDeserializer());
+ DESERIALIZER_MAP.put(JSONArray.class, new JSONArrayDeserializer());
+ DESERIALIZER_MAP.put(IStruct.class, new StructDeserializer());
+ DESERIALIZER_MAP.put(IReferenceableInstance.class, new ReferenceableDeserializer());
+ DESERIALIZER_MAP.put(Referenceable.class, new ReferenceableDeserializer());
+ }
+
+
+ // ----- Constructors ----------------------------------------------------
+
+ /**
+ * Create a deserializer.
+ *
+ * @param versionedMessageType the type of the versioned message
+ * @param expectedVersion the expected message version
+ * @param deserializerMap map of individual deserializers used to define this message deserializer
+ * @param notificationLogger logger for message version mismatch
+ */
+ public AbstractMessageDeserializer(Type versionedMessageType,
+ MessageVersion expectedVersion,
+ Map<Type, JsonDeserializer> deserializerMap,
+ Logger notificationLogger) {
+ super(versionedMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger);
+ }
+
+
+ // ----- helper methods --------------------------------------------------
+
+ private static Gson getDeserializer(Map<Type, JsonDeserializer> deserializerMap) {
+ GsonBuilder builder = new GsonBuilder();
+
+ for (Map.Entry<Type, JsonDeserializer> entry : DESERIALIZER_MAP.entrySet()) {
+ builder.registerTypeAdapter(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<Type, JsonDeserializer> entry : deserializerMap.entrySet()) {
+ builder.registerTypeAdapter(entry.getKey(), entry.getValue());
+ }
+ return builder.create();
+ }
+
+
+ // ----- deserializer classes --------------------------------------------
+
+ /**
+ * Deserializer for ImmutableList.
+ */
+ protected static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> {
+ public static final Type LIST_TYPE = new TypeToken<List<?>>() {
+ }.getType();
+
+ @Override
+ public ImmutableList<?> deserialize(JsonElement json, Type type,
+ JsonDeserializationContext context) {
+ final List<?> list = context.deserialize(json, LIST_TYPE);
+ return ImmutableList.copyOf(list);
+ }
+ }
+
+ /**
+ * Deserializer for ImmutableMap.
+ */
+ protected static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
+
+ public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {
+ }.getType();
+
+ @Override
+ public ImmutableMap<?, ?> deserialize(JsonElement json, Type type,
+ JsonDeserializationContext context) {
+ final Map<?, ?> map = context.deserialize(json, MAP_TYPE);
+ return ImmutableMap.copyOf(map);
+ }
+ }
+
+ /**
+ * Deserializer for JSONArray.
+ */
+ protected static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
+ @Override
+ public JSONArray deserialize(final JsonElement json, final Type type,
+ final JsonDeserializationContext context) {
+ try {
+ return new JSONArray(json.toString());
+ } catch (JSONException e) {
+ throw new JsonParseException(e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * Deserializer for Struct.
+ */
+ protected static final class StructDeserializer implements JsonDeserializer<IStruct> {
+ @Override
+ public IStruct deserialize(final JsonElement json, final Type type,
+ final JsonDeserializationContext context) {
+ return context.deserialize(json, Struct.class);
+ }
+ }
+
+ /**
+ * Deserializer for Referenceable.
+ */
+ protected static final class ReferenceableDeserializer implements JsonDeserializer<IReferenceableInstance> {
+ @Override
+ public IReferenceableInstance deserialize(final JsonElement json, final Type type,
+ final JsonDeserializationContext context) {
+
+ return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 596f988..7d22126 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -17,10 +17,22 @@
*/
package org.apache.atlas.notification;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
+import org.codehaus.jettison.json.JSONArray;
+
+import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
@@ -29,12 +41,26 @@ import java.util.List;
*/
public abstract class AbstractNotification implements NotificationInterface {
+ /**
+ * The current expected version for notification messages.
+ */
+ public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");
+
private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
private final boolean embedded;
private final boolean isHAEnabled;
+ /**
+ * Used for message serialization.
+ */
+ public static final Gson GSON = new GsonBuilder().
+ registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializer()).
+ registerTypeAdapter(Referenceable.class, new ReferenceableSerializer()).
+ registerTypeAdapter(JSONArray.class, new JSONArraySerializer()).
+ create();
+
- // ----- Constructors ------------------------------------------------------
+ // ----- Constructors ----------------------------------------------------
public AbstractNotification(Configuration applicationProperties) throws AtlasException {
this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
@@ -42,7 +68,23 @@ public abstract class AbstractNotification implements NotificationInterface {
}
- // ----- AbstractNotificationInterface -------------------------------------
+ // ----- NotificationInterface -------------------------------------------
+
+ @Override
+ public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+ String[] strMessages = new String[messages.size()];
+ for (int index = 0; index < messages.size(); index++) {
+ strMessages[index] = getMessageJson(messages.get(index));
+ }
+ sendInternal(type, strMessages);
+ }
+
+ @Override
+ public <T> void send(NotificationType type, T... messages) throws NotificationException {
+ send(type, Arrays.asList(messages));
+ }
+
+ // ----- AbstractNotification --------------------------------------------
/**
* Determine whether or not the notification service embedded in Atlas server.
@@ -53,23 +95,62 @@ public abstract class AbstractNotification implements NotificationInterface {
return embedded;
}
+ /**
+ * Determine whether or not the high availability feature is enabled.
+ *
+ * @return true if the high availability feature is enabled.
+ */
protected final boolean isHAEnabled() {
return isHAEnabled;
}
- @Override
- public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
- String[] strMessages = new String[messages.size()];
- for (int index = 0; index < messages.size(); index++) {
- strMessages[index] = AbstractNotificationConsumer.GSON.toJson(messages.get(index));
- }
- sendInternal(type, strMessages);
+ /**
+ * Send the given messages.
+ *
+ * @param type the message type
+ * @param messages the array of messages to send
+ *
+ * @throws NotificationException if an error occurs while sending
+ */
+ protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException;
+
+
+ // ----- utility methods -------------------------------------------------
+
+ /**
+ * Get the notification message JSON from the given object.
+ *
+ * @param message the message in object form
+ *
+ * @return the message as a JSON string
+ */
+ public static String getMessageJson(Object message) {
+ VersionedMessage<?> versionedMessage = new VersionedMessage<>(CURRENT_MESSAGE_VERSION, message);
+
+ return GSON.toJson(versionedMessage);
}
- @Override
- public <T> void send(NotificationType type, T... messages) throws NotificationException {
- send(type, Arrays.asList(messages));
+
+ // ----- serializers -----------------------------------------------------
+
+ /**
+ * Serializer for Referenceable.
+ */
+ public static final class ReferenceableSerializer implements JsonSerializer<IReferenceableInstance> {
+ @Override
+ public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
+ String instanceJson = InstanceSerialization.toJson(src, true);
+ return new JsonParser().parse(instanceJson).getAsJsonObject();
+ }
}
- protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException;
+ /**
+ * Serializer for JSONArray.
+ */
+ public static final class JSONArraySerializer implements JsonSerializer<JSONArray> {
+ @Override
+ public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
+ return new JsonParser().parse(src.toString()).getAsJsonArray();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index 1cadb99..f00bbca 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -17,50 +17,15 @@
*/
package org.apache.atlas.notification;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-import com.google.gson.reflect.TypeToken;
-import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.notification.entity.EntityNotificationImpl;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-
-import java.lang.reflect.Type;
-import java.util.List;
-import java.util.Map;
-
/**
* Abstract notification consumer.
*/
public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> {
- public static final Gson GSON = new GsonBuilder().
- registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()).
- registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()).
- registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()).
- registerTypeAdapter(IStruct.class, new StructDeserializer()).
- registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()).
- registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()).
- registerTypeAdapter(JSONArray.class, new JSONArraySerializerDeserializer()).
- registerTypeAdapter(HookNotification.HookNotificationMessage.class, new HookNotification()).
- create();
-
- private final Class<T> type;
+ /**
+ * Deserializer used to deserialize notification messages for this consumer.
+ */
+ private final MessageDeserializer<T> deserializer;
// ----- Constructors ----------------------------------------------------
@@ -68,10 +33,10 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
/**
* Construct an AbstractNotificationConsumer.
*
- * @param type the notification type
+ * @param deserializer the message deserializer used by this consumer
*/
- public AbstractNotificationConsumer(Class<T> type) {
- this.type = type;
+ public AbstractNotificationConsumer(MessageDeserializer<T> deserializer) {
+ this.deserializer = deserializer;
}
@@ -96,112 +61,11 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
@Override
public T next() {
- return GSON.fromJson(getNext(), type);
+ return deserializer.deserialize(getNext());
}
@Override
public T peek() {
- return GSON.fromJson(peekMessage(), type);
- }
-
-
- /**
- * Deserializer for ImmutableList used by AbstractNotificationConsumer.GSON.
- */
- public static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> {
- public static final Type LIST_TYPE = new TypeToken<List<?>>() {
- }.getType();
-
- @Override
- public ImmutableList<?> deserialize(JsonElement json, Type type,
- JsonDeserializationContext context) {
- final List<?> list = context.deserialize(json, LIST_TYPE);
- return ImmutableList.copyOf(list);
- }
- }
-
- /**
- * Deserializer for ImmutableMap used by AbstractNotificationConsumer.GSON.
- */
- public static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
-
- public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {
- }.getType();
-
- @Override
- public ImmutableMap<?, ?> deserialize(JsonElement json, Type type,
- JsonDeserializationContext context) {
- final Map<?, ?> map = context.deserialize(json, MAP_TYPE);
- return ImmutableMap.copyOf(map);
- }
- }
-
-
- /**
- * Deserializer for EntityNotification used by AbstractNotificationConsumer.GSON.
- */
- public static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
- @Override
- public EntityNotification deserialize(final JsonElement json, final Type type,
- final JsonDeserializationContext context) {
- return context.deserialize(json, EntityNotificationImpl.class);
- }
- }
-
- /**
- * Serde for Struct used by AbstractNotificationConsumer.GSON.
- */
- public static final class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> {
- @Override
- public IStruct deserialize(final JsonElement json, final Type type,
- final JsonDeserializationContext context) {
- return context.deserialize(json, Struct.class);
- }
-
- @Override
- public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) {
- String instanceJson = InstanceSerialization.toJson(src, true);
- return new JsonParser().parse(instanceJson).getAsJsonObject();
- }
- }
-
- /**
- * Serde for Referenceable used by AbstractNotificationConsumer.GSON.
- */
- public static final class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>,
- JsonSerializer<IReferenceableInstance> {
- @Override
- public IReferenceableInstance deserialize(final JsonElement json, final Type type,
- final JsonDeserializationContext context) {
-
- return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
- }
-
- @Override
- public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
- String instanceJson = InstanceSerialization.toJson(src, true);
- return new JsonParser().parse(instanceJson).getAsJsonObject();
- }
- }
-
- /**
- * Serde for JSONArray used by AbstractNotificationConsumer.GSON.
- */
- public static final class JSONArraySerializerDeserializer implements JsonDeserializer<JSONArray>,
- JsonSerializer<JSONArray> {
- @Override
- public JSONArray deserialize(final JsonElement json, final Type type,
- final JsonDeserializationContext context) {
- try {
- return new JSONArray(json.toString());
- } catch (JSONException e) {
- throw new JsonParseException(e.getMessage(), e);
- }
- }
-
- @Override
- public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
- return new JsonParser().parse(src.toString()).getAsJsonArray();
- }
+ return deserializer.deserialize(peekMessage());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java b/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java
new file mode 100644
index 0000000..6a59014
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+/**
+ * Exception thrown when notification message is consumed that has a version that is incompatable with
+ * the expected version.
+ */
+public class IncompatibleVersionException extends RuntimeException {
+
+ // ----- Constructors ----------------------------------------------------
+
+ public IncompatibleVersionException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java
new file mode 100644
index 0000000..7778908
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+/**
+ * Deserializer for JSON messages.
+ */
+public interface MessageDeserializer<T> {
+ /**
+ * Get a message of type T from the given JSON message string.
+ *
+ * @param json the JSON message
+ *
+ * @return the message deserialized from the given JSON
+ */
+ T deserialize(String json);
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
new file mode 100644
index 0000000..3f16a9a
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+/**
+ * Represents the version of a notification message.
+ */
+public class MessageVersion implements Comparable<MessageVersion> {
+ /**
+ * Used for message with no version (old format).
+ */
+ public static final MessageVersion NO_VERSION = new MessageVersion("0");
+
+ private final String version;
+
+
+ // ----- Constructors ----------------------------------------------------
+
+ /**
+ * Create a message version.
+ *
+ * @param version the version string
+ */
+ public MessageVersion(String version) {
+ this.version = version;
+
+ try {
+ getVersionParts();
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(String.format("Invalid version string : %s.", version), e);
+ }
+ }
+
+
+ // ----- Comparable ------------------------------------------------------
+
+ @Override
+ public int compareTo(MessageVersion that) {
+ if (that == null) {
+ return 1;
+ }
+
+ Integer[] thisParts = getVersionParts();
+ Integer[] thatParts = that.getVersionParts();
+
+ int length = Math.max(thisParts.length, thatParts.length);
+
+ for (int i = 0; i < length; i++) {
+
+ int comp = getVersionPart(thisParts, i) - getVersionPart(thatParts, i);
+
+ if (comp != 0) {
+ return comp;
+ }
+ }
+ return 0;
+ }
+
+
+ // ----- Object overrides ------------------------------------------------
+
+ @Override
+ public boolean equals(Object that) {
+ if (this == that){
+ return true;
+ }
+
+ if (that == null || getClass() != that.getClass()) {
+ return false;
+ }
+
+ return compareTo((MessageVersion) that) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(getVersionParts());
+ }
+
+
+ // ----- helper methods --------------------------------------------------
+
+ /**
+ * Get the version parts array by splitting the version string.
+ * Strip the trailing zeros (i.e. '1.0.0' equals '1').
+ *
+ * @return the version parts array
+ */
+ protected Integer[] getVersionParts() {
+
+ String[] sParts = version.split("\\.");
+ ArrayList<Integer> iParts = new ArrayList<>();
+ int trailingZeros = 0;
+
+ for (String sPart : sParts) {
+ Integer iPart = new Integer(sPart);
+
+ if (iPart == 0) {
+ ++trailingZeros;
+ } else {
+ for (int i = 0; i < trailingZeros; ++i) {
+ iParts.add(0);
+ }
+ trailingZeros = 0;
+ iParts.add(iPart);
+ }
+ }
+ return iParts.toArray(new Integer[iParts.size()]);
+ }
+
+ private Integer getVersionPart(Integer[] versionParts, int i) {
+ return i < versionParts.length ? versionParts[i] : 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index ac285aa..384f383 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -17,9 +17,13 @@
*/
package org.apache.atlas.notification;
+import org.apache.atlas.notification.entity.EntityMessageDeserializer;
import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.notification.hook.HookMessageDeserializer;
import org.apache.atlas.notification.hook.HookNotification;
+import com.google.gson.reflect.TypeToken;
+import java.lang.reflect.Type;
import java.util.List;
/**
@@ -37,25 +41,59 @@ public interface NotificationInterface {
String PROPERTY_PREFIX = "atlas.notification";
/**
+ * Notification message class types.
+ */
+ Class<HookNotification.HookNotificationMessage> HOOK_NOTIFICATION_CLASS =
+ HookNotification.HookNotificationMessage.class;
+
+ Class<EntityNotification> ENTITY_NOTIFICATION_CLASS = EntityNotification.class;
+
+ /**
+ * Versioned notification message class types.
+ */
+ Type HOOK_VERSIONED_MESSAGE_TYPE =
+ new TypeToken<VersionedMessage<HookNotification.HookNotificationMessage>>(){}.getType();
+
+ Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<VersionedMessage<EntityNotification>>(){}.getType();
+
+ /**
* Atlas notification types.
*/
enum NotificationType {
- HOOK(HookNotification.HookNotificationMessage.class), // notifications from the Atlas integration hook producers
- ENTITIES(EntityNotification.class); // notifications to entity change consumers
+ // Notifications from the Atlas integration hooks.
+ HOOK(HOOK_NOTIFICATION_CLASS, new HookMessageDeserializer()),
+
+ // Notifications to entity change consumers.
+ ENTITIES(ENTITY_NOTIFICATION_CLASS, new EntityMessageDeserializer());
+
/**
* The notification class associated with this type.
*/
private final Class classType;
- NotificationType(Class classType) {
+ /**
+ * The message deserializer for this type.
+ */
+ private final MessageDeserializer deserializer;
+
+
+ NotificationType(Class classType, MessageDeserializer<?> deserializer) {
this.classType = classType;
+ this.deserializer = deserializer;
}
+
+ // ----- accessors ---------------------------------------------------
+
public Class getClassType() {
return classType;
}
+
+ public MessageDeserializer getDeserializer() {
+ return deserializer;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
new file mode 100644
index 0000000..1929eb4
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+/**
+ * Represents a notification message that is associated with a version.
+ */
+public class VersionedMessage<T> {
+
+ /**
+ * The version of the message.
+ */
+ private final MessageVersion version;
+
+ /**
+ * The actual message.
+ */
+ private final T message;
+
+
+ // ----- Constructors ----------------------------------------------------
+
+ /**
+ * Create a versioned message.
+ *
+ * @param version the message version
+ * @param message the actual message
+ */
+ public VersionedMessage(MessageVersion version, T message) {
+ this.version = version;
+ this.message = message;
+ }
+
+
+ // ----- VersionedMessage ------------------------------------------------
+
+ /**
+ * Compare the version of this message with the given version.
+ *
+ * @param compareToVersion the version to compare to
+ *
+ * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
+ * or greater than the given version.
+ */
+ public int compareVersion(MessageVersion compareToVersion) {
+ return version.compareTo(compareToVersion);
+ }
+
+
+ // ----- accessors -------------------------------------------------------
+
+ public MessageVersion getVersion() {
+ return version;
+ }
+
+ public T getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
new file mode 100644
index 0000000..290be59
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+/**
+ * Deserializer that works with versioned messages. The version of each deserialized message is checked against an
+ * expected version.
+ */
+public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> {
+
+ public static final String VERSION_MISMATCH_MSG =
+ "Notification message version mismatch. Expected %s but recieved %s";
+
+ private final Type versionedMessageType;
+ private final MessageVersion expectedVersion;
+ private final Logger notificationLogger;
+ private final Gson gson;
+
+
+ // ----- Constructors ----------------------------------------------------
+
+ /**
+ * Create a versioned message deserializer.
+ *
+ * @param versionedMessageType the type of the versioned message
+ * @param expectedVersion the expected message version
+ * @param gson JSON serialization/deserialization
+ * @param notificationLogger logger for message version mismatch
+ */
+ public VersionedMessageDeserializer(Type versionedMessageType, MessageVersion expectedVersion,
+ Gson gson, Logger notificationLogger) {
+ this.versionedMessageType = versionedMessageType;
+ this.expectedVersion = expectedVersion;
+ this.gson = gson;
+ this.notificationLogger = notificationLogger;
+ }
+
+
+ // ----- MessageDeserializer ---------------------------------------------
+
+ @Override
+ public T deserialize(String messageJson) {
+ VersionedMessage<T> versionedMessage = gson.fromJson(messageJson, versionedMessageType);
+
+ // older style messages not wrapped with VersionedMessage
+ if (versionedMessage.getVersion() == null) {
+ Type t = ((ParameterizedType) versionedMessageType).getActualTypeArguments()[0];
+ versionedMessage = new VersionedMessage<>(MessageVersion.NO_VERSION, gson.<T>fromJson(messageJson, t));
+ }
+ checkVersion(versionedMessage, messageJson);
+
+ return versionedMessage.getMessage();
+ }
+
+
+ // ----- helper methods --------------------------------------------------
+
+ /**
+ * Check the message version against the expected version.
+ *
+ * @param versionedMessage the versioned message
+ * @param messageJson the notification message json
+ *
+ * @throws IncompatibleVersionException if the message version is incompatable with the expected version
+ */
+ protected void checkVersion(VersionedMessage<T> versionedMessage, String messageJson) {
+ int comp = versionedMessage.compareVersion(expectedVersion);
+
+ // message has newer version
+ if (comp > 0) {
+ String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion());
+ notificationLogger.error(msg);
+ notificationLogger.info(messageJson);
+ throw new IncompatibleVersionException(msg);
+ }
+
+ // message has older version
+ if (comp < 0) {
+ notificationLogger.info(
+ String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion()));
+
+ notificationLogger.info(messageJson);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
new file mode 100644
index 0000000..a6f7e64
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.entity;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import org.apache.atlas.notification.AbstractMessageDeserializer;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Entity notification message deserializer.
+ */
+public class EntityMessageDeserializer extends AbstractMessageDeserializer<EntityNotification> {
+
+ /**
+ * Logger for entity notification messages.
+ */
+ private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(EntityMessageDeserializer.class);
+
+
+ // ----- Constructors ----------------------------------------------------
+
+ /**
+ * Create an entity notification message deserializer.
+ */
+ public EntityMessageDeserializer() {
+ super(NotificationInterface.ENTITY_VERSIONED_MESSAGE_TYPE,
+ AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER);
+ }
+
+
+ // ----- helper methods --------------------------------------------------
+
+ private static Map<Type, JsonDeserializer> getDeserializerMap() {
+ return Collections.<Type, JsonDeserializer>singletonMap(
+ NotificationInterface.ENTITY_NOTIFICATION_CLASS, new EntityNotificationDeserializer());
+ }
+
+
+ // ----- deserializer classes --------------------------------------------
+
+ /**
+ * Deserializer for EntityNotification.
+ */
+ protected static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
+ @Override
+ public EntityNotification deserialize(final JsonElement json, final Type type,
+ final JsonDeserializationContext context) {
+ return context.deserialize(json, EntityNotificationImpl.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
new file mode 100644
index 0000000..8337de0
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.hook;
+
+import com.google.gson.JsonDeserializer;
+import org.apache.atlas.notification.AbstractMessageDeserializer;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Hook notification message deserializer.
+ */
+public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotification.HookNotificationMessage> {
+
+ /**
+ * Logger for hook notification messages.
+ */
+ private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(HookMessageDeserializer.class);
+
+
+ // ----- Constructors ----------------------------------------------------
+
+ /**
+ * Create a hook notification message deserializer.
+ */
+ public HookMessageDeserializer() {
+ super(NotificationInterface.HOOK_VERSIONED_MESSAGE_TYPE,
+ AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER);
+ }
+
+
+ // ----- helper methods --------------------------------------------------
+
+ private static Map<Type, JsonDeserializer> getDeserializerMap() {
+ return Collections.<Type, JsonDeserializer>singletonMap(
+ NotificationInterface.HOOK_NOTIFICATION_CLASS, new HookNotification());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
new file mode 100644
index 0000000..7f607c6
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.message.MessageAndMetadata;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.MessageVersion;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.IncompatibleVersionException;
+import org.apache.atlas.notification.VersionedMessage;
+import org.apache.atlas.notification.entity.EntityNotificationImplTest;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.Struct;
+import org.codehaus.jettison.json.JSONException;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+/**
+ * KafkaConsumer tests.
+ */
+public class KafkaConsumerTest {
+
+ private static final String TRAIT_NAME = "MyTrait";
+
+ @Test
+ public void testNext() throws Exception {
+ KafkaStream<String, String> stream = mock(KafkaStream.class);
+ ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
+ MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
+
+ Referenceable entity = getEntity(TRAIT_NAME);
+
+ HookNotification.EntityUpdateRequest message =
+ new HookNotification.EntityUpdateRequest("user1", entity);
+
+ String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
+
+ when(stream.iterator()).thenReturn(iterator);
+ when(iterator.hasNext()).thenReturn(true).thenReturn(false);
+ when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
+ when(messageAndMetadata.message()).thenReturn(json);
+
+ NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+ new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
+ NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
+
+ assertTrue(consumer.hasNext());
+
+ HookNotification.HookNotificationMessage consumedMessage = consumer.next();
+
+ assertMessagesEqual(message, consumedMessage, entity);
+
+ assertFalse(consumer.hasNext());
+ }
+
+ @Test
+ public void testNextVersionMismatch() throws Exception {
+ KafkaStream<String, String> stream = mock(KafkaStream.class);
+ ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
+ MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
+
+ Referenceable entity = getEntity(TRAIT_NAME);
+
+ HookNotification.EntityUpdateRequest message =
+ new HookNotification.EntityUpdateRequest("user1", entity);
+
+ String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message));
+
+ when(stream.iterator()).thenReturn(iterator);
+ when(iterator.hasNext()).thenReturn(true).thenReturn(false);
+ when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
+ when(messageAndMetadata.message()).thenReturn(json);
+
+ NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+ new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
+ NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
+
+ assertTrue(consumer.hasNext());
+
+ try {
+ consumer.next();
+ fail("Expected VersionMismatchException!");
+ } catch (IncompatibleVersionException e) {
+ e.printStackTrace();
+ }
+
+ assertFalse(consumer.hasNext());
+ }
+
+ @Test
+ public void testPeekMessage() throws Exception {
+ KafkaStream<String, String> stream = mock(KafkaStream.class);
+ ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
+ MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
+
+ Referenceable entity = getEntity(TRAIT_NAME);
+
+ HookNotification.EntityUpdateRequest message =
+ new HookNotification.EntityUpdateRequest("user1", entity);
+
+ String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
+
+ when(stream.iterator()).thenReturn(iterator);
+ when(iterator.hasNext()).thenReturn(true);
+ when(iterator.peek()).thenReturn(messageAndMetadata);
+ when(messageAndMetadata.message()).thenReturn(json);
+
+ NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+ new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
+ NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
+
+ assertTrue(consumer.hasNext());
+
+ HookNotification.HookNotificationMessage consumedMessage = consumer.peek();
+
+ assertMessagesEqual(message, consumedMessage, entity);
+
+ assertTrue(consumer.hasNext());
+ }
+
+ private Referenceable getEntity(String traitName) {
+ Referenceable entity = EntityNotificationImplTest.getEntity("id");
+ List<IStruct> traitInfo = new LinkedList<>();
+ IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
+ traitInfo.add(trait);
+ return entity;
+ }
+
+ private void assertMessagesEqual(HookNotification.EntityUpdateRequest message,
+ HookNotification.HookNotificationMessage consumedMessage,
+ Referenceable entity) throws JSONException {
+
+ assertEquals(consumedMessage.getType(), message.getType());
+ assertEquals(consumedMessage.getUser(), message.getUser());
+
+ assertTrue(consumedMessage instanceof HookNotification.EntityUpdateRequest);
+
+ HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest =
+ (HookNotification.EntityUpdateRequest) consumedMessage;
+
+ Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
+ assertEquals(deserializedEntity.getId(), entity.getId());
+ assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
+ assertEquals(deserializedEntity.getTraits(), entity.getTraits());
+ assertEquals(deserializedEntity.getTrait(TRAIT_NAME), entity.getTrait(TRAIT_NAME));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index db34815..17fda25 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -22,6 +22,7 @@ import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
@@ -130,10 +131,12 @@ public class KafkaNotificationTest {
}
@Override
- protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream,
+ protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type,
+ MessageDeserializer<T> deserializer,
+ KafkaStream stream,
int consumerId) {
kafkaStreams.add(stream);
- return super.createKafkaConsumer(type, stream, consumerId);
+ return super.createKafkaConsumer(type, deserializer, stream, consumerId);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
new file mode 100644
index 0000000..e63175d
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.slf4j.Logger;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Type;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.*;
+
+/**
+ * AbstractNotificationConsumer tests.
+ */
+public class AbstractNotificationConsumerTest {
+
+ private static final Gson GSON = new Gson();
+
+ @Test
+ public void testNext() throws Exception {
+ Logger logger = mock(Logger.class);
+
+ TestMessage testMessage1 = new TestMessage("sValue1", 99);
+ TestMessage testMessage2 = new TestMessage("sValue2", 98);
+ TestMessage testMessage3 = new TestMessage("sValue3", 97);
+ TestMessage testMessage4 = new TestMessage("sValue4", 96);
+
+ List<String> jsonList = new LinkedList<>();
+
+ jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
+ jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
+ jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
+ jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
+
+ Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+
+ NotificationConsumer<TestMessage> consumer =
+ new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+
+ assertTrue(consumer.hasNext());
+
+ assertEquals(testMessage1, consumer.next());
+
+ assertTrue(consumer.hasNext());
+
+ assertEquals(testMessage2, consumer.next());
+
+ assertTrue(consumer.hasNext());
+
+ assertEquals(testMessage3, consumer.next());
+
+ assertTrue(consumer.hasNext());
+
+ assertEquals(testMessage4, consumer.next());
+
+ assertFalse(consumer.hasNext());
+ }
+
+ @Test
+ public void testNextBackVersion() throws Exception {
+ Logger logger = mock(Logger.class);
+
+ TestMessage testMessage1 = new TestMessage("sValue1", 99);
+ TestMessage testMessage2 = new TestMessage("sValue2", 98);
+ TestMessage testMessage3 = new TestMessage("sValue3", 97);
+ TestMessage testMessage4 = new TestMessage("sValue4", 96);
+
+ List<String> jsonList = new LinkedList<>();
+
+ String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
+ String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2));
+ String json3 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.5.0"), testMessage3));
+ String json4 = GSON.toJson(testMessage4);
+
+ jsonList.add(json1);
+ jsonList.add(json2);
+ jsonList.add(json3);
+ jsonList.add(json4);
+
+ Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+
+ NotificationConsumer<TestMessage> consumer =
+ new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+ assertTrue(consumer.hasNext());
+
+ assertEquals(new TestMessage("sValue1", 99), consumer.next());
+
+ assertTrue(consumer.hasNext());
+
+ assertEquals(new TestMessage("sValue2", 98), consumer.next());
+ verify(logger).info(json2);
+
+ assertTrue(consumer.hasNext());
+
+ assertEquals(new TestMessage("sValue3", 97), consumer.next());
+ verify(logger).info(json3);
+
+ assertTrue(consumer.hasNext());
+
+ assertEquals(new TestMessage("sValue4", 96), consumer.next());
+ verify(logger).info(json4);
+
+ assertFalse(consumer.hasNext());
+ }
+
+ @Test
+ public void testNextForwardVersion() throws Exception {
+ Logger logger = mock(Logger.class);
+
+ TestMessage testMessage1 = new TestMessage("sValue1", 99);
+ TestMessage testMessage2 = new TestMessage("sValue2", 98);
+
+ List<String> jsonList = new LinkedList<>();
+
+ String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
+ String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2));
+
+ jsonList.add(json1);
+ jsonList.add(json2);
+
+ Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+
+ NotificationConsumer<TestMessage> consumer =
+ new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+ assertTrue(consumer.hasNext());
+
+ assertEquals(testMessage1, consumer.next());
+
+ assertTrue(consumer.hasNext());
+
+ try {
+ consumer.next();
+ fail("Expected VersionMismatchException!");
+ } catch (IncompatibleVersionException e) {
+ verify(logger).info(json2);
+ }
+
+ assertFalse(consumer.hasNext());
+ }
+
+ @Test
+ public void testPeek() throws Exception {
+ Logger logger = mock(Logger.class);
+
+ TestMessage testMessage1 = new TestMessage("sValue1", 99);
+ TestMessage testMessage2 = new TestMessage("sValue2", 98);
+ TestMessage testMessage3 = new TestMessage("sValue3", 97);
+ TestMessage testMessage4 = new TestMessage("sValue4", 96);
+
+ List<String> jsonList = new LinkedList<>();
+
+ jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
+ jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
+ jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
+ jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
+
+ Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+
+ NotificationConsumer<TestMessage> consumer =
+ new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+ assertTrue(consumer.hasNext());
+
+ assertEquals(testMessage1, consumer.peek());
+
+ assertTrue(consumer.hasNext());
+
+ assertEquals(testMessage1, consumer.peek());
+
+ assertTrue(consumer.hasNext());
+ }
+
+ private static class TestMessage {
+ private String s;
+ private int i;
+
+ public TestMessage(String s, int i) {
+ this.s = s;
+ this.i = i;
+ }
+
+ public String getS() {
+ return s;
+ }
+
+ public void setS(String s) {
+ this.s = s;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TestMessage that = (TestMessage) o;
+
+ return i == that.i && (s != null ? s.equals(that.s) : that.s == null);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = s != null ? s.hashCode() : 0;
+ result = 31 * result + i;
+ return result;
+ }
+ }
+
+ private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
+ private final List<String> messageList;
+ private int index = 0;
+
+ public TestNotificationConsumer(Type versionedMessageType, List<String> messages, Logger logger) {
+ super(new TestDeserializer<T>(versionedMessageType, logger));
+ this.messageList = messages;
+ }
+
+ @Override
+ protected String getNext() {
+ return messageList.get(index++);
+ }
+
+ @Override
+ protected String peekMessage() {
+ return messageList.get(index);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < messageList.size();
+ }
+ }
+
+ private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
+
+ private TestDeserializer(Type versionedMessageType, Logger logger) {
+ super(versionedMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
new file mode 100644
index 0000000..61107a9
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.commons.configuration.Configuration;
+import org.testng.annotations.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.*;
+
+/**
+ * AbstractNotification tests.
+ */
+public class AbstractNotificationTest {
+
+ @Test
+ public void testSend() throws Exception {
+ Configuration configuration = mock(Configuration.class);
+
+ TestNotification notification = new TestNotification(configuration);
+
+ TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1");
+ TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
+ TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
+
+ String messageJson1 = AbstractNotification.getMessageJson(message1);
+ String messageJson2 = AbstractNotification.getMessageJson(message2);
+ String messageJson3 = AbstractNotification.getMessageJson(message3);
+
+ notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3);
+
+ assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
+ assertEquals(3, notification.messages.length);
+ assertEquals(messageJson1, notification.messages[0]);
+ assertEquals(messageJson2, notification.messages[1]);
+ assertEquals(messageJson3, notification.messages[2]);
+ }
+
+ @Test
+ public void testSend2() throws Exception {
+ Configuration configuration = mock(Configuration.class);
+
+ TestNotification notification = new TestNotification(configuration);
+
+ TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1");
+ TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
+ TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
+
+ List<TestMessage> messages = new LinkedList<>();
+ messages.add(message1);
+ messages.add(message2);
+ messages.add(message3);
+
+ String messageJson1 = AbstractNotification.getMessageJson(message1);
+ String messageJson2 = AbstractNotification.getMessageJson(message2);
+ String messageJson3 = AbstractNotification.getMessageJson(message3);
+
+ notification.send(NotificationInterface.NotificationType.HOOK, messages);
+
+ assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
+ assertEquals(3, notification.messages.length);
+ assertEquals(messageJson1, notification.messages[0]);
+ assertEquals(messageJson2, notification.messages[1]);
+ assertEquals(messageJson3, notification.messages[2]);
+ }
+
+ public static class TestMessage extends HookNotification.HookNotificationMessage {
+
+ public TestMessage(HookNotification.HookNotificationType type, String user) {
+ super(type, user);
+ }
+ }
+
+ public static class TestNotification extends AbstractNotification {
+ private NotificationType type;
+ private String[] messages;
+
+ public TestNotification(Configuration applicationProperties) throws AtlasException {
+ super(applicationProperties);
+ }
+
+ @Override
+ protected void sendInternal(NotificationType notificationType, String[] notificationMessages)
+ throws NotificationException {
+
+ type = notificationType;
+ messages = notificationMessages;
+ }
+
+ @Override
+ public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
new file mode 100644
index 0000000..d1af4b0
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+import static org.testng.Assert.*;
+
+/**
+ * MessageVersion tests.
+ */
+public class MessageVersionTest {
+
+ @Test
+ public void testConstructor() throws Exception {
+ new MessageVersion("1.0.0");
+
+ try {
+ new MessageVersion("foo");
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ new MessageVersion("A.0.0");
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ new MessageVersion("1.0.0a");
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCompareTo() throws Exception {
+ MessageVersion version1 = new MessageVersion("1.0.0");
+ MessageVersion version2 = new MessageVersion("1.0.0");
+ MessageVersion version3 = new MessageVersion("2.0.0");
+ MessageVersion version4 = new MessageVersion("1");
+ MessageVersion version5 = new MessageVersion("1.5");
+ MessageVersion version6 = new MessageVersion("1.0.5");
+
+ assertTrue(version1.compareTo(version2) == 0);
+ assertTrue(version2.compareTo(version1) == 0);
+ assertTrue(version1.compareTo(version3) < 0);
+ assertTrue(version3.compareTo(version1) > 0);
+ assertTrue(version1.compareTo(version4) == 0);
+ assertTrue(version4.compareTo(version1) == 0);
+ assertTrue(version1.compareTo(version5) < 0);
+ assertTrue(version5.compareTo(version1) > 0);
+ assertTrue(version1.compareTo(version6) < 0);
+ assertTrue(version6.compareTo(version1) > 0);
+ }
+
+ @Test
+ public void testEquals() throws Exception {
+ MessageVersion version1 = new MessageVersion("1.0.0");
+ MessageVersion version2 = new MessageVersion("1.0.0");
+ MessageVersion version3 = new MessageVersion("2.0.0");
+ MessageVersion version4 = new MessageVersion("1");
+ MessageVersion version5 = new MessageVersion("1.5");
+ MessageVersion version6 = new MessageVersion("1.0.5");
+
+ assertTrue(version1.equals(version2));
+ assertTrue(version2.equals(version1));
+ assertFalse(version1.equals(version3));
+ assertFalse(version3.equals(version1));
+ assertTrue(version1.equals(version4));
+ assertTrue(version4.equals(version1));
+ assertFalse(version1.equals(version5));
+ assertFalse(version5.equals(version1));
+ assertFalse(version1.equals(version6));
+ assertFalse(version6.equals(version1));
+ }
+
+ @Test
+ public void testHashCode() throws Exception {
+ MessageVersion version1 = new MessageVersion("1.0.0");
+ MessageVersion version2 = new MessageVersion("1.0.0");
+ MessageVersion version3 = new MessageVersion("1");
+
+ assertEquals(version1.hashCode(), version2.hashCode());
+ assertEquals(version1.hashCode(), version3.hashCode());
+ }
+
+ @Test
+ public void testGetVersionParts() throws Exception {
+
+ MessageVersion version = new MessageVersion("1.0.0");
+ assertTrue(Arrays.equals(new Integer[]{1}, version.getVersionParts()));
+
+ version = new MessageVersion("1.0");
+ assertTrue(Arrays.equals(new Integer[]{1}, version.getVersionParts()));
+
+ version = new MessageVersion("1");
+ assertTrue(Arrays.equals(new Integer[]{1}, version.getVersionParts()));
+
+ version = new MessageVersion("1.0.2");
+ assertTrue(Arrays.equals(new Integer[]{1, 0, 2}, version.getVersionParts()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
new file mode 100644
index 0000000..587b7eb
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+/**
+ * VersionedMessage tests.
+ */
+public class VersionedMessageTest {
+
+ @Test
+ public void testGetVersion() throws Exception {
+ MessageVersion version = new MessageVersion("1.0.0");
+ VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, "a");
+ assertEquals(versionedMessage.getVersion(), version);
+ }
+
+ @Test
+ public void testGetMessage() throws Exception {
+ String message = "a";
+ MessageVersion version = new MessageVersion("1.0.0");
+ VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, message);
+ assertEquals(versionedMessage.getMessage(), message);
+ }
+
+ @Test
+ public void testCompareVersion() throws Exception {
+ MessageVersion version1 = new MessageVersion("1.0.0");
+ MessageVersion version2 = new MessageVersion("2.0.0");
+ MessageVersion version3 = new MessageVersion("0.5.0");
+
+ VersionedMessage<String> versionedMessage = new VersionedMessage<>(version1, "a");
+
+ assertTrue(versionedMessage.compareVersion(version1) == 0);
+ assertTrue(versionedMessage.compareVersion(version2) < 0);
+ assertTrue(versionedMessage.compareVersion(version3) > 0);
+ }
+}