You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/11/06 22:50:06 UTC

[2/3] atlas git commit: ATLAS-2251: notification module updates

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 988d98a..8bc7cb4 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -18,17 +18,19 @@
 package org.apache.atlas.notification;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
 import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.codehaus.jettison.json.JSONArray;
@@ -44,8 +46,8 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED;
-import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES;
+import static org.apache.atlas.model.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED;
+import static org.apache.atlas.model.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES;
 
 /**
  * Abstract notification interface implementation.
@@ -78,14 +80,6 @@ public abstract class AbstractNotification implements NotificationInterface {
     private final boolean embedded;
     private final boolean isHAEnabled;
 
-    /**
-     * Used for message serialization.
-     */
-    public static final Gson GSON = new GsonBuilder().
-        registerTypeAdapter(Referenceable.class, new ReferenceableSerializer()).
-        registerTypeAdapter(JSONArray.class, new JSONArraySerializer()).
-        create();
-
     // ----- Constructors ----------------------------------------------------
 
     public AbstractNotification(Configuration applicationProperties) throws AtlasException {
@@ -158,7 +152,7 @@ public abstract class AbstractNotification implements NotificationInterface {
     public static String getMessageJson(Object message) {
         AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);
 
-        return GSON.toJson(notificationMsg);
+        return AtlasType.toV1Json(notificationMsg);
     }
 
     private static String getHostAddress() {
@@ -188,7 +182,7 @@ public abstract class AbstractNotification implements NotificationInterface {
      */
     public static void createNotificationMessages(Object message, List<String> msgJsonList) {
         AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser());
-        String                      msgJson         = GSON.toJson(notificationMsg);
+        String                      msgJson         = AtlasType.toV1Json(notificationMsg);
 
         boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;
 
@@ -213,7 +207,7 @@ public abstract class AbstractNotification implements NotificationInterface {
                     if (!msgLengthExceedsLimit) { // no need to split
                         AtlasNotificationStringMessage compressedMsg = new AtlasNotificationStringMessage(encodedBytes, msgId, compressionKind);
 
-                        msgJson  = GSON.toJson(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above
+                        msgJson  = AtlasType.toV1Json(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above
                         msgBytes = null; // not used after this point
                     } else { // encodedBytes will be split
                         msgJson  = null; // not used after this point
@@ -239,7 +233,7 @@ public abstract class AbstractNotification implements NotificationInterface {
 
                         AtlasNotificationStringMessage splitMsg = new AtlasNotificationStringMessage(encodedBytes, offset, length, msgId, compressionKind, i, splitCount);
 
-                        String splitMsgJson = GSON.toJson(splitMsg);
+                        String splitMsgJson = AtlasType.toV1Json(splitMsg);
 
                         msgJsonList.add(splitMsgJson);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index 8cf1e8e..c3940ce 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -16,30 +16,19 @@
  * limitations under the License.
  */
 package org.apache.atlas.notification;
+
 import org.apache.kafka.common.TopicPartition;
 
+
 /**
  * Abstract notification consumer.
  */
 public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> {
+    protected final AtlasNotificationMessageDeserializer<T> deserializer;
 
-    /**
-     * Deserializer used to deserialize notification messages for this consumer.
-     */
-    protected final MessageDeserializer<T> deserializer;
-
-
-
-    /**
-     * Construct an AbstractNotificationConsumer.
-     *
-     * @param deserializer  the message deserializer used by this consumer
-     */
-    public AbstractNotificationConsumer(MessageDeserializer<T> deserializer) {
+    protected AbstractNotificationConsumer(AtlasNotificationMessageDeserializer<T> deserializer) {
         this.deserializer = deserializer;
     }
 
-
-
     public abstract void commit(TopicPartition partition, long offset);
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
deleted file mode 100644
index 3b377de..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-
-import org.apache.atlas.AtlasConfiguration;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.codec.binary.StringUtils;
-import org.apache.commons.compress.utils.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-
-public class AtlasNotificationBaseMessage {
-    private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationBaseMessage.class);
-
-    public static final int     MESSAGE_MAX_LENGTH_BYTES    = AtlasConfiguration.NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES.getInt() - 512; // 512 bytes for envelop;
-    public static final boolean MESSAGE_COMPRESSION_ENABLED = AtlasConfiguration.NOTIFICATION_MESSAGE_COMPRESSION_ENABLED.getBoolean();
-
-    public enum CompressionKind { NONE, GZIP };
-
-    private MessageVersion  version            = null;
-    private String          msgId              = null;
-    private CompressionKind msgCompressionKind = CompressionKind.NONE;
-    private int             msgSplitIdx        = 1;
-    private int             msgSplitCount      = 1;
-
-
-    public AtlasNotificationBaseMessage() {
-    }
-
-    public AtlasNotificationBaseMessage(MessageVersion version) {
-        this(version, null, CompressionKind.NONE);
-    }
-
-    public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) {
-        this.version            = version;
-        this.msgId              = msgId;
-        this.msgCompressionKind = msgCompressionKind;
-    }
-
-    public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) {
-        this.version            = version;
-        this.msgId              = msgId;
-        this.msgCompressionKind = msgCompressionKind;
-        this.msgSplitIdx        = msgSplitIdx;
-        this.msgSplitCount      = msgSplitCount;
-    }
-
-    public void setVersion(MessageVersion version) {
-        this.version = version;
-    }
-
-    public MessageVersion getVersion() {
-        return version;
-    }
-
-    public String getMsgId() {
-        return msgId;
-    }
-
-    public void setMsgId(String msgId) {
-        this.msgId = msgId;
-    }
-
-    public CompressionKind getMsgCompressionKind() {
-        return msgCompressionKind;
-    }
-
-    public void setMsgCompressed(CompressionKind msgCompressionKind) {
-        this.msgCompressionKind = msgCompressionKind;
-    }
-
-    public int getMsgSplitIdx() {
-        return msgSplitIdx;
-    }
-
-    public void setMsgSplitIdx(int msgSplitIdx) {
-        this.msgSplitIdx = msgSplitIdx;
-    }
-
-    public int getMsgSplitCount() {
-        return msgSplitCount;
-    }
-
-    public void setMsgSplitCount(int msgSplitCount) {
-        this.msgSplitCount = msgSplitCount;
-    }
-
-    /**
-     * Compare the version of this message with the given version.
-     *
-     * @param compareToVersion  the version to compare to
-     *
-     * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
-     *         or greater than the given version.
-     */
-    public int compareVersion(MessageVersion compareToVersion) {
-        return version.compareTo(compareToVersion);
-    }
-
-
-    public static byte[] getBytesUtf8(String str) {
-        return StringUtils.getBytesUtf8(str);
-    }
-
-    public static String getStringUtf8(byte[] bytes) {
-        return StringUtils.newStringUtf8(bytes);
-    }
-
-    public static byte[] encodeBase64(byte[] bytes) {
-        return Base64.encodeBase64(bytes);
-    }
-
-    public static byte[] decodeBase64(byte[] bytes) {
-        return Base64.decodeBase64(bytes);
-    }
-
-    public static byte[] gzipCompressAndEncodeBase64(byte[] bytes) {
-        return encodeBase64(gzipCompress(bytes));
-    }
-
-    public static byte[] decodeBase64AndGzipUncompress(byte[] bytes) {
-        return gzipUncompress(decodeBase64(bytes));
-    }
-
-    public static String gzipCompress(String str) {
-        byte[] bytes           = getBytesUtf8(str);
-        byte[] compressedBytes = gzipCompress(bytes);
-        byte[] encodedBytes    = encodeBase64(compressedBytes);
-
-        return getStringUtf8(encodedBytes);
-    }
-
-    public static String gzipUncompress(String str) {
-        byte[] encodedBytes    = getBytesUtf8(str);
-        byte[] compressedBytes = decodeBase64(encodedBytes);
-        byte[] bytes           = gzipUncompress(compressedBytes);
-
-        return getStringUtf8(bytes);
-    }
-
-    public static byte[] gzipCompress(byte[] content) {
-        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-
-        try {
-            GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
-
-            gzipOutputStream.write(content);
-            gzipOutputStream.close();
-        } catch (IOException e) {
-            LOG.error("gzipCompress(): error compressing {} bytes", content.length, e);
-
-            throw new RuntimeException(e);
-        }
-
-        return byteArrayOutputStream.toByteArray();
-    }
-
-    public static byte[] gzipUncompress(byte[] content) {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-        try {
-            IOUtils.copy(new GZIPInputStream(new ByteArrayInputStream(content)), out);
-        } catch (IOException e) {
-            LOG.error("gzipUncompress(): error uncompressing {} bytes", content.length, e);
-        }
-
-        return out.toByteArray();
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
deleted file mode 100644
index 63d93c9..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-import org.joda.time.DateTimeZone;
-import org.joda.time.Instant;
-
-/**
- * Represents a notification message that is associated with a version.
- */
-public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
-    private String msgSourceIP;
-    private String msgCreatedBy;
-    private long   msgCreationTime;
-
-    /**
-     * The actual message.
-     */
-    private final T message;
-
-
-    // ----- Constructors ----------------------------------------------------
-
-    /**
-     * Create a notification message.
-     *
-     * @param version  the message version
-     * @param message  the actual message
-     */
-    public AtlasNotificationMessage(MessageVersion version, T message) {
-        this(version, message, null, null);
-    }
-
-    public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
-        super(version);
-
-        this.msgSourceIP     = msgSourceIP;
-        this.msgCreatedBy    = createdBy;
-        this.msgCreationTime = Instant.now().toDateTime(DateTimeZone.UTC).getMillis();
-        this.message         = message;
-    }
-
-
-    public String getMsgSourceIP() {
-        return msgSourceIP;
-    }
-
-    public void setMsgSourceIP(String msgSourceIP) {
-        this.msgSourceIP = msgSourceIP;
-    }
-
-    public String getMsgCreatedBy() {
-        return msgCreatedBy;
-    }
-
-    public void setMsgCreatedBy(String msgCreatedBy) {
-        this.msgCreatedBy = msgCreatedBy;
-    }
-
-    public long getMsgCreationTime() {
-        return msgCreationTime;
-    }
-
-    public void setMsgCreationTime(long msgCreationTime) {
-        this.msgCreationTime = msgCreationTime;
-    }
-
-    public T getMessage() {
-        return message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 2a175ba..d6e6878 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -19,14 +19,17 @@
 package org.apache.atlas.notification;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
 import org.apache.commons.lang3.StringUtils;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -47,11 +50,10 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
     public static final String VERSION_MISMATCH_MSG =
         "Notification message version mismatch. Expected %s but recieved %s. Message %s";
 
-    private final Type notificationMessageType;
-    private final Type messageType;
-    private final MessageVersion expectedVersion;
-    private final Logger notificationLogger;
-    private final Gson gson;
+    private final TypeReference<T>                           messageType;
+    private final TypeReference<AtlasNotificationMessage<T>> notificationMessageType;
+    private final MessageVersion                             expectedVersion;
+    private final Logger                                     notificationLogger;
 
 
     private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<>();
@@ -65,33 +67,40 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
     /**
      * Create a notification message deserializer.
      *
-     * @param notificationMessageType the type of the notification message
      * @param expectedVersion         the expected message version
-     * @param gson                    JSON serialization/deserialization
      * @param notificationLogger      logger for message version mismatch
      */
-    public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
-                                                Gson gson, Logger notificationLogger) {
-        this(notificationMessageType, expectedVersion, gson, notificationLogger,
+    public AtlasNotificationMessageDeserializer(TypeReference<T> messageType,
+                                                TypeReference<AtlasNotificationMessage<T>> notificationMessageType,
+                                                MessageVersion expectedVersion, Logger notificationLogger) {
+        this(messageType, notificationMessageType, expectedVersion, notificationLogger,
              NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000,
              NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000);
     }
 
-    public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
-                                                Gson gson, Logger notificationLogger,
+    public AtlasNotificationMessageDeserializer(TypeReference<T> messageType,
+                                                TypeReference<AtlasNotificationMessage<T>> notificationMessageType,
+                                                MessageVersion expectedVersion,
+                                                Logger notificationLogger,
                                                 long splitMessageSegmentsWaitTimeMs,
                                                 long splitMessageBufferPurgeIntervalMs) {
+        this.messageType                       = messageType;
         this.notificationMessageType           = notificationMessageType;
-        this.messageType                       = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0];
         this.expectedVersion                   = expectedVersion;
-        this.gson                              = gson;
         this.notificationLogger                = notificationLogger;
         this.splitMessageSegmentsWaitTimeMs    = splitMessageSegmentsWaitTimeMs;
         this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs;
     }
 
-    // ----- MessageDeserializer ---------------------------------------------
+    public TypeReference<T> getMessageType() {
+        return messageType;
+    }
 
+    public TypeReference<AtlasNotificationMessage<T>> getNotificationMessageType() {
+        return notificationMessageType;
+    }
+
+    // ----- MessageDeserializer ---------------------------------------------
     @Override
     public T deserialize(String messageJson) {
         final T ret;
@@ -99,15 +108,15 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
         messageCountTotal.incrementAndGet();
         messageCountSinceLastInterval.incrementAndGet();
 
-        AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class);
+        AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class);
 
         if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
-            ret = gson.fromJson(messageJson, messageType);
+            ret = AtlasType.fromV1Json(messageJson, messageType);
         } else  {
             String msgJson = messageJson;
 
             if (msg.getMsgSplitCount() > 1) { // multi-part message
-                AtlasNotificationStringMessage splitMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
+                AtlasNotificationStringMessage splitMsg = AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class);
 
                 checkVersion(splitMsg, msgJson);
 
@@ -184,7 +193,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
                                     LOG.info("Received msgID={}: splitCount={}, length={} bytes", msgId, splitCount, bytes.length);
                                 }
 
-                                msg = gson.fromJson(msgJson, AtlasNotificationBaseMessage.class);
+                                msg = AtlasType.fromV1Json(msgJson, AtlasNotificationBaseMessage.class);
                             } else {
                                 msg = null;
                             }
@@ -197,7 +206,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
 
             if (msg != null) {
                 if (CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) {
-                    AtlasNotificationStringMessage compressedMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
+                    AtlasNotificationStringMessage compressedMsg = AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class);
 
                     byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage());
                     byte[] bytes        = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);
@@ -207,7 +216,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
                     LOG.info("Received msgID={}: compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length);
                 }
 
-                AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType);
+                AtlasNotificationMessage<T> atlasNotificationMessage = AtlasType.fromV1Json(msgJson, notificationMessageType);
 
                 checkVersion(atlasNotificationMessage, msgJson);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
deleted file mode 100644
index 41485a0..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-
-public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage {
-    private String message = null;
-
-    public AtlasNotificationStringMessage() {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION);
-    }
-
-    public AtlasNotificationStringMessage(String message) {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION);
-
-        this.message = message;
-    }
-
-    public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind) {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
-
-        this.message = message;
-    }
-
-    public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
-
-        this.message = message;
-    }
-
-    public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
-
-        this.message = AtlasNotificationBaseMessage.getStringUtf8(encodedBytes);
-    }
-
-    public AtlasNotificationStringMessage(byte[] encodedBytes, int offset, int length, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
-
-        this.message = new String(encodedBytes, offset, length);
-    }
-
-    public void setMessage(String message) {
-        this.message = message;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
deleted file mode 100644
index 7f96638..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-
-/**
- * Represents the version of a notification message.
- */
-public class MessageVersion implements Comparable<MessageVersion> {
-    /**
-     * Used for message with no version (old format).
-     */
-    public static final MessageVersion NO_VERSION = new MessageVersion("0");
-    public static final MessageVersion VERSION_1  = new MessageVersion("1.0.0");
-
-    public static final MessageVersion CURRENT_VERSION = VERSION_1;
-
-    private final String version;
-
-
-    // ----- Constructors ----------------------------------------------------
-
-    /**
-     * Create a message version.
-     *
-     * @param version  the version string
-     */
-    public MessageVersion(String version) {
-        this.version = version;
-
-        try {
-            getVersionParts();
-        } catch (NumberFormatException e) {
-            throw new IllegalArgumentException(String.format("Invalid version string : %s.", version), e);
-        }
-    }
-
-
-    // ----- Comparable ------------------------------------------------------
-
-    @Override
-    public int compareTo(MessageVersion that) {
-        if (that == null) {
-            return 1;
-        }
-
-        Integer[] thisParts = getVersionParts();
-        Integer[] thatParts = that.getVersionParts();
-
-        int length = Math.max(thisParts.length, thatParts.length);
-
-        for (int i = 0; i < length; i++) {
-
-            int comp = getVersionPart(thisParts, i) - getVersionPart(thatParts, i);
-
-            if (comp != 0) {
-                return comp;
-            }
-        }
-        return 0;
-    }
-
-
-    // ----- Object overrides ------------------------------------------------
-
-    @Override
-    public boolean equals(Object that) {
-        if (this == that){
-            return true;
-        }
-
-        if (that == null || getClass() != that.getClass()) {
-            return false;
-        }
-
-        return compareTo((MessageVersion) that) == 0;
-    }
-
-    @Override
-    public int hashCode() {
-        return Arrays.hashCode(getVersionParts());
-    }
-
-
-    @Override
-    public String toString() {
-        return "MessageVersion[version=" + version + "]";
-    }
-
-    // ----- helper methods --------------------------------------------------
-
-    /**
-     * Get the version parts array by splitting the version string.
-     * Strip the trailing zeros (i.e. '1.0.0' equals '1').
-     *
-     * @return  the version parts array
-     */
-    protected Integer[] getVersionParts() {
-
-        String[] sParts = version.split("\\.");
-        ArrayList<Integer> iParts = new ArrayList<>();
-        int trailingZeros = 0;
-
-        for (String sPart : sParts) {
-            Integer iPart = new Integer(sPart);
-
-            if (iPart == 0) {
-                ++trailingZeros;
-            } else {
-                for (int i = 0; i < trailingZeros; ++i) {
-                    iParts.add(0);
-                }
-                trailingZeros = 0;
-                iParts.add(iPart);
-            }
-        }
-        return iParts.toArray(new Integer[iParts.size()]);
-    }
-
-    private Integer getVersionPart(Integer[] versionParts, int i) {
-        return i < versionParts.length ? versionParts[i] : 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 8809225..975967d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -18,10 +18,13 @@
 package org.apache.atlas.notification;
 
 import com.google.gson.reflect.TypeToken;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
 import org.apache.atlas.notification.entity.EntityMessageDeserializer;
-import org.apache.atlas.notification.entity.EntityNotification;
 import org.apache.atlas.notification.hook.HookMessageDeserializer;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
+import org.codehaus.jackson.type.TypeReference;
+import scala.reflect.internal.Types;
 
 import java.lang.reflect.Type;
 import java.util.List;
@@ -53,47 +56,25 @@ public interface NotificationInterface {
     /**
      * Versioned notification message class types.
      */
-    Type HOOK_VERSIONED_MESSAGE_TYPE =
-        new TypeToken<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>(){}.getType();
-
     Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType();
 
     /**
      * Atlas notification types.
      */
     enum NotificationType {
-
         // Notifications from the Atlas integration hooks.
-        HOOK(HOOK_NOTIFICATION_CLASS, new HookMessageDeserializer()),
+        HOOK(new HookMessageDeserializer()),
 
         // Notifications to entity change consumers.
-        ENTITIES(ENTITY_NOTIFICATION_CLASS, new EntityMessageDeserializer());
-
-
-        /**
-         * The notification class associated with this type.
-         */
-        private final Class classType;
-
-        /**
-         * The message deserializer for this type.
-         */
-        private final MessageDeserializer deserializer;
+        ENTITIES(new EntityMessageDeserializer());
 
+        private final AtlasNotificationMessageDeserializer deserializer;
 
-        NotificationType(Class classType, MessageDeserializer<?> deserializer) {
-            this.classType = classType;
+        NotificationType(AtlasNotificationMessageDeserializer deserializer) {
             this.deserializer = deserializer;
         }
 
-
-        // ----- accessors ---------------------------------------------------
-
-        public Class getClassType() {
-            return classType;
-        }
-
-        public MessageDeserializer getDeserializer() {
+        public AtlasNotificationMessageDeserializer getDeserializer() {
             return deserializer;
         }
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
index 148b57f..10df121 100644
--- a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
+++ b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
@@ -18,6 +18,8 @@
 package org.apache.atlas.notification;
 
 
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
+
 public class SplitMessageAggregator {
     private final String                           msgId;
     private final AtlasNotificationStringMessage[] splitMessagesBuffer;

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
index a6f7e64..526aa93 100644
--- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
@@ -18,19 +18,14 @@
 
 package org.apache.atlas.notification.entity;
 
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
 import org.apache.atlas.notification.AbstractMessageDeserializer;
 import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.v1.model.notification.EntityNotification;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.Map;
-
 /**
  * Entity notification message deserializer.
  */
@@ -48,29 +43,8 @@ public class EntityMessageDeserializer extends AbstractMessageDeserializer<Entit
      * Create an entity notification message deserializer.
      */
     public EntityMessageDeserializer() {
-        super(NotificationInterface.ENTITY_VERSIONED_MESSAGE_TYPE,
-            AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER);
-    }
-
-
-    // ----- helper methods --------------------------------------------------
-
-    private static Map<Type, JsonDeserializer> getDeserializerMap() {
-        return Collections.<Type, JsonDeserializer>singletonMap(
-            NotificationInterface.ENTITY_NOTIFICATION_CLASS, new EntityNotificationDeserializer());
-    }
-
-
-    // ----- deserializer classes --------------------------------------------
-
-    /**
-     * Deserializer for EntityNotification.
-     */
-    protected static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
-        @Override
-        public EntityNotification deserialize(final JsonElement json, final Type type,
-                                              final JsonDeserializationContext context) {
-            return context.deserialize(json, EntityNotificationImpl.class);
-        }
+        super(new TypeReference<EntityNotification>() {},
+              new TypeReference<AtlasNotificationMessage<EntityNotification>>() {},
+              AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER);
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
deleted file mode 100644
index 96e2e2f..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification.entity;
-
-
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
-
-import java.util.List;
-
-/**
- * Notification of entity changes.
- */
-public interface EntityNotification {
-
-    /**
-     * Operations that result in an entity notification.
-     */
-    enum OperationType {
-        ENTITY_CREATE,
-        ENTITY_UPDATE,
-        ENTITY_DELETE,
-        TRAIT_ADD,
-        TRAIT_DELETE,
-        TRAIT_UPDATE
-    }
-
-
-    // ----- EntityNotification ------------------------------------------------
-
-    /**
-     * Get the entity that is associated with this notification.
-     *
-     * @return the associated entity
-     */
-    Referenceable getEntity();
-
-    /**
-     * Get flattened list of traits that are associated with this entity (includes super traits).
-     *
-     * @return the list of all traits
-     */
-    List<Struct> getAllTraits();
-
-    /**
-     * Get the type of operation that triggered this notification.
-     *
-     * @return the operation type
-     */
-    OperationType getOperationType();
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
deleted file mode 100644
index ab8e4c8..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification.entity;
-
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.type.AtlasClassificationType;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * Entity notification implementation.
- */
-public class EntityNotificationImpl implements EntityNotification {
-
-    private final Referenceable entity;
-    private final OperationType operationType;
-    private final List<Struct> traits;
-
-
-    // ----- Constructors ------------------------------------------------------
-
-    /**
-     * No-arg constructor for serialization.
-     */
-    @SuppressWarnings("unused")
-    private EntityNotificationImpl() throws AtlasException {
-        this(null, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList());
-    }
-
-    /**
-     * Construct an EntityNotification.
-     *
-     * @param entity            the entity subject of the notification
-     * @param operationType     the type of operation that caused the notification
-     * @param traits            the traits for the given entity
-     *
-     * @throws AtlasException if the entity notification can not be created
-     */
-    public EntityNotificationImpl(Referenceable entity, OperationType operationType, List<Struct> traits)
-        throws AtlasException {
-        this.entity = entity;
-        this.operationType = operationType;
-        this.traits = traits;
-    }
-
-    /**
-     * Construct an EntityNotification.
-     *
-     * @param entity         the entity subject of the notification
-     * @param operationType  the type of operation that caused the notification
-     * @param typeRegistry     the Atlas type system
-     *
-     * @throws AtlasException if the entity notification can not be created
-     */
-    public EntityNotificationImpl(Referenceable entity, OperationType operationType, AtlasTypeRegistry typeRegistry)
-        throws AtlasException {
-        this(entity, operationType, getAllTraits(entity, typeRegistry));
-    }
-
-
-    // ----- EntityNotification ------------------------------------------------
-
-    @Override
-    public Referenceable getEntity() {
-        return entity;
-    }
-
-    @Override
-    public List<Struct> getAllTraits() {
-        return traits;
-    }
-
-    @Override
-    public OperationType getOperationType() {
-        return operationType;
-    }
-
-
-    // ----- Object overrides --------------------------------------------------
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        EntityNotificationImpl that = (EntityNotificationImpl) o;
-        return Objects.equals(entity, that.entity) &&
-                operationType == that.operationType &&
-                Objects.equals(traits, that.traits);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(entity, operationType, traits);
-    }
-
-
-    // ----- helper methods ----------------------------------------------------
-
-    private static List<Struct> getAllTraits(Referenceable entityDefinition, AtlasTypeRegistry typeRegistry) throws AtlasException {
-        List<Struct> ret = new LinkedList<>();
-
-        for (String traitName : entityDefinition.getTraitNames()) {
-            Struct                  trait          = entityDefinition.getTrait(traitName);
-            AtlasClassificationType traitType      = typeRegistry.getClassificationTypeByName(traitName);
-            Set<String>             superTypeNames = traitType != null ? traitType.getAllSuperTypes() : null;
-
-            ret.add(trait);
-
-            if (CollectionUtils.isNotEmpty(superTypeNames)) {
-                for (String superTypeName : superTypeNames) {
-                    Struct superTypeTrait = new Struct(superTypeName);
-
-                    if (MapUtils.isNotEmpty(trait.getValues())) {
-                        AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName);
-
-                        if (superType != null && MapUtils.isNotEmpty(superType.getAllAttributes())) {
-                            Map<String, Object> attributes = new HashMap<>();
-
-                            // TODO: add superTypeTrait attributess
-
-                            superTypeTrait.setValues(attributes);
-                        }
-                    }
-
-                    ret.add(superTypeTrait);
-                }
-            }
-        }
-
-        return ret;
-    }}

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
index 8337de0..1b337d4 100644
--- a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
@@ -18,16 +18,15 @@
 
 package org.apache.atlas.notification.hook;
 
-import com.google.gson.JsonDeserializer;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
 import org.apache.atlas.notification.AbstractMessageDeserializer;
 import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.v1.model.notification.HookNotification;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.Map;
+
 
 /**
  * Hook notification message deserializer.
@@ -46,15 +45,11 @@ public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNot
      * Create a hook notification message deserializer.
      */
     public HookMessageDeserializer() {
-        super(NotificationInterface.HOOK_VERSIONED_MESSAGE_TYPE,
-            AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER);
+        super(new TypeReference<HookNotification.HookNotificationMessage>() {},
+              new TypeReference<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>() {},
+              AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER);
     }
 
 
     // ----- helper methods --------------------------------------------------
-
-    private static Map<Type, JsonDeserializer> getDeserializerMap() {
-        return Collections.<Type, JsonDeserializer>singletonMap(
-            NotificationInterface.HOOK_NOTIFICATION_CLASS, new HookNotification());
-    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
deleted file mode 100644
index ca596ea..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification.hook;
-
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.typedef.TypesDef;
-import org.apache.atlas.type.AtlasType;
-import org.apache.commons.lang.StringUtils;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Contains the structure of messages transferred from hooks to atlas.
- */
-public class HookNotification implements JsonDeserializer<HookNotification.HookNotificationMessage> {
-
-    @Override
-    public HookNotificationMessage deserialize(JsonElement json, Type typeOfT,
-                                               JsonDeserializationContext context) {
-        HookNotificationType type =
-                context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class);
-        switch (type) {
-        case ENTITY_CREATE:
-            return context.deserialize(json, EntityCreateRequest.class);
-
-        case ENTITY_FULL_UPDATE:
-            return context.deserialize(json, EntityUpdateRequest.class);
-
-        case ENTITY_PARTIAL_UPDATE:
-            return context.deserialize(json, EntityPartialUpdateRequest.class);
-
-        case ENTITY_DELETE:
-            return context.deserialize(json, EntityDeleteRequest.class);
-
-        case TYPE_CREATE:
-        case TYPE_UPDATE:
-            return context.deserialize(json, TypeRequest.class);
-
-        default:
-            throw new IllegalStateException("Unhandled type " + type);
-        }
-    }
-
-    /**
-     * Type of the hook message.
-     */
-    public enum HookNotificationType {
-        TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE
-    }
-
-    /**
-     * Base type of hook message.
-     */
-    public static class HookNotificationMessage {
-        public static final String UNKNOW_USER = "UNKNOWN";
-        protected HookNotificationType type;
-        protected String user;
-
-        private HookNotificationMessage() {
-        }
-
-        public HookNotificationMessage(HookNotificationType type, String user) {
-            this.type = type;
-            this.user = user;
-        }
-
-        public HookNotificationType getType() {
-            return type;
-        }
-
-        public String getUser() {
-            if (StringUtils.isEmpty(user)) {
-                return UNKNOW_USER;
-            }
-            return user;
-        }
-
-
-    }
-
-    /**
-     * Hook message for create type definitions.
-     */
-    public static class TypeRequest extends HookNotificationMessage {
-        private TypesDef typesDef;
-
-        private TypeRequest() {
-        }
-
-        public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) {
-            super(type, user);
-            this.typesDef = typesDef;
-        }
-
-        public TypesDef getTypesDef() {
-            return typesDef;
-        }
-    }
-
-    /**
-     * Hook message for creating new entities.
-     */
-    public static class EntityCreateRequest extends HookNotificationMessage {
-        private List<Referenceable> entities;
-
-        private EntityCreateRequest() {
-        }
-
-        public EntityCreateRequest(String user, Referenceable... entities) {
-            this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user);
-        }
-
-        public EntityCreateRequest(String user, List<Referenceable> entities) {
-            this(HookNotificationType.ENTITY_CREATE, entities, user);
-        }
-
-        protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) {
-            super(type, user);
-            this.entities = entities;
-        }
-
-        public EntityCreateRequest(String user, JSONArray jsonArray) {
-            super(HookNotificationType.ENTITY_CREATE, user);
-            entities = new ArrayList<>();
-            for (int index = 0; index < jsonArray.length(); index++) {
-                try {
-                    entities.add(AtlasType.fromV1Json(jsonArray.getString(index), Referenceable.class));
-                } catch (JSONException e) {
-                    throw new JsonParseException(e);
-                }
-            }
-        }
-
-        public List<Referenceable> getEntities() {
-            return entities;
-        }
-
-        @Override
-        public String toString() {
-            return entities.toString();
-        }
-    }
-
-    /**
-     * Hook message for updating entities(full update).
-     */
-    public static class EntityUpdateRequest extends EntityCreateRequest {
-        public EntityUpdateRequest(String user, Referenceable... entities) {
-            this(user, Arrays.asList(entities));
-        }
-
-        public EntityUpdateRequest(String user, List<Referenceable> entities) {
-            super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user);
-        }
-    }
-
-    /**
-     * Hook message for updating entities(partial update).
-     */
-    public static class EntityPartialUpdateRequest extends HookNotificationMessage {
-        private String typeName;
-        private String attribute;
-        private Referenceable entity;
-        private String attributeValue;
-
-        private EntityPartialUpdateRequest() {
-        }
-
-        public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue,
-                                          Referenceable entity) {
-            super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user);
-            this.typeName = typeName;
-            this.attribute = attribute;
-            this.attributeValue = attributeValue;
-            this.entity = entity;
-        }
-
-        public String getTypeName() {
-            return typeName;
-        }
-
-        public String getAttribute() {
-            return attribute;
-        }
-
-        public Referenceable getEntity() {
-            return entity;
-        }
-
-        public String getAttributeValue() {
-            return attributeValue;
-        }
-
-        @Override
-        public String toString() {
-            return  "{"
-                + "entityType='" + typeName + '\''
-                + ", attribute=" + attribute
-                + ", value=" + attributeValue
-                + ", entity=" + entity
-                + '}';
-        }
-    }
-
-    /**
-     * Hook message for creating new entities.
-     */
-    public static class EntityDeleteRequest extends HookNotificationMessage {
-
-        private String typeName;
-        private String attribute;
-        private String attributeValue;
-
-        private EntityDeleteRequest() {
-        }
-
-        public EntityDeleteRequest(String user, String typeName, String attribute, String attributeValue) {
-            this(HookNotificationType.ENTITY_DELETE, user, typeName, attribute, attributeValue);
-        }
-
-        protected EntityDeleteRequest(HookNotificationType type,
-            String user, String typeName, String attribute, String attributeValue) {
-            super(type, user);
-            this.typeName = typeName;
-            this.attribute = attribute;
-            this.attributeValue = attributeValue;
-        }
-
-        public String getTypeName() {
-            return typeName;
-        }
-
-        public String getAttribute() {
-            return attribute;
-        }
-
-        public String getAttributeValue() {
-            return attributeValue;
-        }
-
-        @Override
-        public String toString() {
-            return  "{"
-                + "entityType='" + typeName + '\''
-                + ", attribute=" + attribute
-                + ", value=" + attributeValue
-                + '}';
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
index d59cb1c..9ce2a50 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
@@ -20,7 +20,7 @@ package org.apache.atlas.hook;
 
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 071a725..f1fc741 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -22,10 +22,11 @@ import kafka.message.MessageAndMetadata;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.notification.*;
-import org.apache.atlas.notification.AtlasNotificationMessage;
-import org.apache.atlas.notification.entity.EntityNotificationImplTest;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.notification.entity.EntityNotificationTest;
+import org.apache.atlas.v1.model.notification.HookNotification;
 import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -96,7 +97,7 @@ public class KafkaConsumerTest {
         when(messageAndMetadata.message()).thenReturn(json);
 
 
-        AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L);
+        AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, false, 100L);
         List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
         assertTrue(messageList.size() > 0);
 
@@ -131,7 +132,7 @@ public class KafkaConsumerTest {
         when(kafkaConsumer.poll(100L)).thenReturn(records);
         when(messageAndMetadata.message()).thenReturn(json);
 
-        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false, 100L);
+        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer ,false, 100L);
         try {
             List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
             assertTrue(messageList.size() > 0);
@@ -151,7 +152,7 @@ public class KafkaConsumerTest {
 
         TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
 
-        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L);
+        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, false, 100L);
 
         consumer.commit(tp, 1);
 
@@ -163,7 +164,7 @@ public class KafkaConsumerTest {
 
         TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
 
-        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true , 100L);
+        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, true , 100L);
 
         consumer.commit(tp, 1);
 
@@ -171,7 +172,7 @@ public class KafkaConsumerTest {
     }
 
     private Referenceable getEntity(String traitName) {
-        Referenceable entity = EntityNotificationImplTest.getEntity("id");
+        Referenceable entity = EntityNotificationTest.getEntity("id");
         List<Struct> traitInfo = new LinkedList<>();
         Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
         traitInfo.add(trait);

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 5e3cf41..fe019e1 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -22,13 +22,13 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import static org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index f313ddc..caa72ce 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -18,13 +18,15 @@
 
 package org.apache.atlas.notification;
 
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -41,8 +43,6 @@ import org.apache.kafka.common.TopicPartition;
  */
 public class AbstractNotificationConsumerTest {
 
-    private static final Gson GSON = new Gson();
-
     @Test
     public void testReceive() throws Exception {
         Logger logger = mock(Logger.class);
@@ -54,27 +54,24 @@ public class AbstractNotificationConsumerTest {
 
         List jsonList = new LinkedList<>();
 
-        jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)));
-        jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2)));
-        jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3)));
-        jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4)));
-
-        Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
+        jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)));
+        jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2)));
+        jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3)));
+        jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4)));
 
-        NotificationConsumer<TestMessage> consumer =
-                new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
+        NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger);
 
         List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
         assertFalse(messageList.isEmpty());
 
-        assertEquals(testMessage1, messageList.get(0).getMessage());
+        assertEquals(messageList.get(0).getMessage(), testMessage1);
 
-        assertEquals(testMessage2, messageList.get(1).getMessage());
+        assertEquals(messageList.get(1).getMessage(), testMessage2);
 
-        assertEquals(testMessage3, messageList.get(2).getMessage());
+        assertEquals(messageList.get(2).getMessage(), testMessage3);
 
-        assertEquals(testMessage4, messageList.get(3).getMessage());
+        assertEquals(messageList.get(3).getMessage(), testMessage4);
     }
 
     @Test
@@ -88,20 +85,17 @@ public class AbstractNotificationConsumerTest {
 
         List jsonList = new LinkedList<>();
 
-        String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
-        String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2));
-        String json3 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3));
-        String json4 = GSON.toJson(testMessage4);
+        String json1 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
+        String json2 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2));
+        String json3 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3));
+        String json4 = AtlasType.toV1Json(testMessage4);
 
         jsonList.add(json1);
         jsonList.add(json2);
         jsonList.add(json3);
         jsonList.add(json4);
 
-        Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
-
-        NotificationConsumer<TestMessage> consumer =
-            new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
+        NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger);
 
         List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
@@ -124,16 +118,13 @@ public class AbstractNotificationConsumerTest {
 
         List jsonList = new LinkedList<>();
 
-        String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
-        String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2));
+        String json1 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
+        String json2 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2));
 
         jsonList.add(json1);
         jsonList.add(json2);
 
-        Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
-
-        NotificationConsumer<TestMessage> consumer =
-            new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
+        NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger);
         try {
             List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
@@ -150,7 +141,10 @@ public class AbstractNotificationConsumerTest {
 
     private static class TestMessage {
         private String s;
-        private int i;
+        private int    i;
+
+        public TestMessage() {
+        }
 
         public TestMessage(String s, int i) {
             this.s = s;
@@ -165,6 +159,14 @@ public class AbstractNotificationConsumerTest {
             this.s = s;
         }
 
+        public int getI() {
+            return i;
+        }
+
+        public void setI(int i) {
+            this.i = i;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
@@ -180,12 +182,14 @@ public class AbstractNotificationConsumerTest {
         }
     }
 
-    private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
-        private final List<T> messageList;
-        private int index = 0;
+    private static class TestNotificationConsumer extends AbstractNotificationConsumer<TestMessage> {
+        private final List<TestMessage> messageList;
+        private       int              index = 0;
+
+
+        public TestNotificationConsumer(List<TestMessage> messages, Logger logger) {
+            super(new TestMessageDeserializer());
 
-        public TestNotificationConsumer(Type notificationMessageType, List<T> messages, Logger logger) {
-            super(new TestDeserializer<T>(notificationMessageType, logger));
             this.messageList = messages;
         }
 
@@ -205,24 +209,35 @@ public class AbstractNotificationConsumerTest {
         }
 
         @Override
-        public List<AtlasKafkaMessage<T>> receive() {
+        public List<AtlasKafkaMessage<TestMessage>> receive() {
             return receive(1000L);
         }
 
         @Override
-        public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
-            List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
+        public List<AtlasKafkaMessage<TestMessage>> receive(long timeoutMilliSeconds) {
+            List<AtlasKafkaMessage<TestMessage>> tempMessageList = new ArrayList();
             for(Object json :  messageList) {
-                tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
+                tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String) json), -1, -1));
             }
             return tempMessageList;
         }
     }
 
-    private static final class TestDeserializer<T> extends AtlasNotificationMessageDeserializer<T> {
+    public static class TestMessageDeserializer extends AbstractMessageDeserializer<TestMessage> {
+        /**
+         * Logger for test notification messages.
+         */
+        private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(TestMessageDeserializer.class);
+
+
+        // ----- Constructors ----------------------------------------------------
 
-        private TestDeserializer(Type notificationMessageType, Logger logger) {
-            super(notificationMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
+        /**
+         * Create a test notification message deserializer.
+         */
+        public TestMessageDeserializer() {
+            super(new TypeReference<TestMessage>() {}, new TypeReference<AtlasNotificationMessage<TestMessage>>() {},
+                  AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 655252c..98d7d2c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -19,7 +19,8 @@
 package org.apache.atlas.notification;
 
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.v1.model.notification.HookNotification;
 import org.apache.commons.configuration.Configuration;
 import org.testng.annotations.Test;
 
@@ -98,8 +99,8 @@ public class AbstractNotificationTest {
 
     // ignore msgCreationTime in Json
     private void assertEqualsMessageJson(String msgJsonActual, String msgJsonExpected) {
-        Map<Object, Object> msgActual   = AbstractNotification.GSON.fromJson(msgJsonActual, Map.class);
-        Map<Object, Object> msgExpected = AbstractNotification.GSON.fromJson(msgJsonExpected, Map.class);
+        Map<Object, Object> msgActual   = AtlasType.fromV1Json(msgJsonActual, Map.class);
+        Map<Object, Object> msgExpected = AtlasType.fromV1Json(msgJsonExpected, Map.class);
 
         msgActual.remove("msgCreationTime");
         msgExpected.remove("msgCreationTime");

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
index 27b5034..91a195d 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.atlas.notification;
 
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.MessageVersion;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.*;

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
index d1af4b0..d8b3b34 100644
--- a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas.notification;
 
+import org.apache.atlas.model.notification.MessageVersion;
 import org.testng.annotations.Test;
 
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
index 0807221..b79735a 100644
--- a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
@@ -17,7 +17,8 @@
  */
 package org.apache.atlas.notification;
 
-import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
index faafb87..ddb63b5 100644
--- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
@@ -21,6 +21,7 @@ package org.apache.atlas.notification.entity;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.v1.model.notification.EntityNotification;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
@@ -34,19 +35,18 @@ import static org.testng.Assert.assertEquals;
  * EntityMessageDeserializer tests.
  */
 public class EntityMessageDeserializerTest {
+    private EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
 
     @Test
     public void testDeserialize() throws Exception {
-        EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
-
-        Referenceable entity = EntityNotificationImplTest.getEntity("id");
+        Referenceable entity = EntityNotificationTest.getEntity("id");
         String traitName = "MyTrait";
         List<Struct> traitInfo = new LinkedList<>();
         Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
         traitInfo.add(trait);
 
-        EntityNotificationImpl notification =
-            new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
+        EntityNotification notification =
+            new EntityNotification(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
 
         List<String> jsonMsgList = new ArrayList<>();
 
@@ -55,7 +55,7 @@ public class EntityMessageDeserializerTest {
         EntityNotification deserializedNotification = null;
 
         for (String jsonMsg : jsonMsgList) {
-            deserializedNotification = deserializer.deserialize(jsonMsg);
+            deserializedNotification =  deserializer.deserialize(jsonMsg);
 
             if (deserializedNotification != null) {
                 break;