You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/05/04 08:08:18 UTC

[2/2] incubator-atlas git commit: ATLAS-631 Introduce Versioning to Atlas Notification Payload (tbeerbower via shwethags)

ATLAS-631 Introduce Versioning to Atlas Notification Payload (tbeerbower via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b2ae1371
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b2ae1371
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b2ae1371

Branch: refs/heads/master
Commit: b2ae1371be24cfcb13f12dcb4ebad6920b9bfd80
Parents: 73640cc
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Wed May 4 11:38:06 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Wed May 4 11:38:06 2016 +0530

----------------------------------------------------------------------
 .../org/apache/atlas/kafka/KafkaConsumer.java   |  16 +-
 .../apache/atlas/kafka/KafkaNotification.java   |  51 ++--
 .../AbstractMessageDeserializer.java            | 165 ++++++++++++
 .../notification/AbstractNotification.java      | 107 +++++++-
 .../AbstractNotificationConsumer.java           | 154 +----------
 .../IncompatibleVersionException.java           |  32 +++
 .../atlas/notification/MessageDeserializer.java |  33 +++
 .../atlas/notification/MessageVersion.java      | 133 ++++++++++
 .../notification/NotificationInterface.java     |  44 +++-
 .../atlas/notification/VersionedMessage.java    |  75 ++++++
 .../VersionedMessageDeserializer.java           | 107 ++++++++
 .../entity/EntityMessageDeserializer.java       |  76 ++++++
 .../hook/HookMessageDeserializer.java           |  60 +++++
 .../apache/atlas/kafka/KafkaConsumerTest.java   | 176 +++++++++++++
 .../atlas/kafka/KafkaNotificationTest.java      |   7 +-
 .../AbstractNotificationConsumerTest.java       | 264 +++++++++++++++++++
 .../notification/AbstractNotificationTest.java  | 120 +++++++++
 .../atlas/notification/MessageVersionTest.java  | 125 +++++++++
 .../notification/VersionedMessageTest.java      |  57 ++++
 .../entity/EntityMessageDeserializerTest.java   |  61 +++++
 .../entity/EntityNotificationImplTest.java      |   2 +-
 .../hook/HookMessageDeserializerTest.java       |  70 +++++
 .../notification/hook/HookNotificationTest.java |  20 +-
 release-log.txt                                 |   1 +
 24 files changed, 1758 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
index 029a072..f1c9742 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -21,6 +21,7 @@ import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.message.MessageAndMetadata;
 import org.apache.atlas.notification.AbstractNotificationConsumer;
+import org.apache.atlas.notification.MessageDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,13 +42,16 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
     /**
      * Create a Kafka consumer.
      *
-     * @param type        the notification type returned by this consumer
-     * @param stream      the underlying Kafka stream
-     * @param consumerId  an id value for this consumer
+     * @param type          the notification type returned by this consumer
+     * @param deserializer  the message deserializer used for this consumer
+     * @param stream        the underlying Kafka stream
+     * @param consumerId    an id value for this consumer
      */
-    public KafkaConsumer(Class<T> type, KafkaStream<String, String> stream, int consumerId) {
-        super(type);
-        this.iterator = stream.iterator();
+    public KafkaConsumer(Class<T> type,
+                         MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId) {
+        super(deserializer);
+
+        this.iterator   = stream.iterator();
         this.consumerId = consumerId;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 889af11..cfffec4 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -28,6 +28,7 @@ import kafka.utils.Time;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.MessageDeserializer;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.service.Service;
@@ -172,7 +173,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
         List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers);
         int consumerId = 0;
         for (KafkaStream stream : kafkaConsumers) {
-            consumers.add(createKafkaConsumer(notificationType.getClassType(), stream, consumerId++));
+            KafkaConsumer<T> kafkaConsumer =
+                createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(),
+                    stream, consumerId++);
+            consumers.add(kafkaConsumer);
         }
         consumerConnectors.add(consumerConnector);
 
@@ -180,6 +184,22 @@ public class KafkaNotification extends AbstractNotification implements Service {
     }
 
     @Override
+    public void close() {
+        if (producer != null) {
+            producer.close();
+            producer = null;
+        }
+
+        for (ConsumerConnector consumerConnector : consumerConnectors) {
+            consumerConnector.shutdown();
+        }
+        consumerConnectors.clear();
+    }
+
+
+    // ----- AbstractNotification --------------------------------------------
+
+    @Override
     public void sendInternal(NotificationType type, String... messages) throws NotificationException {
         if (producer == null) {
             createProducer();
@@ -197,27 +217,13 @@ public class KafkaNotification extends AbstractNotification implements Service {
             try {
                 RecordMetadata response = future.get();
                 LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
-                        response.partition(), response.offset());
+                    response.partition(), response.offset());
             } catch (Exception e) {
                 throw new NotificationException(e);
             }
         }
     }
 
-    @Override
-    public void close() {
-        if (producer != null) {
-            producer.close();
-            producer = null;
-        }
-
-        for (ConsumerConnector consumerConnector : consumerConnectors) {
-            consumerConnector.shutdown();
-        }
-        consumerConnectors.clear();
-    }
-
-
     // ----- helper methods --------------------------------------------------
 
     /**
@@ -234,14 +240,17 @@ public class KafkaNotification extends AbstractNotification implements Service {
     /**
      * Create a Kafka consumer from the given Kafka stream.
      *
-     * @param stream      the Kafka stream
-     * @param consumerId  the id for the new consumer
+     * @param type          the notification type to be returned by the consumer
+     * @param deserializer  the deserializer for the created consumer
+     * @param stream        the Kafka stream
+     * @param consumerId    the id for the new consumer
      *
      * @return a new Kafka consumer
      */
-    protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream,
-                                                                              int consumerId) {
-        return new org.apache.atlas.kafka.KafkaConsumer<T>(type, stream, consumerId);
+    protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type,
+            MessageDeserializer<T> deserializer, KafkaStream stream,
+            int consumerId) {
+        return new org.apache.atlas.kafka.KafkaConsumer<T>(type, deserializer, stream, consumerId);
     }
 
     // Get properties for consumer request

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
new file mode 100644
index 0000000..9585827
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import com.google.gson.reflect.TypeToken;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base notification message deserializer.
+ */
+public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDeserializer<T> {
+
+    private static final Map<Type, JsonDeserializer> DESERIALIZER_MAP = new HashMap<>();
+
+    static {
+        DESERIALIZER_MAP.put(ImmutableList.class, new ImmutableListDeserializer());
+        DESERIALIZER_MAP.put(ImmutableMap.class, new ImmutableMapDeserializer());
+        DESERIALIZER_MAP.put(JSONArray.class, new JSONArrayDeserializer());
+        DESERIALIZER_MAP.put(IStruct.class, new StructDeserializer());
+        DESERIALIZER_MAP.put(IReferenceableInstance.class, new ReferenceableDeserializer());
+        DESERIALIZER_MAP.put(Referenceable.class, new ReferenceableDeserializer());
+    }
+
+
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Create a deserializer.
+     *
+     * @param versionedMessageType  the type of the versioned message
+     * @param expectedVersion       the expected message version
+     * @param deserializerMap       map of individual deserializers used to define this message deserializer
+     * @param notificationLogger    logger for message version mismatch
+     */
+    public AbstractMessageDeserializer(Type versionedMessageType,
+                                       MessageVersion expectedVersion,
+                                       Map<Type, JsonDeserializer> deserializerMap,
+                                       Logger notificationLogger) {
+        super(versionedMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger);
+    }
+
+
+    // ----- helper methods --------------------------------------------------
+
+    private static Gson getDeserializer(Map<Type, JsonDeserializer> deserializerMap) {
+        GsonBuilder builder = new GsonBuilder();
+
+        for (Map.Entry<Type, JsonDeserializer> entry : DESERIALIZER_MAP.entrySet()) {
+            builder.registerTypeAdapter(entry.getKey(), entry.getValue());
+        }
+
+        for (Map.Entry<Type, JsonDeserializer> entry : deserializerMap.entrySet()) {
+            builder.registerTypeAdapter(entry.getKey(), entry.getValue());
+        }
+        return builder.create();
+    }
+
+
+    // ----- deserializer classes --------------------------------------------
+
+    /**
+     * Deserializer for ImmutableList.
+     */
+    protected static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> {
+        public static final Type LIST_TYPE = new TypeToken<List<?>>() {
+        }.getType();
+
+        @Override
+        public ImmutableList<?> deserialize(JsonElement json, Type type,
+                                            JsonDeserializationContext context) {
+            final List<?> list = context.deserialize(json, LIST_TYPE);
+            return ImmutableList.copyOf(list);
+        }
+    }
+
+    /**
+     * Deserializer for ImmutableMap.
+     */
+    protected static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
+
+        public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {
+        }.getType();
+
+        @Override
+        public ImmutableMap<?, ?> deserialize(JsonElement json, Type type,
+                                              JsonDeserializationContext context) {
+            final Map<?, ?> map = context.deserialize(json, MAP_TYPE);
+            return ImmutableMap.copyOf(map);
+        }
+    }
+
+    /**
+     * Deserializer for JSONArray.
+     */
+    protected static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
+        @Override
+        public JSONArray deserialize(final JsonElement json, final Type type,
+                                     final JsonDeserializationContext context) {
+            try {
+                return new JSONArray(json.toString());
+            } catch (JSONException e) {
+                throw new JsonParseException(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Deserializer for Struct.
+     */
+    protected static final class StructDeserializer implements JsonDeserializer<IStruct> {
+        @Override
+        public IStruct deserialize(final JsonElement json, final Type type,
+                                   final JsonDeserializationContext context) {
+            return context.deserialize(json, Struct.class);
+        }
+    }
+
+    /**
+     * Deserializer for Referenceable.
+     */
+    protected static final class ReferenceableDeserializer implements JsonDeserializer<IReferenceableInstance> {
+        @Override
+        public IReferenceableInstance deserialize(final JsonElement json, final Type type,
+                                                  final JsonDeserializationContext context) {
+
+            return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 596f988..7d22126 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -17,10 +17,22 @@
  */
 package org.apache.atlas.notification;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.commons.configuration.Configuration;
+import org.codehaus.jettison.json.JSONArray;
 
+
+import java.lang.reflect.Type;
 import java.util.Arrays;
 import java.util.List;
 
@@ -29,12 +41,26 @@ import java.util.List;
  */
 public abstract class AbstractNotification implements NotificationInterface {
 
+    /**
+     * The current expected version for notification messages.
+     */
+    public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");
+
     private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
     private final boolean embedded;
     private final boolean isHAEnabled;
 
+    /**
+     * Used for message serialization.
+     */
+    public static final Gson GSON = new GsonBuilder().
+        registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializer()).
+        registerTypeAdapter(Referenceable.class, new ReferenceableSerializer()).
+        registerTypeAdapter(JSONArray.class, new JSONArraySerializer()).
+        create();
+
 
-    // ----- Constructors ------------------------------------------------------
+    // ----- Constructors ----------------------------------------------------
 
     public AbstractNotification(Configuration applicationProperties) throws AtlasException {
         this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
@@ -42,7 +68,23 @@ public abstract class AbstractNotification implements NotificationInterface {
     }
 
 
-    // ----- AbstractNotificationInterface -------------------------------------
+    // ----- NotificationInterface -------------------------------------------
+
+    @Override
+    public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+        String[] strMessages = new String[messages.size()];
+        for (int index = 0; index < messages.size(); index++) {
+            strMessages[index] = getMessageJson(messages.get(index));
+        }
+        sendInternal(type, strMessages);
+    }
+
+    @Override
+    public <T> void send(NotificationType type, T... messages) throws NotificationException {
+        send(type, Arrays.asList(messages));
+    }
+
+    // ----- AbstractNotification --------------------------------------------
 
     /**
      * Determine whether or not the notification service embedded in Atlas server.
@@ -53,23 +95,62 @@ public abstract class AbstractNotification implements NotificationInterface {
         return embedded;
     }
 
+    /**
+     * Determine whether or not the high availability feature is enabled.
+     *
+     * @return true if the high availability feature is enabled.
+     */
     protected final boolean isHAEnabled() {
         return isHAEnabled;
     }
 
-    @Override
-    public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
-        String[] strMessages = new String[messages.size()];
-        for (int index = 0; index < messages.size(); index++) {
-            strMessages[index] = AbstractNotificationConsumer.GSON.toJson(messages.get(index));
-        }
-        sendInternal(type, strMessages);
+    /**
+     * Send the given messages.
+     *
+     * @param type      the message type
+     * @param messages  the array of messages to send
+     *
+     * @throws NotificationException if an error occurs while sending
+     */
+    protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException;
+
+
+    // ----- utility methods -------------------------------------------------
+
+    /**
+     * Get the notification message JSON from the given object.
+     *
+     * @param message  the message in object form
+     *
+     * @return the message as a JSON string
+     */
+    public static String getMessageJson(Object message) {
+        VersionedMessage<?> versionedMessage = new VersionedMessage<>(CURRENT_MESSAGE_VERSION, message);
+
+        return GSON.toJson(versionedMessage);
     }
 
-    @Override
-    public <T> void send(NotificationType type, T... messages) throws NotificationException {
-        send(type, Arrays.asList(messages));
+
+    // ----- serializers -----------------------------------------------------
+
+    /**
+     * Serializer for Referenceable.
+     */
+    public static final class ReferenceableSerializer implements JsonSerializer<IReferenceableInstance> {
+        @Override
+        public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
+            String instanceJson = InstanceSerialization.toJson(src, true);
+            return new JsonParser().parse(instanceJson).getAsJsonObject();
+        }
     }
 
-    protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException;
+    /**
+     * Serializer for JSONArray.
+     */
+    public static final class JSONArraySerializer implements JsonSerializer<JSONArray> {
+        @Override
+        public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
+            return new JsonParser().parse(src.toString()).getAsJsonArray();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index 1cadb99..f00bbca 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -17,50 +17,15 @@
  */
 package org.apache.atlas.notification;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-import com.google.gson.reflect.TypeToken;
-import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.notification.entity.EntityNotificationImpl;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-
-import java.lang.reflect.Type;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Abstract notification consumer.
  */
 public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> {
 
-    public static final Gson GSON = new GsonBuilder().
-            registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()).
-            registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()).
-            registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()).
-            registerTypeAdapter(IStruct.class, new StructDeserializer()).
-            registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()).
-            registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()).
-            registerTypeAdapter(JSONArray.class, new JSONArraySerializerDeserializer()).
-            registerTypeAdapter(HookNotification.HookNotificationMessage.class, new HookNotification()).
-            create();
-
-    private final Class<T> type;
+    /**
+     * Deserializer used to deserialize notification messages for this consumer.
+     */
+    private final MessageDeserializer<T> deserializer;
 
 
     // ----- Constructors ----------------------------------------------------
@@ -68,10 +33,10 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
     /**
      * Construct an AbstractNotificationConsumer.
      *
-     * @param type  the notification type
+     * @param deserializer  the message deserializer used by this consumer
      */
-    public AbstractNotificationConsumer(Class<T> type) {
-        this.type = type;
+    public AbstractNotificationConsumer(MessageDeserializer<T> deserializer) {
+        this.deserializer = deserializer;
     }
 
 
@@ -96,112 +61,11 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
 
     @Override
     public T next() {
-        return GSON.fromJson(getNext(), type);
+        return deserializer.deserialize(getNext());
     }
 
     @Override
     public T peek() {
-        return GSON.fromJson(peekMessage(), type);
-    }
-
-
-    /**
-     * Deserializer for ImmutableList used by AbstractNotificationConsumer.GSON.
-     */
-    public static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> {
-        public static final Type LIST_TYPE = new TypeToken<List<?>>() {
-        }.getType();
-
-        @Override
-        public ImmutableList<?> deserialize(JsonElement json, Type type,
-                                            JsonDeserializationContext context) {
-            final List<?> list = context.deserialize(json, LIST_TYPE);
-            return ImmutableList.copyOf(list);
-        }
-    }
-
-    /**
-     * Deserializer for ImmutableMap used by AbstractNotificationConsumer.GSON.
-     */
-    public static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
-
-        public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {
-        }.getType();
-
-        @Override
-        public ImmutableMap<?, ?> deserialize(JsonElement json, Type type,
-                                              JsonDeserializationContext context) {
-            final Map<?, ?> map = context.deserialize(json, MAP_TYPE);
-            return ImmutableMap.copyOf(map);
-        }
-    }
-
-
-    /**
-     * Deserializer for EntityNotification used by AbstractNotificationConsumer.GSON.
-     */
-    public static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
-        @Override
-        public EntityNotification deserialize(final JsonElement json, final Type type,
-                                              final JsonDeserializationContext context) {
-            return context.deserialize(json, EntityNotificationImpl.class);
-        }
-    }
-
-    /**
-     * Serde for Struct used by AbstractNotificationConsumer.GSON.
-     */
-    public static final class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> {
-        @Override
-        public IStruct deserialize(final JsonElement json, final Type type,
-                                   final JsonDeserializationContext context) {
-            return context.deserialize(json, Struct.class);
-        }
-
-        @Override
-        public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) {
-            String instanceJson = InstanceSerialization.toJson(src, true);
-            return new JsonParser().parse(instanceJson).getAsJsonObject();
-        }
-    }
-
-    /**
-     * Serde for Referenceable used by AbstractNotificationConsumer.GSON.
-     */
-    public static final class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>,
-            JsonSerializer<IReferenceableInstance> {
-        @Override
-        public IReferenceableInstance deserialize(final JsonElement json, final Type type,
-                                                  final JsonDeserializationContext context) {
-
-            return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
-        }
-
-        @Override
-        public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
-            String instanceJson = InstanceSerialization.toJson(src, true);
-            return new JsonParser().parse(instanceJson).getAsJsonObject();
-        }
-    }
-
-    /**
-     * Serde for JSONArray used by AbstractNotificationConsumer.GSON.
-     */
-    public static final class JSONArraySerializerDeserializer implements JsonDeserializer<JSONArray>,
-            JsonSerializer<JSONArray> {
-        @Override
-        public JSONArray deserialize(final JsonElement json, final Type type,
-                                     final JsonDeserializationContext context) {
-            try {
-                return new JSONArray(json.toString());
-            } catch (JSONException e) {
-                throw new JsonParseException(e.getMessage(), e);
-            }
-        }
-
-        @Override
-        public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
-            return new JsonParser().parse(src.toString()).getAsJsonArray();
-        }
+        return deserializer.deserialize(peekMessage());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java b/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java
new file mode 100644
index 0000000..6a59014
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/IncompatibleVersionException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+/**
+ * Exception thrown when a notification message is consumed that has a version that is incompatible with
+ * the expected version.
+ */
+public class IncompatibleVersionException extends RuntimeException {
+
+    // ----- Constructors ----------------------------------------------------
+
+    public IncompatibleVersionException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java
new file mode 100644
index 0000000..7778908
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/MessageDeserializer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+/**
+ * Deserializer for JSON messages.
+ */
+public interface MessageDeserializer<T> {
+    /**
+     * Get a message of type T from the given JSON message string.
+     *
+     * @param json  the JSON message
+     *
+     * @return  the message deserialized from the given JSON
+     */
+    T deserialize(String json);
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
new file mode 100644
index 0000000..3f16a9a
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+/**
+ * Represents the version of a notification message.
+ */
+public class MessageVersion implements Comparable<MessageVersion> {
+    /**
+     * Used for messages with no version (old format).
+     */
+    public static final MessageVersion NO_VERSION = new MessageVersion("0");
+
+    private final String version;
+
+
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Create a message version.
+     *
+     * @param version  the version string
+     * @throws IllegalArgumentException  if a dot-separated part of the version string is not an integer
+     */
+    public MessageVersion(String version) {
+        this.version = version;
+        try {
+            getVersionParts();
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(String.format("Invalid version string : %s.", version), e);
+        }
+    }
+
+
+    // ----- Comparable ------------------------------------------------------
+
+    @Override
+    public int compareTo(MessageVersion that) {
+        if (that == null) {
+            return 1;
+        }
+
+        Integer[] thisParts = getVersionParts();
+        Integer[] thatParts = that.getVersionParts();
+        int length = Math.max(thisParts.length, thatParts.length);
+        for (int i = 0; i < length; i++) {
+            // Integer.compare cannot overflow the way subtracting the two parts can.
+            int comp = Integer.compare(getVersionPart(thisParts, i), getVersionPart(thatParts, i));
+            if (comp != 0) {
+                return comp;
+            }
+        }
+        return 0;
+    }
+
+
+    // ----- Object overrides ------------------------------------------------
+
+    @Override
+    public boolean equals(Object that) {
+        if (this == that) {
+            return true;
+        }
+
+        if (that == null || getClass() != that.getClass()) {
+            return false;
+        }
+
+        return compareTo((MessageVersion) that) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(getVersionParts());
+    }
+
+    /** Returns the raw version string, so that %s-formatted log and exception messages show it. */
+    @Override
+    public String toString() {
+        return version;
+    }
+
+
+    // ----- helper methods --------------------------------------------------
+
+    /**
+     * Get the version parts array by splitting the version string on '.'.
+     * Trailing zero parts are stripped (e.g. '1.0.0' yields the same parts as '1').
+     *
+     * @return  the version parts array
+     */
+    protected Integer[] getVersionParts() {
+        String[] sParts = version.split("\\.");
+        ArrayList<Integer> iParts = new ArrayList<>();
+        int trailingZeros = 0;
+        for (String sPart : sParts) {
+            int iPart = Integer.parseInt(sPart);
+            if (iPart == 0) {
+                ++trailingZeros;
+            } else {
+                for (int i = 0; i < trailingZeros; ++i) {
+                    iParts.add(0);
+                }
+                trailingZeros = 0;
+                iParts.add(iPart);
+            }
+        }
+        return iParts.toArray(new Integer[iParts.size()]);
+    }
+
+    private Integer getVersionPart(Integer[] versionParts, int i) {
+        return i < versionParts.length ? versionParts[i] : 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index ac285aa..384f383 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -17,9 +17,13 @@
  */
 package org.apache.atlas.notification;
 
+import org.apache.atlas.notification.entity.EntityMessageDeserializer;
 import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.notification.hook.HookMessageDeserializer;
 import org.apache.atlas.notification.hook.HookNotification;
+import com.google.gson.reflect.TypeToken;
 
+import java.lang.reflect.Type;
 import java.util.List;
 
 /**
@@ -37,25 +41,59 @@ public interface NotificationInterface {
     String PROPERTY_PREFIX = "atlas.notification";
 
     /**
+     * Notification message class types.
+     */
+    Class<HookNotification.HookNotificationMessage> HOOK_NOTIFICATION_CLASS =
+        HookNotification.HookNotificationMessage.class;
+
+    Class<EntityNotification> ENTITY_NOTIFICATION_CLASS = EntityNotification.class;
+
+    /**
+     * Versioned notification message class types.
+     */
+    Type HOOK_VERSIONED_MESSAGE_TYPE =
+        new TypeToken<VersionedMessage<HookNotification.HookNotificationMessage>>(){}.getType();
+
+    Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<VersionedMessage<EntityNotification>>(){}.getType();
+
+    /**
      * Atlas notification types.
      */
     enum NotificationType {
 
-        HOOK(HookNotification.HookNotificationMessage.class), // notifications from the Atlas integration hook producers
-        ENTITIES(EntityNotification.class);                   // notifications to entity change consumers
+        // Notifications from the Atlas integration hooks.
+        HOOK(HOOK_NOTIFICATION_CLASS, new HookMessageDeserializer()),
+
+        // Notifications to entity change consumers.
+        ENTITIES(ENTITY_NOTIFICATION_CLASS, new EntityMessageDeserializer());
+
 
         /**
          * The notification class associated with this type.
          */
         private final Class classType;
 
-        NotificationType(Class classType) {
+        /**
+         * The message deserializer for this type.
+         */
+        private final MessageDeserializer deserializer;
+
+
+        NotificationType(Class classType, MessageDeserializer<?> deserializer) {
             this.classType = classType;
+            this.deserializer = deserializer;
         }
 
+
+        // ----- accessors ---------------------------------------------------
+
         public Class getClassType() {
             return classType;
         }
+
+        public MessageDeserializer getDeserializer() {
+            return deserializer;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
new file mode 100644
index 0000000..1929eb4
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+/**
+ * Represents a notification message that is associated with a version.
+ */
+public class VersionedMessage<T> {
+
+    /**
+     * The version of the message.
+     */
+    private final MessageVersion version;
+
+    /**
+     * The actual message.
+     */
+    private final T message;
+
+
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Create a versioned message.
+     *
+     * @param version  the message version
+     * @param message  the actual message
+     */
+    public VersionedMessage(MessageVersion version, T message) {
+        this.version = version;
+        this.message = message;
+    }
+
+
+    // ----- VersionedMessage ------------------------------------------------
+
+    /**
+     * Compare the version of this message with the given version.
+     *
+     * @param compareToVersion  the version to compare to
+     *
+     * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
+     *         or greater than the given version.
+     */
+    public int compareVersion(MessageVersion compareToVersion) {
+        return version.compareTo(compareToVersion);
+    }
+
+
+    // ----- accessors -------------------------------------------------------
+
+    public MessageVersion getVersion() {
+        return version;
+    }
+
+    public T getMessage() {
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
new file mode 100644
index 0000000..290be59
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+/**
+ * Deserializer that works with versioned messages.  The version of each deserialized message is checked against an
+ * expected version.
+ */
+public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> {
+
+    public static final String VERSION_MISMATCH_MSG =
+        "Notification message version mismatch.  Expected %s but received %s"; // args: expected, received
+
+    private final Type versionedMessageType;
+    private final MessageVersion expectedVersion;
+    private final Logger notificationLogger;
+    private final Gson gson;
+
+
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Create a versioned message deserializer.
+     *
+     * @param versionedMessageType  the type of the versioned message
+     * @param expectedVersion       the expected message version
+     * @param gson                  JSON serialization/deserialization
+     * @param notificationLogger    logger for message version mismatch
+     */
+    public VersionedMessageDeserializer(Type versionedMessageType, MessageVersion expectedVersion,
+                                        Gson gson, Logger notificationLogger) {
+        this.versionedMessageType = versionedMessageType;
+        this.expectedVersion = expectedVersion;
+        this.gson = gson;
+        this.notificationLogger = notificationLogger;
+    }
+
+
+    // ----- MessageDeserializer ---------------------------------------------
+
+    @Override
+    public T deserialize(String messageJson) {
+        VersionedMessage<T> versionedMessage = gson.fromJson(messageJson, versionedMessageType);
+
+        // older style messages not wrapped with VersionedMessage
+        if (versionedMessage.getVersion() == null) {
+            Type t = ((ParameterizedType) versionedMessageType).getActualTypeArguments()[0];
+            versionedMessage = new VersionedMessage<>(MessageVersion.NO_VERSION, gson.<T>fromJson(messageJson, t));
+        }
+        checkVersion(versionedMessage, messageJson);
+
+        return versionedMessage.getMessage();
+    }
+
+
+    // ----- helper methods --------------------------------------------------
+
+    /**
+     * Check the message version against the expected version.
+     *
+     * @param versionedMessage  the versioned message
+     * @param messageJson       the notification message json
+     *
+     * @throws IncompatibleVersionException  if the message version is incompatible with the expected version
+     */
+    protected void checkVersion(VersionedMessage<T> versionedMessage, String messageJson) {
+        int comp = versionedMessage.compareVersion(expectedVersion);
+
+        // message has newer version
+        if (comp > 0) {
+            String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion());
+            notificationLogger.error(msg);
+            notificationLogger.info(messageJson);
+            throw new IncompatibleVersionException(msg);
+        }
+
+        // message has older version
+        if (comp < 0) {
+            notificationLogger.info(
+                String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion()));
+
+            notificationLogger.info(messageJson);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
new file mode 100644
index 0000000..a6f7e64
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.entity;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import org.apache.atlas.notification.AbstractMessageDeserializer;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Entity notification message deserializer.
+ */
+public class EntityMessageDeserializer extends AbstractMessageDeserializer<EntityNotification> {
+
+    /**
+     * Logger for entity notification messages.
+     */
+    private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(EntityMessageDeserializer.class);
+
+
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Create an entity notification message deserializer.
+     */
+    public EntityMessageDeserializer() {
+        super(NotificationInterface.ENTITY_VERSIONED_MESSAGE_TYPE,
+            AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER);
+    }
+
+
+    // ----- helper methods --------------------------------------------------
+
+    private static Map<Type, JsonDeserializer> getDeserializerMap() {
+        return Collections.<Type, JsonDeserializer>singletonMap(
+            NotificationInterface.ENTITY_NOTIFICATION_CLASS, new EntityNotificationDeserializer());
+    }
+
+
+    // ----- deserializer classes --------------------------------------------
+
+    /**
+     * Deserializer for EntityNotification.
+     */
+    protected static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
+        @Override
+        public EntityNotification deserialize(final JsonElement json, final Type type,
+                                              final JsonDeserializationContext context) {
+            return context.deserialize(json, EntityNotificationImpl.class);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
new file mode 100644
index 0000000..8337de0
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.hook;
+
+import com.google.gson.JsonDeserializer;
+import org.apache.atlas.notification.AbstractMessageDeserializer;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Hook notification message deserializer.
+ */
+public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotification.HookNotificationMessage> {
+
+    /**
+     * Logger for hook notification messages.
+     */
+    private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(HookMessageDeserializer.class);
+
+
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Create a hook notification message deserializer.
+     */
+    public HookMessageDeserializer() {
+        super(NotificationInterface.HOOK_VERSIONED_MESSAGE_TYPE,
+            AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER);
+    }
+
+
+    // ----- helper methods --------------------------------------------------
+
+    private static Map<Type, JsonDeserializer> getDeserializerMap() {
+        return Collections.<Type, JsonDeserializer>singletonMap(
+            NotificationInterface.HOOK_NOTIFICATION_CLASS, new HookNotification());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
new file mode 100644
index 0000000..7f607c6
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.message.MessageAndMetadata;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.MessageVersion;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.IncompatibleVersionException;
+import org.apache.atlas.notification.VersionedMessage;
+import org.apache.atlas.notification.entity.EntityNotificationImplTest;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.Struct;
+import org.codehaus.jettison.json.JSONException;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+/**
+ * KafkaConsumer tests.
+ */
+public class KafkaConsumerTest {
+
+    private static final String TRAIT_NAME = "MyTrait";
+
+    @Test
+    public void testNext() throws Exception {
+        KafkaStream<String, String> stream = mock(KafkaStream.class);
+        ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
+        MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
+
+        Referenceable entity = getEntity(TRAIT_NAME);
+
+        HookNotification.EntityUpdateRequest message =
+            new HookNotification.EntityUpdateRequest("user1", entity);
+
+        String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
+
+        when(stream.iterator()).thenReturn(iterator);
+        when(iterator.hasNext()).thenReturn(true).thenReturn(false);
+        when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
+        when(messageAndMetadata.message()).thenReturn(json);
+
+        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+            new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
+                NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
+
+        assertTrue(consumer.hasNext());
+
+        HookNotification.HookNotificationMessage consumedMessage = consumer.next();
+
+        assertMessagesEqual(message, consumedMessage, entity);
+
+        assertFalse(consumer.hasNext());
+    }
+
+    @Test
+    public void testNextVersionMismatch() throws Exception {
+        KafkaStream<String, String> stream = mock(KafkaStream.class);
+        ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
+        MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
+
+        Referenceable entity = getEntity(TRAIT_NAME);
+
+        HookNotification.EntityUpdateRequest message =
+            new HookNotification.EntityUpdateRequest("user1", entity);
+        // wrap the request in a 2.0.0 envelope, which this test expects the deserializer to reject
+        String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message));
+
+        when(stream.iterator()).thenReturn(iterator);
+        when(iterator.hasNext()).thenReturn(true).thenReturn(false);
+        when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
+        when(messageAndMetadata.message()).thenReturn(json);
+
+        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+            new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
+                NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
+
+        assertTrue(consumer.hasNext());
+
+        try {
+            consumer.next();
+            fail("Expected IncompatibleVersionException!");
+        } catch (IncompatibleVersionException expected) {
+            // expected: nothing to verify beyond the exception type, so no stack trace is printed
+        }
+
+        assertFalse(consumer.hasNext());
+    }
+
+    @Test
+    public void testPeekMessage() throws Exception {
+        KafkaStream<String, String> stream = mock(KafkaStream.class);
+        ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
+        MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
+
+        Referenceable entity = getEntity(TRAIT_NAME);
+
+        HookNotification.EntityUpdateRequest message =
+            new HookNotification.EntityUpdateRequest("user1", entity);
+
+        String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
+
+        when(stream.iterator()).thenReturn(iterator);
+        when(iterator.hasNext()).thenReturn(true);
+        when(iterator.peek()).thenReturn(messageAndMetadata);
+        when(messageAndMetadata.message()).thenReturn(json);
+
+        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+            new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
+                NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
+
+        assertTrue(consumer.hasNext());
+
+        HookNotification.HookNotificationMessage consumedMessage = consumer.peek();
+
+        assertMessagesEqual(message, consumedMessage, entity);
+
+        assertTrue(consumer.hasNext());
+    }
+
+    private Referenceable getEntity(String traitName) {
+        Referenceable entity = EntityNotificationImplTest.getEntity("id");
+        List<IStruct> traitInfo = new LinkedList<>();
+        IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
+        traitInfo.add(trait);
+        return entity;
+    }
+
+    private void assertMessagesEqual(HookNotification.EntityUpdateRequest message,
+                                     HookNotification.HookNotificationMessage consumedMessage,
+                                     Referenceable entity) throws JSONException {
+
+        assertEquals(consumedMessage.getType(), message.getType());
+        assertEquals(consumedMessage.getUser(), message.getUser());
+
+        assertTrue(consumedMessage instanceof HookNotification.EntityUpdateRequest);
+
+        HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest =
+            (HookNotification.EntityUpdateRequest) consumedMessage;
+
+        Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
+        assertEquals(deserializedEntity.getId(), entity.getId());
+        assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
+        assertEquals(deserializedEntity.getTraits(), entity.getTraits());
+        assertEquals(deserializedEntity.getTrait(TRAIT_NAME), entity.getTrait(TRAIT_NAME));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index db34815..17fda25 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -22,6 +22,7 @@ import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.serializer.StringDecoder;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.MessageDeserializer;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
@@ -130,10 +131,12 @@ public class KafkaNotificationTest {
         }
 
         @Override
-        protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream,
+        protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type,
+                                                                                  MessageDeserializer<T> deserializer,
+                                                                                  KafkaStream stream,
                                                                                   int consumerId) {
             kafkaStreams.add(stream);
-            return super.createKafkaConsumer(type, stream, consumerId);
+            return super.createKafkaConsumer(type, deserializer, stream, consumerId);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
new file mode 100644
index 0000000..e63175d
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.slf4j.Logger;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Type;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.*;
+
+/**
+ * AbstractNotificationConsumer tests.
+ */
+public class AbstractNotificationConsumerTest {
+
+    private static final Gson GSON = new Gson();
+
+    /**
+     * Verifies that next() hands back version 1.0.0 messages in the order they were queued
+     * and that hasNext() reports false once all of them have been consumed.
+     */
+    @Test
+    public void testNext() throws Exception {
+        Logger logger = mock(Logger.class);
+
+        TestMessage testMessage1 = new TestMessage("sValue1", 99);
+        TestMessage testMessage2 = new TestMessage("sValue2", 98);
+        TestMessage testMessage3 = new TestMessage("sValue3", 97);
+        TestMessage testMessage4 = new TestMessage("sValue4", 96);
+
+        // serialize each message inside a version 1.0.0 envelope
+        List<String> jsonList = new LinkedList<>();
+        for (TestMessage message : new TestMessage[]{testMessage1, testMessage2, testMessage3, testMessage4}) {
+            jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message)));
+        }
+
+        Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+
+        NotificationConsumer<TestMessage> consumer =
+            new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+
+        // TestNG's assertEquals takes (actual, expected), so the consumed message goes first
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.next(), testMessage1);
+
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.next(), testMessage2);
+
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.next(), testMessage3);
+
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.next(), testMessage4);
+
+        assertFalse(consumer.hasNext());
+    }
+
+    /**
+     * Verifies that messages stamped with versions 0.0.5 and 0.5.0, and a message serialized
+     * without any version wrapper, are still returned by next() and have their raw JSON logged at info.
+     */
+    @Test
+    public void testNextBackVersion() throws Exception {
+        Logger logger = mock(Logger.class);
+
+        TestMessage testMessage1 = new TestMessage("sValue1", 99);
+        TestMessage testMessage2 = new TestMessage("sValue2", 98);
+        TestMessage testMessage3 = new TestMessage("sValue3", 97);
+        TestMessage testMessage4 = new TestMessage("sValue4", 96);
+
+        String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
+        String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2));
+        String json3 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.5.0"), testMessage3));
+        // the last message is serialized bare, without a VersionedMessage wrapper
+        String json4 = GSON.toJson(testMessage4);
+
+        List<String> jsonList = new LinkedList<>();
+        jsonList.add(json1);
+        jsonList.add(json2);
+        jsonList.add(json3);
+        jsonList.add(json4);
+
+        Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+        NotificationConsumer<TestMessage> consumer =
+            new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.next(), testMessage1);
+
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.next(), testMessage2);
+        verify(logger).info(json2);
+
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.next(), testMessage3);
+        verify(logger).info(json3);
+
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.next(), testMessage4);
+        verify(logger).info(json4);
+
+        assertFalse(consumer.hasNext());
+    }
+
+    /**
+     * Verifies that a message stamped with version 2.0.0 makes next() throw
+     * IncompatibleVersionException and that its raw JSON is logged at info.
+     */
+    @Test
+    public void testNextForwardVersion() throws Exception {
+        Logger logger = mock(Logger.class);
+
+        TestMessage testMessage1 = new TestMessage("sValue1", 99);
+        TestMessage testMessage2 = new TestMessage("sValue2", 98);
+
+        String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
+        String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2));
+        List<String> jsonList = new LinkedList<>();
+        jsonList.add(json1);
+        jsonList.add(json2);
+
+        Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+        NotificationConsumer<TestMessage> consumer =
+            new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.next(), testMessage1);
+
+        assertTrue(consumer.hasNext());
+        try {
+            consumer.next();
+            fail("Expected IncompatibleVersionException!");
+        } catch (IncompatibleVersionException e) {
+            verify(logger).info(json2);
+        }
+
+        assertFalse(consumer.hasNext());
+    }
+
+    /**
+     * Verifies that peek() returns the first queued message without consuming it:
+     * a second peek() returns the same message and hasNext() remains true.
+     */
+    @Test
+    public void testPeek() throws Exception {
+        Logger logger = mock(Logger.class);
+
+        TestMessage testMessage1 = new TestMessage("sValue1", 99);
+        TestMessage testMessage2 = new TestMessage("sValue2", 98);
+        TestMessage testMessage3 = new TestMessage("sValue3", 97);
+        TestMessage testMessage4 = new TestMessage("sValue4", 96);
+
+        List<String> jsonList = new LinkedList<>();
+        for (TestMessage message : new TestMessage[]{testMessage1, testMessage2, testMessage3, testMessage4}) {
+            jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message)));
+        }
+
+        Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
+        NotificationConsumer<TestMessage> consumer =
+            new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.peek(), testMessage1);
+
+        assertTrue(consumer.hasNext());
+        assertEquals(consumer.peek(), testMessage1);
+
+        assertTrue(consumer.hasNext());
+    }
+
+    /** Two-field payload whose equals() and hashCode() are computed from both s and i. */
+    private static class TestMessage {
+        private String s;
+        private int i;
+
+        public TestMessage(String s, int i) {
+            this.s = s;
+            this.i = i;
+        }
+
+        public String getS() {
+            return s;
+        }
+
+        public void setS(String s) {
+            this.s = s;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+            TestMessage other = (TestMessage) o;
+            if (i != other.i) {
+                return false;
+            }
+            return s == null ? other.s == null : s.equals(other.s);
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 * (s == null ? 0 : s.hashCode()) + i;
+        }
+    }
+
+    /** Consumer that serves raw JSON strings from an in-memory list, tracking its position with a cursor. */
+    private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
+        private final List<String> queuedJson;
+        private int cursor = 0;
+        public TestNotificationConsumer(Type versionedMessageType, List<String> messages, Logger logger) {
+            super(new TestDeserializer<T>(versionedMessageType, logger));
+            this.queuedJson = messages;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return queuedJson.size() > cursor;
+        }
+
+        @Override
+        protected String peekMessage() {
+            return queuedJson.get(cursor);
+        }
+
+        @Override
+        protected String getNext() {
+            return queuedJson.get(cursor++);
+        }
+    }
+
+    private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
+        // Binds the deserializer to AbstractNotification.CURRENT_MESSAGE_VERSION and the shared GSON instance.
+        private TestDeserializer(Type versionedMessageType, Logger logger) {
+            super(versionedMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
new file mode 100644
index 0000000..61107a9
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.commons.configuration.Configuration;
+import org.testng.annotations.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.*;
+
+/**
+ * AbstractNotification tests.
+ */
+public class AbstractNotificationTest {
+
+    /** Verifies that varargs send() hands sendInternal() the type and one JSON string per message, in order. */
+    @Test
+    public void testSend() throws Exception {
+        Configuration configuration = mock(Configuration.class);
+        TestNotification notification = new TestNotification(configuration);
+
+        TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1");
+        TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
+        TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
+
+        String messageJson1 = AbstractNotification.getMessageJson(message1);
+        String messageJson2 = AbstractNotification.getMessageJson(message2);
+        String messageJson3 = AbstractNotification.getMessageJson(message3);
+
+        notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3);
+
+        assertEquals(notification.type, NotificationInterface.NotificationType.HOOK);
+        assertEquals(notification.messages.length, 3);
+        assertEquals(notification.messages[0], messageJson1);
+        assertEquals(notification.messages[1], messageJson2);
+        assertEquals(notification.messages[2], messageJson3);
+    }
+
+    /** Verifies that send() given a List hands sendInternal() the type and one JSON string per message, in order. */
+    @Test
+    public void testSend2() throws Exception {
+        Configuration configuration = mock(Configuration.class);
+        TestNotification notification = new TestNotification(configuration);
+
+        TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1");
+        TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
+        TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
+
+        List<TestMessage> messages = new LinkedList<>();
+        messages.add(message1);
+        messages.add(message2);
+        messages.add(message3);
+
+        String messageJson1 = AbstractNotification.getMessageJson(message1);
+        String messageJson2 = AbstractNotification.getMessageJson(message2);
+        String messageJson3 = AbstractNotification.getMessageJson(message3);
+
+        notification.send(NotificationInterface.NotificationType.HOOK, messages);
+
+        assertEquals(notification.type, NotificationInterface.NotificationType.HOOK);
+        assertEquals(notification.messages.length, 3);
+        assertEquals(notification.messages[0], messageJson1);
+        assertEquals(notification.messages[1], messageJson2);
+        assertEquals(notification.messages[2], messageJson3);
+    }
+
+    public static class TestMessage extends HookNotification.HookNotificationMessage {
+        // Forwards type and user unchanged to the HookNotificationMessage constructor.
+        public TestMessage(HookNotification.HookNotificationType type, String user) {
+            super(type, user);
+        }
+    }
+
+    /** AbstractNotification stub that records the arguments of the most recent sendInternal() call. */
+    public static class TestNotification extends AbstractNotification {
+        private NotificationType type;
+        private String[] messages;
+
+        public TestNotification(Configuration applicationProperties) throws AtlasException {
+            super(applicationProperties);
+        }
+
+        @Override
+        protected void sendInternal(NotificationType notificationType, String[] notificationMessages)
+            throws NotificationException {
+            this.type = notificationType;
+            this.messages = notificationMessages;
+        }
+
+        @Override
+        public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
+            return null;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
new file mode 100644
index 0000000..d1af4b0
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+import static org.testng.Assert.*;
+
+/**
+ * MessageVersion tests.
+ */
+public class MessageVersionTest {
+
+    /**
+     * Verifies that "1.0.0" is accepted and that "foo", "A.0.0" and "1.0.0a" are each rejected
+     * with an IllegalArgumentException.
+     */
+    @Test
+    public void testConstructor() throws Exception {
+        // a well-formed version must construct without throwing
+        new MessageVersion("1.0.0");
+
+        // each of these is expected to be refused by the constructor
+        String[] malformedVersions = {
+            "foo",
+            "A.0.0",
+            "1.0.0a"
+        };
+
+        for (String malformedVersion : malformedVersions) {
+            try {
+                new MessageVersion(malformedVersion);
+                fail("Expected IllegalArgumentException");
+            } catch (IllegalArgumentException e) {
+                // expected
+            }
+        }
+    }
+
+    @Test
+    public void testCompareTo() throws Exception {
+        MessageVersion base = new MessageVersion("1.0.0");
+        MessageVersion sameAsBase = new MessageVersion("1.0.0");
+        MessageVersion nextMajor = new MessageVersion("2.0.0");
+        MessageVersion majorOnly = new MessageVersion("1");
+        MessageVersion minorBump = new MessageVersion("1.5");
+        MessageVersion patchBump = new MessageVersion("1.0.5");
+        // every pair is compared in both directions; "1" is expected to compare equal to "1.0.0"
+        assertTrue(base.compareTo(sameAsBase) == 0);
+        assertTrue(sameAsBase.compareTo(base) == 0);
+        assertTrue(base.compareTo(nextMajor) < 0);
+        assertTrue(nextMajor.compareTo(base) > 0);
+        assertTrue(base.compareTo(majorOnly) == 0);
+        assertTrue(majorOnly.compareTo(base) == 0);
+        assertTrue(base.compareTo(minorBump) < 0);
+        assertTrue(minorBump.compareTo(base) > 0);
+        assertTrue(base.compareTo(patchBump) < 0);
+        assertTrue(patchBump.compareTo(base) > 0);
+    }
+
+    @Test
+    public void testEquals() throws Exception {
+        MessageVersion base = new MessageVersion("1.0.0");
+        MessageVersion sameAsBase = new MessageVersion("1.0.0");
+        MessageVersion nextMajor = new MessageVersion("2.0.0");
+        MessageVersion majorOnly = new MessageVersion("1");
+        MessageVersion minorBump = new MessageVersion("1.5");
+        MessageVersion patchBump = new MessageVersion("1.0.5");
+        // every pair is checked in both directions; "1" is expected to equal "1.0.0"
+        assertTrue(base.equals(sameAsBase));
+        assertTrue(sameAsBase.equals(base));
+        assertFalse(base.equals(nextMajor));
+        assertFalse(nextMajor.equals(base));
+        assertTrue(base.equals(majorOnly));
+        assertTrue(majorOnly.equals(base));
+        assertFalse(base.equals(minorBump));
+        assertFalse(minorBump.equals(base));
+        assertFalse(base.equals(patchBump));
+        assertFalse(patchBump.equals(base));
+    }
+
+    @Test
+    public void testHashCode() throws Exception {
+        MessageVersion base = new MessageVersion("1.0.0");
+        MessageVersion sameAsBase = new MessageVersion("1.0.0");
+        MessageVersion majorOnly = new MessageVersion("1");
+        // "1.0.0", a second "1.0.0" and "1" are all expected to produce the same hash code
+        assertEquals(base.hashCode(), sameAsBase.hashCode());
+        assertEquals(base.hashCode(), majorOnly.hashCode());
+    }
+
+    /**
+     * Verifies the parts reported by getVersionParts(): trailing zero parts are expected
+     * to be omitted, while a zero in front of a non-zero part is kept.
+     */
+    @Test
+    public void testGetVersionParts() throws Exception {
+        // "1.0.0", "1.0" and "1" are all expected to reduce to the single part {1}
+        Integer[] majorOnly = {1};
+        assertTrue(Arrays.equals(majorOnly, new MessageVersion("1.0.0").getVersionParts()));
+        assertTrue(Arrays.equals(majorOnly, new MessageVersion("1.0").getVersionParts()));
+        assertTrue(Arrays.equals(majorOnly, new MessageVersion("1").getVersionParts()));
+
+        Integer[] withPatch = {1, 0, 2};
+        assertTrue(Arrays.equals(withPatch, new MessageVersion("1.0.2").getVersionParts()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b2ae1371/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
new file mode 100644
index 0000000..587b7eb
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+/**
+ * VersionedMessage tests.
+ */
+public class VersionedMessageTest {
+
+    /** Verifies that getVersion() returns the version passed to the constructor. */
+    @Test
+    public void testGetVersion() throws Exception {
+        MessageVersion expectedVersion = new MessageVersion("1.0.0");
+        assertEquals(new VersionedMessage<String>(expectedVersion, "a").getVersion(), expectedVersion);
+    }
+
+    /** Verifies that getMessage() returns the payload passed to the constructor. */
+    @Test
+    public void testGetMessage() throws Exception {
+        String payload = "a";
+        VersionedMessage<String> wrapped = new VersionedMessage<>(new MessageVersion("1.0.0"), payload);
+        assertEquals(wrapped.getMessage(), payload);
+    }
+
+    /** Verifies compareVersion() of a 1.0.0 message against versions 1.0.0, 2.0.0 and 0.5.0. */
+    @Test
+    public void testCompareVersion() throws Exception {
+        MessageVersion same = new MessageVersion("1.0.0");
+        MessageVersion newer = new MessageVersion("2.0.0");
+        MessageVersion older = new MessageVersion("0.5.0");
+
+        VersionedMessage<String> wrapped = new VersionedMessage<>(same, "a");
+        assertTrue(wrapped.compareVersion(same) == 0);
+        assertTrue(wrapped.compareVersion(newer) < 0);
+        assertTrue(wrapped.compareVersion(older) > 0);
+    }
+}