You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2015/11/10 16:52:43 UTC

[1/2] incubator-atlas git commit: ATLAS-158 Provide Atlas Entity Change Notification (tbeerbower via shwethags)

Repository: incubator-atlas
Updated Branches:
  refs/heads/master c93e0972a -> 6f421e997


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
index 213e46c..6102427 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
@@ -20,9 +20,12 @@ package org.apache.atlas.typesystem;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.classification.InterfaceAudience;
 import org.apache.atlas.typesystem.persistence.Id;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -75,6 +78,27 @@ public class Referenceable extends Struct implements IReferenceableInstance {
         traits = ImmutableMap.copyOf(_traits);
     }
 
+    /**
+     * Construct a Referenceable from the given ITypedReferenceableInstance.
+     *
+     * @param instance  the typed referenceable instance to copy
+     *
+     * @throws AtlasException if the referenceable can not be created
+     */
+    public Referenceable(ITypedReferenceableInstance instance) throws AtlasException {
+        this(instance.getId()._getId(), instance.getTypeName(), instance.getValuesMap(), instance.getTraits(),
+            getTraits(instance));
+    }
+
+    /**
+     * No-arg constructor for serialization.
+     */
+    @SuppressWarnings("unused")
+    private Referenceable() {
+        this("", "", Collections.<String, Object>emptyMap(), Collections.<String>emptyList(),
+            Collections.<String, IStruct>emptyMap());
+    }
+
     @Override
     public ImmutableList<String> getTraits() {
         return traitNames;
@@ -89,4 +113,13 @@ public class Referenceable extends Struct implements IReferenceableInstance {
     public IStruct getTrait(String typeName) {
         return traits.get(typeName);
     }
+
+    private static Map<String, IStruct> getTraits(ITypedReferenceableInstance instance) {
+        Map<String, IStruct> traits = new HashMap<>();
+
+        for (String traitName : instance.getTraits() ) {
+            traits.put(traitName, instance.getTrait(traitName));
+        }
+        return traits;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java b/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java
index d57fbe0..d03e2c2 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java
@@ -20,6 +20,7 @@ package org.apache.atlas.typesystem;
 
 import org.apache.atlas.classification.InterfaceAudience;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -41,6 +42,15 @@ public class Struct implements IStruct {
         }
     }
 
+    /**
+     * No-arg constructor for serialization.
+     */
+    @SuppressWarnings("unused")
+    private Struct() {
+        this("", Collections.<String, Object>emptyMap());
+    }
+
+
     @Override
     public String getTypeName() {
         return typeName;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/typesystem/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties
index 341acec..8d66dc3 100644
--- a/typesystem/src/main/resources/application.properties
+++ b/typesystem/src/main/resources/application.properties
@@ -61,6 +61,8 @@ atlas.kafka.data=target/data/kafka
 atlas.kafka.zookeeper.session.timeout.ms=400
 atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=100
+atlas.kafka.hook.group.id=atlas
+atlas.kafka.entities.group.id=atlas_entities
 
 #########  Security Properties  #########
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index fd05d28..f9c0cbb 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -33,9 +33,13 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RepositoryMetadataModule;
+import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
 import org.apache.atlas.repository.graph.GraphProvider;
 import org.apache.atlas.service.Services;
+import org.apache.atlas.services.MetadataService;
+import org.apache.atlas.typesystem.types.TypeSystem;
 import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
 import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.commons.configuration.Configuration;
@@ -114,6 +118,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
         LoginProcessor loginProcessor = new LoginProcessor();
         loginProcessor.login();
 
+        initMetadataService();
         startServices();
     }
 
@@ -154,4 +159,17 @@ public class GuiceServletConfig extends GuiceServletContextListener {
         Services services = injector.getInstance(Services.class);
         services.stop();
     }
+
+    // initialize the metadata service
+    private void initMetadataService() {
+        MetadataService metadataService = injector.getInstance(MetadataService.class);
+
+        // add a listener for entity changes
+        NotificationInterface notificationInterface = injector.getInstance(NotificationInterface.class);
+
+        NotificationEntityChangeListener listener =
+            new NotificationEntityChangeListener(notificationInterface, TypeSystem.getInstance());
+
+        metadataService.registerListener(listener);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
new file mode 100644
index 0000000..204b95a
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.inject.Inject;
+import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.web.resources.BaseResourceIT;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+/**
+ * Entity Notification Integration Tests.
+ */
+@Guice(modules = NotificationModule.class)
+public class EntityNotificationIT extends BaseResourceIT {
+
+  @Inject
+  private NotificationInterface notificationInterface;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    super.setUp();
+    createTypeDefinitions();
+  }
+
+  @Test
+  public void testEntityNotification() throws Exception {
+
+    List<NotificationConsumer<EntityNotification>> consumers =
+        notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
+
+    NotificationConsumer<EntityNotification> consumer =  consumers.iterator().next();
+    final EntityNotificationConsumer notificationConsumer = new EntityNotificationConsumer(consumer);
+    Thread thread = new Thread(notificationConsumer);
+    thread.start();
+
+    createEntity("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
+
+    waitFor(10000, new Predicate() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return notificationConsumer.entityNotification != null;
+      }
+    });
+
+    Assert.assertNotNull(notificationConsumer.entityNotification);
+    Assert.assertEquals(EntityNotification.OperationType.ENTITY_CREATE, notificationConsumer.entityNotification.getOperationType());
+    Assert.assertEquals(DATABASE_TYPE, notificationConsumer.entityNotification.getEntity().getTypeName());
+    Assert.assertEquals("Sales", notificationConsumer.entityNotification.getEntity().get("name"));
+  }
+
+  private void createEntity(String name, String description, String owner, String locationUri, String... traitNames)
+      throws Exception {
+
+    Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
+    referenceable.set("name", name);
+    referenceable.set("description", description);
+    referenceable.set("owner", owner);
+    referenceable.set("locationUri", locationUri);
+    referenceable.set("createTime", System.currentTimeMillis());
+
+    createInstance(referenceable);
+  }
+
+  private static class EntityNotificationConsumer implements Runnable {
+    private final NotificationConsumer<EntityNotification> consumerIterator;
+    private EntityNotification entityNotification = null;
+
+    public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
+      this.consumerIterator = consumerIterator;
+    }
+
+    @Override
+    public void run() {
+      while(consumerIterator.hasNext()) {
+        entityNotification = consumerIterator.next();
+      }
+    }
+  }
+}


[2/2] incubator-atlas git commit: ATLAS-158 Provide Atlas Entity Change Notification (tbeerbower via shwethags)

Posted by sh...@apache.org.
ATLAS-158 Provide Atlas Entity Change Notification (tbeerbower via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/6f421e99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/6f421e99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/6f421e99

Branch: refs/heads/master
Commit: 6f421e9977f0a1f34af145c991b44662cfd53e45
Parents: c93e097
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Tue Nov 10 21:22:35 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Tue Nov 10 21:22:35 2015 +0530

----------------------------------------------------------------------
 .../atlas/listener/EntityChangeListener.java    |  69 +++++
 .../atlas/listener/TypesChangeListener.java     |  49 ++++
 distro/src/conf/application.properties          |   1 +
 docs/src/site/twiki/Architecture.twiki          |   2 +-
 docs/src/site/twiki/Configuration.twiki         |  10 +-
 docs/src/site/twiki/Notification-Entity.twiki   |  33 +++
 docs/src/site/twiki/index.twiki                 |   2 +
 notification/pom.xml                            |   5 +
 .../org/apache/atlas/kafka/KafkaConsumer.java   |  30 ++-
 .../apache/atlas/kafka/KafkaNotification.java   | 264 +++++++++++--------
 .../notification/AbstractNotification.java      |  49 ++++
 .../AbstractNotificationConsumer.java           | 174 ++++++++++++
 .../notification/NotificationConsumer.java      |  14 +-
 .../notification/NotificationHookConsumer.java  |  14 +-
 .../notification/NotificationInterface.java     |  46 ++--
 .../notification/entity/EntityNotification.java |  63 +++++
 .../entity/EntityNotificationImpl.java          | 174 ++++++++++++
 .../NotificationEntityChangeListener.java       | 100 +++++++
 .../atlas/kafka/KafkaNotificationTest.java      | 114 +++++++-
 .../entity/EntityNotificationImplTest.java      | 149 +++++++++++
 release-log.txt                                 |   1 +
 repository/pom.xml                              |   5 +
 .../atlas/listener/EntityChangeListener.java    |  56 ----
 .../atlas/listener/TypesChangeListener.java     |  49 ----
 .../atlas/services/DefaultMetadataService.java  |  46 +++-
 .../apache/atlas/services/MetadataService.java  |  15 ++
 .../apache/atlas/typesystem/Referenceable.java  |  33 +++
 .../org/apache/atlas/typesystem/Struct.java     |  10 +
 .../src/main/resources/application.properties   |   2 +
 .../atlas/web/listeners/GuiceServletConfig.java |  18 ++
 .../notification/EntityNotificationIT.java      | 101 +++++++
 31 files changed, 1409 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/common/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/listener/EntityChangeListener.java b/common/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
new file mode 100644
index 0000000..08ed0d3
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.listener;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+
+import java.util.Collection;
+
+/**
+ * Entity (a Typed instance) change notification listener.
+ */
+public interface EntityChangeListener {
+
+    /**
+     * This is upon adding new entities to the repository.
+     *
+     * @param entities  the created entities
+     *
+     * @throws AtlasException if the listener notification fails
+     */
+    void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException;
+
+    /**
+     * This is upon updating an entity.
+     *
+     * @param entity        the updated entity
+     *
+     * @throws AtlasException if the listener notification fails
+     */
+    void onEntityUpdated(ITypedReferenceableInstance entity) throws AtlasException;
+
+    /**
+     * This is upon adding a new trait to a typed instance.
+     *
+     * @param entity        the entity
+     * @param trait     trait that needs to be added to entity
+     *
+     * @throws AtlasException if the listener notification fails
+     */
+    void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException;
+
+    /**
+     * This is upon deleting a trait from a typed instance.
+     *
+     * @param entity        the entity
+     * @param traitName     trait name for the instance that needs to be deleted from entity
+     *
+     * @throws AtlasException if the listener notification fails
+     */
+    void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/common/src/main/java/org/apache/atlas/listener/TypesChangeListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/listener/TypesChangeListener.java b/common/src/main/java/org/apache/atlas/listener/TypesChangeListener.java
new file mode 100644
index 0000000..dee396a
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/listener/TypesChangeListener.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.listener;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.typesystem.types.IDataType;
+
+import java.util.Collection;
+
+/**
+ * Types change notification listener.
+ */
+public interface TypesChangeListener {
+
+    /**
+     * This is upon adding new type(s) to Store.
+     *
+     * @param dataTypes data type
+     * @throws AtlasException
+     */
+    void onAdd(Collection<? extends IDataType> dataTypes) throws AtlasException;
+
+    /**
+     * This is upon removing an existing type from the Store.
+     *
+     * @param typeName type name
+     * @throws AtlasException
+     */
+    // void onRemove(String typeName) throws MetadataException;
+
+    // This is upon updating an existing type to the store
+    // void onChange() throws MetadataException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/distro/src/conf/application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/application.properties b/distro/src/conf/application.properties
index e7b1510..5400149 100755
--- a/distro/src/conf/application.properties
+++ b/distro/src/conf/application.properties
@@ -54,6 +54,7 @@ atlas.kafka.bootstrap.servers=localhost:9027
 atlas.kafka.zookeeper.session.timeout.ms=400
 atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=1000
+atlas.kafka.hook.group.id=atlas
 
 
 #########  Hive Lineage Configs  #########

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/docs/src/site/twiki/Architecture.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Architecture.twiki b/docs/src/site/twiki/Architecture.twiki
index b63b44a..cb0e208 100755
--- a/docs/src/site/twiki/Architecture.twiki
+++ b/docs/src/site/twiki/Architecture.twiki
@@ -18,7 +18,7 @@ Available bridges are:
 
 
 ---++ Notification
-Notification is used for reliable entity registration from hooks and for entity/type change notifications. Atlas, by default, provides kafka integration, but its possible to provide other implementations as well. Atlas service starts embedded kafka server by default.
+Notification is used for reliable entity registration from hooks and for entity/type change notifications. Atlas, by default, provides Kafka integration, but its possible to provide other implementations as well. Atlas service starts embedded Kafka server by default.
 
 Atlas also provides NotificationHookConsumer that runs in Atlas Service and listens to messages from hook and registers the entities in Atlas.
 <img src="images/twiki/notification.png" height="10" width="20" />

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index 48cf56e..7549808 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -73,7 +73,7 @@ atlas.lineage.hive.table.schema.query=hive_table where name=?, columns
 </verbatim>
 
 ---+++ Notification Configs
-Refer http://kafka.apache.org/documentation.html#configuration for kafka configuration. All kafka configs should be prefixed with 'atlas.kafka.'
+Refer http://kafka.apache.org/documentation.html#configuration for Kafka configuration. All Kafka configs should be prefixed with 'atlas.kafka.'
 
 <verbatim>
 atlas.notification.embedded=true
@@ -83,8 +83,16 @@ atlas.kafka.bootstrap.servers=localhost:9027
 atlas.kafka.zookeeper.session.timeout.ms=400
 atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=1000
+atlas.kafka.hook.group.id=atlas
 </verbatim>
 
+Note that Kafka group ids are specified for a specific topic.  The Kafka group id configuration for entity notifications is 'atlas.kafka.entities.group.id'
+
+<verbatim>
+atlas.kafka.entities.group.id=<consumer id>
+</verbatim>
+
+
 ---+++ Client Configs
 <verbatim>
 atlas.client.readTimeoutMSecs=60000

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/docs/src/site/twiki/Notification-Entity.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Notification-Entity.twiki b/docs/src/site/twiki/Notification-Entity.twiki
new file mode 100644
index 0000000..9d883fc
--- /dev/null
+++ b/docs/src/site/twiki/Notification-Entity.twiki
@@ -0,0 +1,33 @@
+---+ Entity Change Notifications
+
+To receive Atlas entity notifications a consumer should be obtained through the notification interface.  Entity change notifications are sent every time a change is made to an entity.  Operations that result in an entity change notification are:
+   * <code>ENTITY_CREATE</code> - Create a new entity.
+   * <code>ENTITY_UPDATE</code> - Update an attribute of an existing entity.
+   * <code>TRAIT_ADD</code> - Add a trait to an entity.
+   * <code>TRAIT_DELETE</code> - Delete a trait from an entity.
+
+ <verbatim>
+    // Obtain provider through injection…
+    Provider<NotificationInterface> provider;
+
+    // Get the notification interface
+    NotificationInterface notification = provider.get();
+
+    // Create consumers
+    List<NotificationConsumer<EntityNotification>> consumers =
+       notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
+</verbatim>
+
+
+The consumer exposes the Iterator interface that should be used to get the entity notifications as they are posted.  The hasNext() method blocks until a notification is available.
+
+<verbatim>
+    while(consumer.hasNext()) {
+        EntityNotification notification = consumer.next();
+
+        IReferenceableInstance entity = notification.getEntity();
+        …
+    }
+</verbatim>
+
+

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/docs/src/site/twiki/index.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/index.twiki b/docs/src/site/twiki/index.twiki
index 073cf24..c49752d 100755
--- a/docs/src/site/twiki/index.twiki
+++ b/docs/src/site/twiki/index.twiki
@@ -43,6 +43,8 @@ allows integration with the whole enterprise data ecosystem.
    * [[Search][Search]]
    * [[security][security]]
    * [[Configuration][Configuration]]
+   * Notification
+      * [[Notification-Entity][Entity Notification]]
    * Bridges
       * [[Bridge-Hive][Hive Bridge]]
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/pom.xml
----------------------------------------------------------------------
diff --git a/notification/pom.xml b/notification/pom.xml
index 2e12520..796ea17 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -76,5 +76,10 @@
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
index 70bb5d6..d4e07c0 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -20,28 +20,50 @@ package org.apache.atlas.kafka;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.message.MessageAndMetadata;
-import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.AbstractNotificationConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaConsumer implements NotificationConsumer {
+/**
+ * Kafka specific notification consumer.
+ *
+ * @param <T>  the notification type returned by this consumer
+ */
+public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
 
     private final int consumerId;
     private final ConsumerIterator iterator;
 
-    public KafkaConsumer(KafkaStream<String, String> stream, int consumerId) {
+
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Create a Kafka consumer.
+     *
+     * @param type        the notification type returned by this consumer
+     * @param stream      the underlying Kafka stream
+     * @param consumerId  an id value for this consumer
+     */
+    public KafkaConsumer(Class<T> type, KafkaStream<String, String> stream, int consumerId) {
+        super(type);
         this.iterator = stream.iterator();
         this.consumerId = consumerId;
     }
 
+
+    // ----- Iterator --------------------------------------------------------
+
     @Override
     public boolean hasNext() {
         return iterator.hasNext();
     }
 
+
+    // ----- AbstractNotificationConsumer ------------------------------------
+
     @Override
-    public String next() {
+    public String getNext() {
         MessageAndMetadata message = iterator.next();
         LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}",
                 consumerId, message.topic(), message.partition(), message.offset(), message.message());

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 7b3cf89..bacabeb 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -27,16 +27,13 @@ import kafka.server.KafkaServer;
 import kafka.utils.Time;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationException;
-import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -61,18 +58,22 @@ import java.util.Properties;
 import java.util.concurrent.Future;
 
 @Singleton
-public class KafkaNotification extends NotificationInterface implements Service {
+/**
+ * Kafka specific access point to the Atlas notification framework.
+ */
+public class KafkaNotification extends AbstractNotification implements Service {
     public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
 
     public static final String PROPERTY_PREFIX = "atlas.kafka";
 
     private static final String ATLAS_KAFKA_DATA = "data";
 
-    public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK";
+    public static final String ATLAS_HOOK_TOPIC     = "ATLAS_HOOK";
     public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
-    public static final String ATLAS_TYPES_TOPIC = "ATLAS_TYPES";
+    public static final String ATLAS_TYPES_TOPIC    = "ATLAS_TYPES";
+
+    protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
 
-    private static final String ATLAS_GROUP = "atlas";
     private KafkaServer kafkaServer;
     private ServerCnxnFactory factory;
     private Properties properties;
@@ -80,20 +81,21 @@ public class KafkaNotification extends NotificationInterface implements Service
     private KafkaProducer producer = null;
     private List<ConsumerConnector> consumerConnectors = new ArrayList<>();
 
-    private KafkaConsumer consumer;
-
     private static final Map<NotificationType, String> topicMap = new HashMap<NotificationType, String>() {{
         put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
         put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
-        put(NotificationType.TYPES, ATLAS_TYPES_TOPIC);
     }};
 
-    private synchronized void createProducer() {
-        if (producer == null) {
-            producer = new KafkaProducer(properties);
-        }
-    }
 
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Construct a KafkaNotification.
+     *
+     * @param applicationProperties  the application properties used to configure Kafka
+     *
+     * @throws AtlasException if the notification interface can not be created
+     */
     public KafkaNotification(Configuration applicationProperties) throws AtlasException {
         super(applicationProperties);
         Configuration subsetConfiguration =
@@ -104,64 +106,22 @@ public class KafkaNotification extends NotificationInterface implements Service
 
         //Override default configs
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringSerializer");
+            "org.apache.kafka.common.serialization.StringSerializer");
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringSerializer");
+            "org.apache.kafka.common.serialization.StringSerializer");
 
         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
 
-        //todo take group id as argument to allow multiple consumers??
-        properties.put(ConsumerConfig.GROUP_ID_CONFIG, ATLAS_GROUP);
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringDeserializer");
+            "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
-
-        //todo new APIs not available yet
-//        consumer = new KafkaConsumer(properties);
-//        consumer.subscribe(ATLAS_HOOK_TOPIC);
     }
 
-    private URL getURL(String url) throws MalformedURLException {
-        try {
-            return new URL(url);
-        } catch(MalformedURLException e) {
-            return new URL("http://" + url);
-        }
-    }
 
-    private String startZk() throws IOException, InterruptedException, URISyntaxException {
-        String zkValue = properties.getProperty("zookeeper.connect");
-        LOG.debug("Starting zookeeper at {}", zkValue);
-
-        URL zkAddress = getURL(zkValue);
-        this.factory = NIOServerCnxnFactory.createFactory(
-                new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
-        File snapshotDir = constructDir("zk/txn");
-        File logDir = constructDir("zk/snap");
-
-        factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
-        return factory.getLocalAddress().getAddress().toString();
-    }
-
-    private void startKafka() throws IOException, URISyntaxException {
-        String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
-        LOG.debug("Starting kafka at {}", kafkaValue);
-        URL kafkaAddress = getURL(kafkaValue);
-
-        Properties brokerConfig = properties;
-        brokerConfig.setProperty("broker.id", "1");
-        brokerConfig.setProperty("host.name", kafkaAddress.getHost());
-        brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
-        brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
-        brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
-
-        kafkaServer = new KafkaServer(new KafkaConfig(brokerConfig), new SystemTime());
-        kafkaServer.startup();
-        LOG.debug("Embedded kafka server started with broker config {}", brokerConfig);
-    }
+    // ----- Service ---------------------------------------------------------
 
     @Override
     public void start() throws AtlasException {
@@ -186,51 +146,27 @@ public class KafkaNotification extends NotificationInterface implements Service
         }
     }
 
-    private static class SystemTime implements Time {
-        @Override
-        public long milliseconds() {
-            return System.currentTimeMillis();
-        }
-
-        @Override
-        public long nanoseconds() {
-            return System.nanoTime();
-        }
-
-        @Override
-        public void sleep(long arg0) {
-            try {
-                Thread.sleep(arg0);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
 
-    private File constructDir(String dirPrefix) {
-        File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix);
-        if (!file.exists() && !file.mkdirs()) {
-            throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
-        }
-        return file;
-    }
+    // ----- NotificationInterface -------------------------------------------
 
     @Override
-    public List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers) {
-        String topic = topicMap.get(type);
+    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
+                                                             int numConsumers) {
+        String topic = topicMap.get(notificationType);
+
+        Properties consumerProperties = getConsumerProperties(notificationType);
 
-        ConsumerConnector consumerConnector =
-                Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(properties));
+        ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties);
         Map<String, Integer> topicCountMap = new HashMap<>();
         topicCountMap.put(topic, numConsumers);
         StringDecoder decoder = new StringDecoder(null);
         Map<String, List<KafkaStream<String, String>>> streamsMap =
                 consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
         List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
-        List<NotificationConsumer> consumers = new ArrayList<>(numConsumers);
+        List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers);
         int consumerId = 0;
         for (KafkaStream stream : kafkaConsumers) {
-            consumers.add(new org.apache.atlas.kafka.KafkaConsumer(stream, consumerId++));
+            consumers.add(createKafkaConsumer(notificationType.getClassType(), stream, consumerId++));
         }
         consumerConnectors.add(consumerConnector);
 
@@ -269,35 +205,129 @@ public class KafkaNotification extends NotificationInterface implements Service
             producer = null;
         }
 
-        if (consumer != null) {
-            consumer.close();
-            consumer = null;
-        }
-
         for (ConsumerConnector consumerConnector : consumerConnectors) {
             consumerConnector.shutdown();
         }
         consumerConnectors.clear();
     }
 
-    //New API, not used now
-    private List<String> receive(long timeout) throws NotificationException {
-        Map<String, ConsumerRecords> recordsMap = consumer.poll(timeout);
-        List<String> messages = new ArrayList<>();
-        if (recordsMap != null) {
-            for (ConsumerRecords records : recordsMap.values()) {
-                List<ConsumerRecord> recordList = records.records();
-                for (ConsumerRecord record : recordList) {
-                    try {
-                        String message = (String) record.value();
-                        LOG.debug("Received message from topic {}: {}", ATLAS_HOOK_TOPIC, message);
-                        messages.add(message);
-                    } catch (Exception e) {
-                        throw new NotificationException(e);
-                    }
-                }
+
+    // ----- helper methods --------------------------------------------------
+
+    /**
+     * Create a Kafka consumer connector from the given properties.
+     *
+     * @param properties  the properties for creating the consumer connector
+     *
+     * @return a new Kafka consumer connector
+     */
+    protected ConsumerConnector createConsumerConnector(Properties properties) {
+        return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(properties));
+    }
+
+    /**
+     * Create a Kafka consumer from the given Kafka stream.
+     *
+     * @param stream      the Kafka stream
+     * @param consumerId  the id for the new consumer
+     *
+     * @return a new Kafka consumer
+     */
+    protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream,
+                                                                              int consumerId) {
+        return new org.apache.atlas.kafka.KafkaConsumer<T>(type, stream, consumerId);
+    }
+
+    // Get properties for consumer request
+    private Properties getConsumerProperties(NotificationType type)  {
+        // find the configured group id for the given notification type
+        String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
+
+        if (groupId == null) {
+            throw new IllegalStateException("No configuration group id set for the notification type " + type);
+        }
+
+        Properties consumerProperties = new Properties();
+        consumerProperties.putAll(properties);
+        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        return consumerProperties;
+    }
+
+    private File constructDir(String dirPrefix) {
+        File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix);
+        if (!file.exists() && !file.mkdirs()) {
+            throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
+        }
+        return file;
+    }
+
+    private synchronized void createProducer() {
+        if (producer == null) {
+            producer = new KafkaProducer(properties);
+        }
+    }
+
+    private URL getURL(String url) throws MalformedURLException {
+        try {
+            return new URL(url);
+        } catch(MalformedURLException e) {
+            return new URL("http://" + url);
+        }
+    }
+
+    private String startZk() throws IOException, InterruptedException, URISyntaxException {
+        String zkValue = properties.getProperty("zookeeper.connect");
+        LOG.debug("Starting zookeeper at {}", zkValue);
+
+        URL zkAddress = getURL(zkValue);
+        this.factory = NIOServerCnxnFactory.createFactory(
+            new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
+        File snapshotDir = constructDir("zk/txn");
+        File logDir = constructDir("zk/snap");
+
+        factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
+        return factory.getLocalAddress().getAddress().toString();
+    }
+
+    private void startKafka() throws IOException, URISyntaxException {
+        String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+        LOG.debug("Starting kafka at {}", kafkaValue);
+        URL kafkaAddress = getURL(kafkaValue);
+
+        Properties brokerConfig = properties;
+        brokerConfig.setProperty("broker.id", "1");
+        brokerConfig.setProperty("host.name", kafkaAddress.getHost());
+        brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
+        brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
+        brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
+
+        kafkaServer = new KafkaServer(new KafkaConfig(brokerConfig), new SystemTime());
+        kafkaServer.startup();
+        LOG.debug("Embedded kafka server started with broker config {}", brokerConfig);
+    }
+
+
+    // ----- inner class : SystemTime ----------------------------------------
+
+    private static class SystemTime implements Time {
+        @Override
+        public long milliseconds() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public long nanoseconds() {
+            return System.nanoTime();
+        }
+
+        @Override
+        public void sleep(long arg0) {
+            try {
+                Thread.sleep(arg0);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
             }
         }
-        return messages;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
new file mode 100644
index 0000000..f7bb7b1
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Abstract notification interface implementation.
+ */
+public abstract class AbstractNotification implements NotificationInterface {
+
+    private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
+    private final boolean embedded;
+
+
+    // ----- Constructors ------------------------------------------------------
+
+    public AbstractNotification(Configuration applicationProperties) throws AtlasException {
+        this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
+    }
+
+
+    // ----- AbstractNotificationInterface -------------------------------------
+
+    /**
+     * Determine whether or not the notification service embedded in Atlas server.
+     *
+     * @return true if the the notification service embedded in Atlas server.
+     */
+    protected final boolean isEmbedded() {
+        return embedded;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
new file mode 100644
index 0000000..8c49d4a
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import com.google.gson.reflect.TypeToken;
+import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.notification.entity.EntityNotificationImpl;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract notification consumer.
+ */
+public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> {
+
+    private static final Gson GSON = new GsonBuilder().
+            registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()).
+            registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()).
+            registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()).
+            registerTypeAdapter(IStruct.class, new StructDeserializer()).
+            registerTypeAdapter(IReferenceableInstance.class, new ReferenceableDeserializer()).
+            registerTypeAdapter(JSONArray.class, new JSONArrayDeserializer()).
+            create();
+
+    private final Class<T> type;
+
+
+    // ----- Constructors ------------------------------------------------------
+
+    /**
+     * Construct an AbstractNotificationConsumer.
+     *
+     * @param type  the notification type
+     */
+    public AbstractNotificationConsumer(Class<T> type) {
+        this.type = type;
+    }
+
+
+    // ----- AbstractNotificationConsumer -------------------------------------
+
+    /**
+     * Get the next notification as a string.
+     *
+     * @return the next notification in string form
+     */
+    protected abstract String getNext();
+
+
+    // ----- Iterator ---------------------------------------------------------
+
+    @Override
+    public T next() {
+        return GSON.fromJson(getNext(), type);
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("The remove method is not supported.");
+    }
+
+
+    // ----- inner class : ImmutableListDeserializer ---------------------------
+
+    private static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> {
+
+        public static final Type LIST_TYPE = new TypeToken<List<?>>() {}.getType();
+
+        @Override
+        public ImmutableList<?> deserialize(JsonElement json, Type type,
+                                            JsonDeserializationContext context) throws JsonParseException {
+
+            final List<?> list = context.deserialize(json, LIST_TYPE);
+            return ImmutableList.copyOf(list);
+        }
+    }
+
+
+    // ----- inner class : ImmutableMapDeserializer ----------------------------
+
+    public static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
+
+        public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {}.getType();
+
+        @Override
+        public ImmutableMap<?, ?> deserialize(JsonElement json, Type type,
+                                              JsonDeserializationContext context) throws JsonParseException {
+            final Map<?, ?> map = context.deserialize(json, MAP_TYPE);
+            return ImmutableMap.copyOf(map);
+        }
+    }
+
+
+    // ----- inner class : EntityNotificationDeserializer ----------------------
+
+    public final static class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
+        @Override
+        public EntityNotification deserialize(final JsonElement json, final Type type,
+                                              final JsonDeserializationContext context) throws JsonParseException {
+            return context.deserialize(json, EntityNotificationImpl.class);
+        }
+    }
+
+
+    // ----- inner class : StructDeserializer -------------------------------
+
+    public final static class StructDeserializer implements JsonDeserializer<IStruct> {
+        @Override
+        public IStruct deserialize(final JsonElement json, final Type type,
+                                              final JsonDeserializationContext context) throws JsonParseException {
+            return context.deserialize(json, Struct.class);
+        }
+    }
+
+
+    // ----- inner class : ReferenceableDeserializer ------------------------
+
+    public final static class ReferenceableDeserializer implements JsonDeserializer<IStruct> {
+        @Override
+        public IReferenceableInstance deserialize(final JsonElement json, final Type type,
+                                   final JsonDeserializationContext context) throws JsonParseException {
+
+            return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
+        }
+    }
+
+
+    // ----- inner class : JSONArrayDeserializer ----------------------------
+
+    public final static class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
+        @Override
+        public JSONArray deserialize(final JsonElement json, final Type type,
+                                                  final JsonDeserializationContext context) throws JsonParseException {
+
+            try {
+                return new JSONArray(json.toString());
+            } catch (JSONException e) {
+                throw new JsonParseException(e.getMessage(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index c3ac23b..346ec3e 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -17,16 +17,8 @@
 
 package org.apache.atlas.notification;
 
-public interface NotificationConsumer {
-    /**
-     * If there are more messages
-     * @return
-     */
-    boolean hasNext();
+import java.util.Iterator;
 
-    /**
-     * Next message - blocking call
-     * @return
-     */
-    String next();
+// TODO : docs!
+public interface NotificationConsumer<T> extends Iterator<T>{
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 29194a4..ffeb406 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -55,11 +55,11 @@ public class NotificationHookConsumer implements Service {
         String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
         atlasClient = new AtlasClient(atlasEndpoint);
         int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
-        List<NotificationConsumer> consumers =
+        List<NotificationConsumer<JSONArray>> consumers =
                 notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
         executors = Executors.newFixedThreadPool(consumers.size());
 
-        for (final NotificationConsumer consumer : consumers) {
+        for (final NotificationConsumer<JSONArray> consumer : consumers) {
             executors.submit(new HookConsumer(consumer));
         }
     }
@@ -78,19 +78,19 @@ public class NotificationHookConsumer implements Service {
     }
 
     class HookConsumer implements Runnable {
-        private final NotificationConsumer consumer;
+        private final NotificationConsumer<JSONArray> consumer;
 
-        public HookConsumer(NotificationConsumer consumerInterface) {
-            this.consumer = consumerInterface;
+        public HookConsumer(NotificationConsumer<JSONArray> consumer) {
+            this.consumer = consumer;
         }
 
         @Override
         public void run() {
             while(consumer.hasNext()) {
-                String entityJson = consumer.next();
+                JSONArray entityJson = consumer.next();
                 LOG.info("Processing message {}", entityJson);
                 try {
-                    JSONArray guids = atlasClient.createEntity(new JSONArray(entityJson));
+                    JSONArray guids = atlasClient.createEntity(entityJson);
                     LOG.info("Create entities with guid {}", guids);
                 } catch (Exception e) {
                     //todo handle failures

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index d4be07b..3e68998 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -17,36 +17,42 @@
 
 package org.apache.atlas.notification;
 
-import org.apache.atlas.AtlasException;
-import org.apache.commons.configuration.Configuration;
+import org.apache.atlas.notification.entity.EntityNotification;
+import org.codehaus.jettison.json.JSONArray;
 
 import java.util.List;
 
-public abstract class NotificationInterface {
-    public static final String PROPERTY_PREFIX = "atlas.notification";
-    private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
-    private boolean embedded;
+// TODO : docs!
+public interface NotificationInterface {
 
+    String PROPERTY_PREFIX = "atlas.notification";
 
-    public enum NotificationType {
-        HOOK, ENTITIES, TYPES
-    }
+    enum NotificationType {
+        HOOK(JSONArray.class), ENTITIES(EntityNotification.class);
+
+        private final Class classType;
+
+        NotificationType(Class classType) {
+            this.classType = classType;
+        }
 
-    public NotificationInterface(Configuration applicationProperties) throws AtlasException {
-        this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
+        public Class getClassType() {
+            return classType;
+        }
     }
 
     /**
-     * Is the notification service embedded in atlas server
-     * @return
+     * Create notification consumers for the given notification type.
+     *
+     * @param notificationType  the notification type (i.e. HOOK, ENTITIES)
+     * @param numConsumers      the number of consumers to create
+     * @param <T>               the type of the notifications
+     *
+     * @return the list of created consumers
      */
-    protected final boolean isEmbedded() {
-        return embedded;
-    }
-
-    public abstract List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers);
+    <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers);
 
-    public abstract void send(NotificationType type, String... messages) throws NotificationException;
+    void send(NotificationType type, String... messages) throws NotificationException;
 
-    public abstract void close();
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
new file mode 100644
index 0000000..32f325a
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.entity;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+
+import java.util.List;
+
+/**
+ * Notification of entity changes.
+ */
+public interface EntityNotification {
+
+    /**
+     * Operations that result in an entity notification.
+     */
+    enum OperationType {
+        ENTITY_CREATE,
+        ENTITY_UPDATE,
+        TRAIT_ADD,
+        TRAIT_DELETE
+    }
+
+
+    // ----- EntityNotification ------------------------------------------------
+
+    /**
+     * Get the entity that is associated with this notification.
+     *
+     * @return the associated entity
+     */
+    IReferenceableInstance getEntity();
+
+    /**
+     * Get flattened list of traits that are associated with this entity (includes super traits).
+     *
+     * @return the list of all traits
+     */
+    List<IStruct> getAllTraits();
+
+    /**
+     * Get the type of operation that triggered this notification.
+     *
+     * @return the operation type
+     */
+    OperationType getOperationType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
new file mode 100644
index 0000000..9f8ce45
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.entity;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.types.FieldMapping;
+import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.atlas.typesystem.types.TypeSystem;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Entity notification implementation.
+ */
+public class EntityNotificationImpl implements EntityNotification {
+
+    private final Referenceable entity;
+    private final OperationType operationType;
+    private final List<IStruct> traits;
+
+
+    // ----- Constructors ------------------------------------------------------
+
+    /**
+     * No-arg constructor for serialization.
+     */
+    @SuppressWarnings("unused")
+    private EntityNotificationImpl() throws AtlasException {
+        this(null, OperationType.ENTITY_CREATE, Collections.<IStruct>emptyList());
+    }
+
+    /**
+     * Construct an EntityNotification.
+     *
+     * @param entity            the entity subject of the notification
+     * @param operationType     the type of operation that caused the notification
+     * @param traits            the traits for the given entity
+     *
+     * @throws AtlasException if the entity notification can not be created
+     */
+    public EntityNotificationImpl(Referenceable entity, OperationType operationType, List<IStruct> traits)
+        throws AtlasException {
+        this.entity = entity;
+        this.operationType = operationType;
+        this.traits = traits;
+    }
+
+    /**
+     * Construct an EntityNotification.
+     *
+     * @param entity         the entity subject of the notification
+     * @param operationType  the type of operation that caused the notification
+     * @param typeSystem     the Atlas type system
+     *
+     * @throws AtlasException if the entity notification can not be created
+     */
+    public EntityNotificationImpl(Referenceable entity, OperationType operationType, TypeSystem typeSystem)
+        throws AtlasException {
+        this(entity, operationType, getAllTraits(entity, typeSystem));
+    }
+
+
+    // ----- EntityNotification ------------------------------------------------
+
+    @Override
+    public IReferenceableInstance getEntity() {
+        return entity;
+    }
+
+    @Override
+    public List<IStruct> getAllTraits() {
+        return traits;
+    }
+
+    @Override
+    public OperationType getOperationType() {
+        return operationType;
+    }
+
+
+    // ----- Object overrides --------------------------------------------------
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        EntityNotificationImpl that = (EntityNotificationImpl) o;
+
+        return !(entity != null ? !entity.equals(that.entity) : that.entity != null) &&
+            operationType == that.operationType &&
+            traits.equals(that.traits);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = entity != null ? entity.hashCode() : 0;
+        result = 31 * result + operationType.hashCode();
+        result = 31 * result + traits.hashCode();
+        return result;
+    }
+
+
+    // ----- helper methods ----------------------------------------------------
+
+    private static List<IStruct> getAllTraits(IReferenceableInstance entityDefinition,
+                                              TypeSystem typeSystem) throws AtlasException {
+        List<IStruct> traitInfo = new LinkedList<>();
+        for (String traitName : entityDefinition.getTraits()) {
+            IStruct trait = entityDefinition.getTrait(traitName);
+            String typeName = trait.getTypeName();
+            Map<String, Object> valuesMap = trait.getValuesMap();
+            traitInfo.add(new Struct(typeName, valuesMap));
+            traitInfo.addAll(getSuperTraits(typeName, valuesMap, typeSystem));
+        }
+        return traitInfo;
+    }
+
+    private static List<IStruct> getSuperTraits(
+        String typeName, Map<String, Object> values, TypeSystem typeSystem) throws AtlasException {
+
+        List<IStruct> superTypes = new LinkedList<>();
+
+        TraitType traitDef = typeSystem.getDataType(TraitType.class, typeName);
+        Set<String> superTypeNames = traitDef.getAllSuperTypeNames();
+
+        for (String superTypeName : superTypeNames) {
+            TraitType superTraitDef = typeSystem.getDataType(TraitType.class, superTypeName);
+
+            Map<String, Object> superTypeValues = new HashMap<>();
+
+            FieldMapping fieldMapping = superTraitDef.fieldMapping();
+
+            if (fieldMapping != null) {
+                Set<String> superTypeAttributeNames = fieldMapping.fields.keySet();
+
+                for (String superTypeAttributeName : superTypeAttributeNames) {
+                    if (values.containsKey(superTypeAttributeName)) {
+                        superTypeValues.put(superTypeAttributeName, values.get(superTypeAttributeName));
+                    }
+                }
+            }
+            IStruct superTrait = new Struct(superTypeName, superTypeValues);
+            superTypes.add(superTrait);
+            superTypes.addAll(getSuperTraits(superTypeName, values, typeSystem));
+        }
+
+        return superTypes;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
new file mode 100644
index 0000000..a660bf9
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.entity;
+
+import com.google.gson.Gson;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.types.TypeSystem;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Listen to the repository for entity changes and produce entity change notifications.
+ */
+public class NotificationEntityChangeListener implements EntityChangeListener {
+
+    private final NotificationInterface notificationInterface;
+    private final TypeSystem typeSystem;
+    private final Gson gson = new Gson();
+
+
+    // ----- Constructors ------------------------------------------------------
+
+    /**
+     * Construct a NotificationEntityChangeListener.
+     *
+     * @param notificationInterface the notification framework interface
+     * @param typeSystem the Atlas type system
+     */
+    public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) {
+        this.notificationInterface = notificationInterface;
+        this.typeSystem = typeSystem;
+    }
+
+
+    // ----- EntityChangeListener ----------------------------------------------
+
+    @Override
+    public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+        notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_CREATE);
+    }
+
+    @Override
+    public void onEntityUpdated(ITypedReferenceableInstance entity) throws AtlasException {
+        notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.ENTITY_UPDATE);
+    }
+
+    @Override
+    public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
+        notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_ADD);
+    }
+
+    @Override
+    public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
+        notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_DELETE);
+    }
+
+
+    // ----- helper methods ----------------------------------------------------
+
+    // send notification of entity change
+    private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions,
+                                     EntityNotification.OperationType operationType) throws AtlasException {
+        List<String> messages = new LinkedList<>();
+
+        for (ITypedReferenceableInstance entityDefinition : entityDefinitions) {
+            Referenceable entity = new Referenceable(entityDefinition);
+
+            EntityNotificationImpl notification =
+                    new EntityNotificationImpl(entity, operationType, typeSystem);
+
+            messages.add(gson.toJson(notification));
+        }
+
+        notificationInterface.send(NotificationInterface.NotificationType.ENTITIES,
+                messages.toArray(new String[messages.size()]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 735655c..625a0b0 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -18,17 +18,37 @@
 package org.apache.atlas.kafka;
 
 import com.google.inject.Inject;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.serializer.StringDecoder;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
+import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
-import org.testng.Assert;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.codehaus.jettison.json.JSONArray;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 @Guice(modules = NotificationModule.class)
 public class KafkaNotificationTest {
 
@@ -41,15 +61,62 @@ public class KafkaNotificationTest {
     }
 
     @Test
-    public void testSendReceiveMessage() throws AtlasException {
-        String msg1 = "message" + random();
-        String msg2 = "message" + random();
+    public void testSendReceiveMessage() throws Exception {
+        String msg1 = "[{\"message\": " + 123 + "}]";
+        String msg2 = "[{\"message\": " + 456 + "}]";
         kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2);
-        NotificationConsumer consumer = kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
-        Assert.assertTrue(consumer.hasNext());
-        Assert.assertEquals(msg1, consumer.next());
-        Assert.assertTrue(consumer.hasNext());
-        Assert.assertEquals(msg2, consumer.next());
+        List<NotificationConsumer<JSONArray>> consumers =
+                kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+        NotificationConsumer<JSONArray> consumer = consumers.get(0);
+        assertTrue(consumer.hasNext());
+        assertEquals(new JSONArray(msg1), consumer.next());
+        assertTrue(consumer.hasNext());
+        assertEquals(new JSONArray(msg2), consumer.next());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testCreateConsumers() throws Exception {
+        Configuration configuration = mock(Configuration.class);
+        Iterator iterator = mock(Iterator.class);
+        ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
+        KafkaStream kafkaStream1 = mock(KafkaStream.class);
+        KafkaStream kafkaStream2 = mock(KafkaStream.class);
+        String groupId = "groupId9999";
+
+        when(configuration.subset(KafkaNotification.PROPERTY_PREFIX)).thenReturn(configuration);
+        when(configuration.getKeys()).thenReturn(iterator);
+        when(iterator.hasNext()).thenReturn(true).thenReturn(false);
+        when(iterator.next()).thenReturn("entities." + KafkaNotification.CONSUMER_GROUP_ID_PROPERTY);
+        when(configuration.getList("entities." + KafkaNotification.CONSUMER_GROUP_ID_PROPERTY))
+                .thenReturn(Collections.<Object>singletonList(groupId));
+
+        Map<String, List<KafkaStream<String, String>>> streamsMap = new HashMap<>();
+        List<KafkaStream<String, String>> kafkaStreamList = new LinkedList<>();
+        kafkaStreamList.add(kafkaStream1);
+        kafkaStreamList.add(kafkaStream2);
+        streamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreamList);
+        Map<String, Integer> topicCountMap = new HashMap<>();
+        topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 2);
+
+        when(consumerConnector.createMessageStreams(
+            eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(streamsMap);
+
+        TestKafkaNotification kafkaNotification = new TestKafkaNotification(configuration, consumerConnector);
+
+        List<NotificationConsumer<String>> consumers =
+            kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
+
+        assertEquals(2, consumers.size());
+
+        // assert that all of the given kafka streams were used to create kafka consumers
+        List<KafkaStream> streams = kafkaNotification.kafkaStreams;
+        assertTrue(streams.contains(kafkaStream1));
+        assertTrue(streams.contains(kafkaStream2));
+
+        // assert that the given consumer group id was added to the properties used to create the consumer connector
+        Properties properties = kafkaNotification.consumerProperties;
+        assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
     }
 
     private String random() {
@@ -60,4 +127,33 @@ public class KafkaNotificationTest {
     public void teardown() throws Exception {
         kafka.stop();
     }
+
+    // Extended kafka notification class for testing.
+    private static class TestKafkaNotification extends KafkaNotification {
+
+        private final ConsumerConnector consumerConnector;
+
+        private Properties consumerProperties;
+        private List<KafkaStream> kafkaStreams = new LinkedList<>();
+
+        public TestKafkaNotification(Configuration applicationProperties,
+                                     ConsumerConnector consumerConnector) throws AtlasException {
+            super(applicationProperties);
+            this.consumerConnector = consumerConnector;
+        }
+
+        @Override
+        protected ConsumerConnector createConsumerConnector(Properties properties) {
+            this.consumerProperties = properties;
+            kafkaStreams.clear();
+            return consumerConnector;
+        }
+
+        @Override
+        protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream,
+                                                                                  int consumerId) {
+            kafkaStreams.add(stream);
+            return super.createKafkaConsumer(type, stream, consumerId);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationImplTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationImplTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationImplTest.java
new file mode 100644
index 0000000..aff9d04
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationImplTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.entity;
+
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.atlas.typesystem.types.TypeSystem;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * EntityNotificationImpl tests.
+ */
+public class EntityNotificationImplTest {
+
+  @Test
+  public void testGetEntity() throws Exception {
+    Referenceable entity = getEntity("id");
+
+    EntityNotificationImpl entityNotification =
+        new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
+            Collections.<IStruct>emptyList());
+
+    assertEquals(entity, entityNotification.getEntity());
+  }
+
+  @Test
+  public void testGetOperationType() throws Exception {
+    Referenceable entity = getEntity("id");
+
+    EntityNotificationImpl entityNotification =
+        new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
+            Collections.<IStruct>emptyList());
+
+    assertEquals(EntityNotification.OperationType.ENTITY_CREATE, entityNotification.getOperationType());
+  }
+
+  @Test
+  public void testGetAllTraits() throws Exception {
+    Referenceable entity = getEntity("id");
+    String traitName = "MyTrait";
+    List<IStruct> traitInfo = new LinkedList<>();
+    IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
+    traitInfo.add(trait);
+
+    EntityNotificationImpl entityNotification =
+        new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
+
+    assertEquals(traitInfo, entityNotification.getAllTraits());
+  }
+
+  @Test
+  public void testGetAllTraits_superTraits() throws Exception {
+
+    TypeSystem typeSystem = mock(TypeSystem.class);
+
+    String traitName = "MyTrait";
+    IStruct myTrait = new Struct(traitName);
+
+    String superTraitName = "MySuperTrait";
+
+    TraitType traitDef = mock(TraitType.class);
+    Set<String> superTypeNames = Collections.singleton(superTraitName);
+
+    TraitType superTraitDef = mock(TraitType.class);
+    Set<String> superSuperTypeNames = Collections.emptySet();
+
+    Referenceable entity = getEntity("id", myTrait);
+
+    when(typeSystem.getDataType(TraitType.class, traitName)).thenReturn(traitDef);
+    when(typeSystem.getDataType(TraitType.class, superTraitName)).thenReturn(superTraitDef);
+
+    when(traitDef.getAllSuperTypeNames()).thenReturn(superTypeNames);
+    when(superTraitDef.getAllSuperTypeNames()).thenReturn(superSuperTypeNames);
+
+    EntityNotificationImpl entityNotification =
+        new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, typeSystem);
+
+    List<IStruct> allTraits = entityNotification.getAllTraits();
+
+    assertEquals(2, allTraits.size());
+
+    for (IStruct trait : allTraits) {
+      String typeName = trait.getTypeName();
+      assertTrue(typeName.equals(traitName) || typeName.equals(superTraitName));
+    }
+  }
+
+  @Test
+  public void testEquals() throws Exception {
+    Referenceable entity = getEntity("id");
+
+    EntityNotificationImpl entityNotification2 =
+        new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
+            Collections.<IStruct>emptyList());
+
+    EntityNotificationImpl entityNotification =
+        new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
+            Collections.<IStruct>emptyList());
+
+    assertTrue(entityNotification.equals(entityNotification2));
+    assertTrue(entityNotification2.equals(entityNotification));
+  }
+
+  private Referenceable getEntity(String id, IStruct ... traits) {
+    String typeName = "typeName";
+    Map<String, Object> values = new HashMap<>();
+
+    List<String> traitNames = new LinkedList<>();
+    Map<String, IStruct> traitMap = new HashMap<>();
+
+    for (IStruct trait : traits) {
+      String traitName = trait.getTypeName();
+
+      traitNames.add(traitName);
+      traitMap.put(traitName, trait);
+    }
+    return new Referenceable(id, typeName, values, traitNames, traitMap);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index bfc1b43..c70df12 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
 ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
 
 ALL CHANGES:
+ATLAS-158 Provide Atlas Entity Change Notification (tbeerbower via shwethags)
 ATALS-238 atlas_start.py- the Atlas server won’t restart after improper shutdown(ndjouri via sumasai)
 ATLAS-293 UI Requires Internet Access For UI Facelift (darshankumar89 via shwethags)
 ATLAS-292 The artifactId 'dashboard' should be 'atlas-dashboard' in the webapp/pom.xml (ltfxyz via shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/repository/pom.xml
----------------------------------------------------------------------
diff --git a/repository/pom.xml b/repository/pom.xml
index c332ea4..3e2a6d1 100755
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -41,6 +41,11 @@
 
         <dependency>
             <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-client</artifactId>
         </dependency>
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/repository/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/listener/EntityChangeListener.java b/repository/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
deleted file mode 100755
index 6465e92..0000000
--- a/repository/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.listener;
-
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.typesystem.ITypedReferenceableInstance;
-
-import java.util.Collection;
-
-/**
- * Entity (a Typed instance) change notification listener.
- */
-public interface EntityChangeListener {
-
-    /**
-     * This is upon adding a new typed instance to the repository.
-     *
-     * @param typedInstances a typed instance
-     * @throws AtlasException
-     */
-    void onEntityAdded(Collection<ITypedReferenceableInstance> typedInstances) throws AtlasException;
-
-    /**
-     * This is upon adding a new trait to a typed instance.
-     *
-     * @param guid          globally unique identifier for the entity
-     * @param traitName     trait name for the instance that needs to be added to entity
-     * @throws AtlasException
-     */
-    void onTraitAdded(String guid, String traitName) throws AtlasException;
-
-    /**
-     * This is upon deleting a trait from a typed instance.
-     *
-     * @param guid          globally unique identifier for the entity
-     * @param traitName     trait name for the instance that needs to be deleted from entity
-     * @throws AtlasException
-     */
-    void onTraitDeleted(String guid, String traitName) throws AtlasException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/repository/src/main/java/org/apache/atlas/listener/TypesChangeListener.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/listener/TypesChangeListener.java b/repository/src/main/java/org/apache/atlas/listener/TypesChangeListener.java
deleted file mode 100755
index dee396a..0000000
--- a/repository/src/main/java/org/apache/atlas/listener/TypesChangeListener.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.listener;
-
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.typesystem.types.IDataType;
-
-import java.util.Collection;
-
-/**
- * Types change notification listener.
- */
-public interface TypesChangeListener {
-
-    /**
-     * This is upon adding new type(s) to Store.
-     *
-     * @param dataTypes data type
-     * @throws AtlasException
-     */
-    void onAdd(Collection<? extends IDataType> dataTypes) throws AtlasException;
-
-    /**
-     * This is upon removing an existing type from the Store.
-     *
-     * @param typeName type name
-     * @throws AtlasException
-     */
-    // void onRemove(String typeName) throws MetadataException;
-
-    // This is upon updating an existing type to the store
-    // void onChange() throws MetadataException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index fbfa731..7dfe165 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -31,6 +31,7 @@ import org.apache.atlas.listener.TypesChangeListener;
 import org.apache.atlas.repository.IndexCreationException;
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.typestore.ITypeStore;
+import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
 import org.apache.atlas.typesystem.ITypedStruct;
 import org.apache.atlas.typesystem.Referenceable;
@@ -63,9 +64,11 @@ import javax.inject.Inject;
 import javax.inject.Singleton;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Simple wrapper over TypeSystem and MetadataRepository services with hooks
@@ -247,7 +250,14 @@ public class DefaultMetadataService implements MetadataService {
 
         final String[] guids = repository.createEntities(typedInstances);
 
-        onEntityAddedToRepo(Arrays.asList(typedInstances));
+        Set<ITypedReferenceableInstance> entitites = new HashSet<>();
+
+        for (String guid : guids) {
+            entitites.add(repository.getEntityDefinition(guid));
+        }
+
+        onEntitiesAddedToRepo(entitites);
+
         return new JSONArray(Arrays.asList(guids)).toString();
     }
 
@@ -300,8 +310,8 @@ public class DefaultMetadataService implements MetadataService {
 
     /**
      * Validate that attribute is unique attribute
-     * @param entityType
-     * @param attributeName
+     * @param entityType     the entity type
+     * @param attributeName  the name of the attribute
      */
     private void validateUniqueAttribute(String entityType, String attributeName) throws AtlasException {
         ClassType type = typeSystem.getDataType(ClassType.class, entityType);
@@ -332,6 +342,8 @@ public class DefaultMetadataService implements MetadataService {
         ParamChecker.notEmpty(value, "property value cannot be null");
 
         repository.updateEntity(guid, property, value);
+
+        onEntityUpdated(repository.getEntityDefinition(guid), property, value);
     }
 
     private void validateTypeExists(String entityType) throws AtlasException {
@@ -380,12 +392,12 @@ public class DefaultMetadataService implements MetadataService {
 
         // ensure trait is not already defined
         Preconditions
-                .checkArgument(!getTraitNames(guid).contains(traitName), "trait=%s is already defined for entity=%s",
-                        traitName, guid);
+            .checkArgument(!getTraitNames(guid).contains(traitName), "trait=%s is already defined for entity=%s",
+                traitName, guid);
 
         repository.addTrait(guid, traitInstance);
 
-        onTraitAddedToEntity(guid, traitName);
+        onTraitAddedToEntity(repository.getEntityDefinition(guid), traitInstance);
     }
 
     private ITypedStruct deserializeTraitInstance(String traitInstanceDefinition)
@@ -427,7 +439,7 @@ public class DefaultMetadataService implements MetadataService {
 
         repository.deleteTrait(guid, traitNameToBeDeleted);
 
-        onTraitDeletedFromEntity(guid, traitNameToBeDeleted);
+        onTraitDeletedFromEntity(repository.getEntityDefinition(guid), traitNameToBeDeleted);
     }
 
     private void onTypesAdded(Map<String, IDataType> typesAdded) throws AtlasException {
@@ -447,23 +459,29 @@ public class DefaultMetadataService implements MetadataService {
         }
     }
 
-    private void onEntityAddedToRepo(Collection<ITypedReferenceableInstance> typedInstances)
-    throws AtlasException {
+    private void onEntitiesAddedToRepo(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+
+        for (EntityChangeListener listener : entityChangeListeners) {
+            listener.onEntitiesAdded(entities);
+        }
+    }
 
+    private void onEntityUpdated(ITypedReferenceableInstance entity, String property, String value)
+        throws AtlasException {
         for (EntityChangeListener listener : entityChangeListeners) {
-            listener.onEntityAdded(typedInstances);
+            listener.onEntityUpdated(entity);
         }
     }
 
-    private void onTraitAddedToEntity(String typeName, String traitName) throws AtlasException {
+    private void onTraitAddedToEntity(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
         for (EntityChangeListener listener : entityChangeListeners) {
-            listener.onTraitAdded(typeName, traitName);
+            listener.onTraitAdded(entity, trait);
         }
     }
 
-    private void onTraitDeletedFromEntity(String typeName, String traitName) throws AtlasException {
+    private void onTraitDeletedFromEntity(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
         for (EntityChangeListener listener : entityChangeListeners) {
-            listener.onTraitDeleted(typeName, traitName);
+            listener.onTraitDeleted(entity, traitName);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/repository/src/main/java/org/apache/atlas/services/MetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/MetadataService.java b/repository/src/main/java/org/apache/atlas/services/MetadataService.java
index c806d3a..d048cc6 100755
--- a/repository/src/main/java/org/apache/atlas/services/MetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/MetadataService.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.services;
 
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.typesystem.types.DataTypes;
 import org.codehaus.jettison.json.JSONObject;
 
@@ -131,4 +132,18 @@ public interface MetadataService {
      * @throws AtlasException
      */
     void deleteTrait(String guid, String traitNameToBeDeleted) throws AtlasException;
+
+    /**
+     * Register a listener for entity change.
+     *
+     * @param listener  the listener to register
+     */
+    void registerListener(EntityChangeListener listener);
+
+    /**
+     * Unregister an entity change listener.
+     *
+     * @param listener  the listener to unregister
+     */
+    void unregisterListener(EntityChangeListener listener);
 }