Posted to commits@atlas.apache.org by sh...@apache.org on 2015/12/14 17:45:10 UTC

incubator-atlas git commit: ATLAS-386 Handle hive rename Table (shwethags)

Repository: incubator-atlas
Updated Branches:
  refs/heads/branch-0.6-incubating dc2ca4520 -> dad90970d


ATLAS-386 Handle hive rename Table (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/dad90970
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/dad90970
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/dad90970

Branch: refs/heads/branch-0.6-incubating
Commit: dad90970da80038757619bd752e72a16e27d36c2
Parents: dc2ca45
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Mon Dec 14 22:15:01 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Mon Dec 14 22:15:01 2015 +0530

----------------------------------------------------------------------
 addons/hive-bridge/pom.xml                      |   4 +
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |   4 +
 .../org/apache/atlas/hive/hook/HiveHook.java    | 112 ++++++------
 .../org/apache/atlas/hive/hook/HiveHookIT.java  |  15 +-
 .../main/java/org/apache/atlas/AtlasClient.java |  13 +-
 docs/src/site/twiki/Bridge-Hive.twiki           |   2 +-
 .../apache/atlas/kafka/KafkaNotification.java   |   2 +-
 .../notification/AbstractNotification.java      |  19 ++
 .../AbstractNotificationConsumer.java           |  42 ++++-
 .../notification/NotificationHookConsumer.java  |  49 ++++--
 .../notification/NotificationInterface.java     |   8 +-
 .../NotificationEntityChangeListener.java       |  32 +---
 .../notification/hook/HookNotification.java     | 174 +++++++++++++++++++
 .../atlas/kafka/KafkaNotificationTest.java      |  20 ---
 .../notification/hook/HookNotificationTest.java |  68 ++++++++
 pom.xml                                         |   9 +-
 release-log.txt                                 |   1 +
 .../atlas/services/DefaultMetadataService.java  |   7 +
 .../test/java/org/apache/atlas/TestUtils.java   |   4 +-
 .../graph/TitanGraphProviderTest.java           |   3 -
 .../service/DefaultMetadataServiceTest.java     |  46 +++--
 .../apache/atlas/typesystem/Referenceable.java  |   6 +-
 .../atlas/typesystem/types/ClassType.java       |   6 +-
 .../atlas/typesystem/types/Multiplicity.java    |   4 +-
 .../src/main/resources/application.properties   |   6 +-
 webapp/pom.xml                                  |   4 +
 .../src/main/java/org/apache/atlas/Atlas.java   |   4 +
 .../NotificationHookConsumerIT.java             | 111 ++++++++----
 .../web/resources/EntityJerseyResourceIT.java   |  31 ++++
 29 files changed, 602 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index db842d7..4b0ac0f 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -290,6 +290,10 @@
                             <name>atlas.log.dir</name>
                             <value>${project.build.directory}/logs</value>
                         </systemProperty>
+                        <systemProperty>
+                            <name>atlas.data</name>
+                            <value>${project.build.directory}/data</value>
+                        </systemProperty>
                     </systemProperties>
                     <stopKey>atlas-stop</stopKey>
                     <stopPort>31001</stopPort>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index f367317..ee5ae10 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -70,6 +70,10 @@ public class HiveMetaStoreBridge {
         this(hiveConf, atlasConf, null, null);
     }
 
+    public String getClusterName() {
+        return clusterName;
+    }
+
     /**
      * Construct a HiveMetaStoreBridge.
      * @param hiveConf hive conf

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 2f88446..37a3169 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -25,11 +25,12 @@ import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
+import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -47,16 +48,12 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.codehaus.jettison.json.JSONArray;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -109,6 +106,8 @@ public class HiveHook implements ExecuteWithHookContext {
     @Inject
     private static NotificationInterface notifInterface;
 
+    private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
+
     private static final HiveConf hiveConf;
 
     static {
@@ -233,37 +232,51 @@ public class HiveHook implements ExecuteWithHookContext {
 
         default:
         }
+
+        notifyAtlas();
     }
 
-    //todo re-write with notification
     private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
         //crappy, no easy way of getting the new name
         assert event.inputs != null && event.inputs.size() == 1;
         assert event.outputs != null && event.outputs.size() > 0;
 
-        Table oldTable = event.inputs.iterator().next().getTable();
-        Table newTable = null;
+        //Update entity if not exists
+        ReadEntity oldEntity = event.inputs.iterator().next();
+        Table oldTable = oldEntity.getTable();
+
         for (WriteEntity writeEntity : event.outputs) {
             if (writeEntity.getType() == Entity.Type.TABLE) {
-                Table table = writeEntity.getTable();
-                if (table.getDbName().equals(oldTable.getDbName()) && !table.getTableName()
+                Table newTable = writeEntity.getTable();
+                if (newTable.getDbName().equals(oldTable.getDbName()) && !newTable.getTableName()
                         .equals(oldTable.getTableName())) {
-                    newTable = table;
-                    break;
+
+                    //Create/update old table entity - create new entity and replace id
+                    Referenceable tableEntity = createEntities(dgiBridge, writeEntity);
+                    String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
+                            oldTable.getDbName(), oldTable.getTableName());
+                    tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName);
+                    tableEntity.set(HiveDataModelGenerator.TABLE_NAME, oldTable.getTableName().toLowerCase());
+
+
+                    String newQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
+                            newTable.getDbName(), newTable.getTableName());
+
+                    Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+                    newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName);
+                    newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase());
+                    messages.add(new HookNotification.EntityPartialUpdateRequest(HiveDataTypes.HIVE_TABLE.getName(),
+                            HiveDataModelGenerator.NAME, oldQualifiedName, newEntity));
                 }
             }
         }
-        if (newTable == null) {
-            LOG.warn("Failed to deduct new name for " + event.queryStr);
-            return;
-        }
     }
 
-    private Map<Type, Referenceable> createEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception {
-        Map<Type, Referenceable> entities = new LinkedHashMap<>();
+    private Referenceable createEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception {
         Database db = null;
         Table table = null;
         Partition partition = null;
+        List<Referenceable> entities = new ArrayList<>();
 
         switch (entity.getType()) {
             case DATABASE:
@@ -283,64 +296,54 @@ public class HiveHook implements ExecuteWithHookContext {
         }
 
         db = dgiBridge.hiveClient.getDatabase(db.getName());
-        Referenceable dbReferenceable = dgiBridge.createDBInstance(db);
-        entities.put(Type.DATABASE, dbReferenceable);
+        Referenceable dbEntity = dgiBridge.createDBInstance(db);
+        entities.add(dbEntity);
 
-        Referenceable tableReferenceable = null;
+        Referenceable tableEntity = null;
         if (table != null) {
             table = dgiBridge.hiveClient.getTable(table.getDbName(), table.getTableName());
-            tableReferenceable = dgiBridge.createTableInstance(dbReferenceable, table);
-            entities.put(Type.TABLE, tableReferenceable);
+            tableEntity = dgiBridge.createTableInstance(dbEntity, table);
+            entities.add(tableEntity);
         }
 
         if (partition != null) {
-            Referenceable partitionReferenceable = dgiBridge.createPartitionReferenceable(tableReferenceable,
-                    (Referenceable) tableReferenceable.get("sd"), partition);
-            entities.put(Type.PARTITION, partitionReferenceable);
+            Referenceable partitionEntity = dgiBridge.createPartitionReferenceable(tableEntity,
+                    (Referenceable) tableEntity.get("sd"), partition);
+            entities.add(partitionEntity);
         }
-        return entities;
+
+        messages.add(new HookNotification.EntityUpdateRequest(entities));
+        return tableEntity;
     }
 
     private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception {
-        List<Referenceable> entities = new ArrayList<>();
         for (WriteEntity entity : event.outputs) {
             if (entity.getType() == entityType) {
-                entities.addAll(createEntities(dgiBridge, entity).values());
+                createEntities(dgiBridge, entity);
             }
         }
-        notifyEntity(entities);
-    }
-
-    private void notifyEntity(Collection<Referenceable> entities) {
-        JSONArray entitiesArray = new JSONArray();
-        for (Referenceable entity : entities) {
-            String entityJson = InstanceSerialization.toJson(entity, true);
-            entitiesArray.put(entityJson);
-        }
-        notifyEntity(entitiesArray);
     }
 
     /**
      * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
      * De-duping of entities is done on server side depending on the unique attribute on the
-     * @param entities
      */
-    private void notifyEntity(JSONArray entities) {
+    private void notifyAtlas() {
         int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-        String message = entities.toString();
 
+        LOG.debug("Notifying atlas with messages {}", messages);
         int numRetries = 0;
         while (true) {
             try {
-                notifInterface.send(NotificationInterface.NotificationType.HOOK, message);
-                return;
+                notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
+                break;
             } catch(Exception e) {
                 numRetries++;
                 if(numRetries < maxRetries) {
-                    LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
+                    LOG.debug("Failed to notify atlas. Retrying", e);
                 } else {
-                    LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", message,
-                            maxRetries, e);
+                    LOG.error("Failed to notify atlas after {} retries. Quitting", maxRetries, e);
+                    break;
                 }
             }
         }
@@ -369,7 +372,7 @@ public class HiveHook implements ExecuteWithHookContext {
         String queryStr = normalize(event.queryStr);
 
         LOG.debug("Registering query: {}", queryStr);
-        List<Referenceable> entities = new ArrayList<>();
+
         Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
         processReferenceable.set("name", queryStr);
         processReferenceable.set("operationType", event.operation.getOperationName());
@@ -379,9 +382,8 @@ public class HiveHook implements ExecuteWithHookContext {
         List<Referenceable> source = new ArrayList<>();
         for (ReadEntity readEntity : inputs) {
             if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
-                Map<Type, Referenceable> localEntities = createEntities(dgiBridge, readEntity);
-                source.add(localEntities.get(Type.TABLE));
-                entities.addAll(localEntities.values());
+                Referenceable inTable = createEntities(dgiBridge, readEntity);
+                source.add(inTable);
             }
         }
         processReferenceable.set("inputs", source);
@@ -389,9 +391,8 @@ public class HiveHook implements ExecuteWithHookContext {
         List<Referenceable> target = new ArrayList<>();
         for (WriteEntity writeEntity : outputs) {
             if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
-                Map<Type, Referenceable> localEntities = createEntities(dgiBridge, writeEntity);
-                target.add(localEntities.get(Type.TABLE));
-                entities.addAll(localEntities.values());
+                Referenceable outTable = createEntities(dgiBridge, writeEntity);
+                target.add(outTable);
             }
         }
         processReferenceable.set("outputs", target);
@@ -402,8 +403,7 @@ public class HiveHook implements ExecuteWithHookContext {
 
         //TODO set
         processReferenceable.set("queryGraph", "queryGraph");
-        entities.add(processReferenceable);
-        notifyEntity(entities);
+        messages.add(new HookNotification.EntityCreateRequest(processReferenceable));
     }
 
 

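As an illustration of the producer-side flow introduced above: the hook no longer serializes entities into a JSONArray itself, it accumulates typed HookNotification messages and hands the whole batch to NotificationInterface.send() in one call, which it retries on failure. A minimal sketch, assuming the Atlas notification classes shown in this patch are on the classpath; the entity type and attribute values are illustrative:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.atlas.notification.NotificationInterface;
    import org.apache.atlas.notification.hook.HookNotification;
    import org.apache.atlas.typesystem.Referenceable;

    public class HookProducerSketch {
        // Mirrors the reworked HiveHook: collect typed messages for a query,
        // then flush them with a single send() that is retried on failure.
        public static void flush(NotificationInterface notifInterface) throws Exception {
            List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();

            Referenceable table = new Referenceable("hive_table");   // illustrative type name
            table.set("name", "default.sales@primary");              // illustrative qualified name
            messages.add(new HookNotification.EntityCreateRequest(table));

            notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
        }
    }
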
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 5447de5..1c3d9a4 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -149,6 +149,17 @@ public class HiveHookIT {
         assertDatabaseIsRegistered(DEFAULT_DB);
     }
 
+    @Test
+    public void testRenameTable() throws Exception {
+        String tableName = createTable();
+        String newTableName = tableName();
+        runCommand(String.format("alter table %s rename to %s", tableName, newTableName));
+
+        assertTableIsRegistered(DEFAULT_DB, newTableName);
+        assertTableIsNotRegistered(DEFAULT_DB, tableName);
+    }
+
+
     private String assertColumnIsRegistered(String colName) throws Exception {
         LOG.debug("Searching for column {}", colName);
         String query =
@@ -327,8 +338,8 @@ public class HiveHookIT {
 
         LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value);
         String dslQuery = String.format("%s as p where values = ['%s'], table where tableName = '%s', "
-                               + "db where name = '%s' and clusterName = '%s' select p", typeName, value,
-                            tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
+                        + "db where name = '%s' and clusterName = '%s' select p", typeName, value,
+                tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
 
         return assertEntityIsRegistered(dslQuery, "p");
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index b108b25..0a730af 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -44,6 +44,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
 import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
@@ -291,12 +292,16 @@ public class AtlasClient {
     }
 
     public JSONArray createEntity(Referenceable... entities) throws AtlasServiceException {
+        return createEntity(Arrays.asList(entities));
+    }
+
+    public JSONArray createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
         JSONArray entityArray = getEntitiesArray(entities);
         return createEntity(entityArray);
     }
 
-    private JSONArray getEntitiesArray(Referenceable[] entities) {
-        JSONArray entityArray = new JSONArray(entities.length);
+    private JSONArray getEntitiesArray(Collection<Referenceable> entities) {
+        JSONArray entityArray = new JSONArray(entities.size());
         for (Referenceable entity : entities) {
             entityArray.put(InstanceSerialization.toJson(entity, true));
         }
@@ -311,6 +316,10 @@ public class AtlasClient {
      * @throws AtlasServiceException
      */
     public JSONArray updateEntities(Referenceable... entities) throws AtlasServiceException {
+        return updateEntities(Arrays.asList(entities));
+    }
+
+    public JSONArray updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
         JSONArray entitiesArray = getEntitiesArray(entities);
         JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString());
         try {

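The new Collection overloads let callers such as the notification consumer below pass a List<Referenceable> directly instead of converting to varargs first. A minimal usage sketch; the endpoint and type names are illustrative:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.atlas.AtlasClient;
    import org.apache.atlas.typesystem.Referenceable;
    import org.codehaus.jettison.json.JSONArray;

    public class AtlasClientCollectionSketch {
        public static void main(String[] args) throws Exception {
            AtlasClient client = new AtlasClient("http://localhost:21000");  // illustrative endpoint

            List<Referenceable> entities = Arrays.asList(
                    new Referenceable("hive_db"), new Referenceable("hive_table"));  // illustrative types

            // Same JSON payload as the varargs versions, sized from the collection.
            JSONArray created = client.createEntity(entities);
            JSONArray updated = client.updateEntities(entities);
            System.out.println(created + " " + updated);
        }
    }
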
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/docs/src/site/twiki/Bridge-Hive.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Bridge-Hive.twiki b/docs/src/site/twiki/Bridge-Hive.twiki
index c99f046..0c7732b 100644
--- a/docs/src/site/twiki/Bridge-Hive.twiki
+++ b/docs/src/site/twiki/Bridge-Hive.twiki
@@ -29,7 +29,7 @@ hive_process - attribute name - <queryString> - trimmed query string in lower ca
 
 ---++ Importing Hive Metadata
 org.apache.atlas.hive.bridge.HiveMetaStoreBridge imports the hive metadata into Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator. import-hive.sh command can be used to facilitate this.
-Set the following configuration in <atlas-conf>/client.properties and set environment variable HIVE_CONFIG to the hive conf directory:
+Set the following configuration in <atlas-conf>/client.properties and set environment variable $HIVE_CONF_DIR to the hive conf directory:
   <verbatim>
     <property>
       <name>atlas.cluster.name</name>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index bacabeb..37467b3 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -174,7 +174,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
     }
 
     @Override
-    public void send(NotificationType type, String... messages) throws NotificationException {
+    public void _send(NotificationType type, String... messages) throws NotificationException {
         if (producer == null) {
             createProducer();
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index f7bb7b1..72b5a0a 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -20,6 +20,9 @@ package org.apache.atlas.notification;
 import org.apache.atlas.AtlasException;
 import org.apache.commons.configuration.Configuration;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * Abstract notification interface implementation.
  */
@@ -46,4 +49,20 @@ public abstract class AbstractNotification implements NotificationInterface {
     protected final boolean isEmbedded() {
         return embedded;
     }
+
+    @Override
+    public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+        String[] strMessages = new String[messages.size()];
+        for (int index = 0; index < messages.size(); index++) {
+            strMessages[index] = AbstractNotificationConsumer.GSON.toJson(messages.get(index));
+        }
+        _send(type, strMessages);
+    }
+
+    @Override
+    public <T> void send(NotificationType type, T... messages) throws NotificationException {
+        send(type, Arrays.asList(messages));
+    }
+
+    protected abstract void _send(NotificationType type, String[] messages) throws NotificationException;
 }

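The typed send() overloads added here take over the serialization the hooks used to do themselves: each message is converted to JSON with the shared Gson instance and handed to the transport-specific _send(type, String[]). A small sketch of that conversion, using only classes touched by this patch; the entity type and attribute are illustrative:

    import org.apache.atlas.notification.AbstractNotificationConsumer;
    import org.apache.atlas.notification.hook.HookNotification;
    import org.apache.atlas.typesystem.Referenceable;

    public class SendSerializationSketch {
        public static void main(String[] args) {
            Referenceable entity = new Referenceable("sometype");  // illustrative type name
            entity.set("name", "somename");
            HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest(entity);

            // send(type, List<T>) applies this conversion to every message
            // before delegating to _send(type, String[]).
            String wirePayload = AbstractNotificationConsumer.GSON.toJson(message);
            System.out.println(wirePayload);
        }
    }
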
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index 8c49d4a..42a4e7f 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -26,11 +26,16 @@ import com.google.gson.JsonDeserializationContext;
 import com.google.gson.JsonDeserializer;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
 import com.google.gson.reflect.TypeToken;
 import org.apache.atlas.notification.entity.EntityNotification;
 import org.apache.atlas.notification.entity.EntityNotificationImpl;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.codehaus.jettison.json.JSONArray;
@@ -45,13 +50,15 @@ import java.util.Map;
  */
 public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> {
 
-    private static final Gson GSON = new GsonBuilder().
+    public static final Gson GSON = new GsonBuilder().
             registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()).
             registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()).
             registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()).
             registerTypeAdapter(IStruct.class, new StructDeserializer()).
-            registerTypeAdapter(IReferenceableInstance.class, new ReferenceableDeserializer()).
-            registerTypeAdapter(JSONArray.class, new JSONArrayDeserializer()).
+            registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()).
+            registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()).
+            registerTypeAdapter(JSONArray.class, new JSONArraySerializerDeserializer()).
+            registerTypeAdapter(HookNotification.HookNotificationMessage.class, new HookNotification()).
             create();
 
     private final Class<T> type;
@@ -136,30 +143,44 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
 
     // ----- inner class : StructDeserializer -------------------------------
 
-    public final static class StructDeserializer implements JsonDeserializer<IStruct> {
+    public final static class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> {
         @Override
         public IStruct deserialize(final JsonElement json, final Type type,
                                               final JsonDeserializationContext context) throws JsonParseException {
             return context.deserialize(json, Struct.class);
         }
+
+        @Override
+        public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) {
+            String instanceJson = InstanceSerialization.toJson(src, true);
+            return new JsonParser().parse(instanceJson).getAsJsonObject();
+        }
     }
 
 
-    // ----- inner class : ReferenceableDeserializer ------------------------
+    // ----- inner class : ReferenceableSerializerDeserializer ------------------------
 
-    public final static class ReferenceableDeserializer implements JsonDeserializer<IStruct> {
+    public final static class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>,
+            JsonSerializer<IReferenceableInstance> {
         @Override
         public IReferenceableInstance deserialize(final JsonElement json, final Type type,
                                    final JsonDeserializationContext context) throws JsonParseException {
 
             return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
         }
+
+        @Override
+        public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
+            String instanceJson = InstanceSerialization.toJson(src, true);
+            return new JsonParser().parse(instanceJson).getAsJsonObject();
+        }
     }
 
 
-    // ----- inner class : JSONArrayDeserializer ----------------------------
+    // ----- inner class : JSONArraySerializerDeserializer ----------------------------
 
-    public final static class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
+    public final static class JSONArraySerializerDeserializer implements JsonDeserializer<JSONArray>,
+            JsonSerializer<JSONArray> {
         @Override
         public JSONArray deserialize(final JsonElement json, final Type type,
                                                   final JsonDeserializationContext context) throws JsonParseException {
@@ -170,5 +191,10 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
                 throw new JsonParseException(e.getMessage(), e);
             }
         }
+
+        @Override
+        public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
+            return new JsonParser().parse(src.toString()).getAsJsonArray();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1bee26f..6876758 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -23,9 +23,9 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
-import org.codehaus.jettison.json.JSONArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,12 +57,12 @@ public class NotificationHookConsumer implements Service {
         String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
         atlasClient = new AtlasClient(atlasEndpoint);
         int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
-        List<NotificationConsumer<JSONArray>> consumers =
+        List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers =
                 notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
         executors = Executors.newFixedThreadPool(consumers.size());
 
-        for (final NotificationConsumer<JSONArray> consumer : consumers) {
-            executors.submit(new HookConsumer(consumer));
+        for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : consumers) {
+            executors.submit(new HookConsumer(atlasClient, consumer));
         }
     }
 
@@ -86,15 +86,12 @@ public class NotificationHookConsumer implements Service {
     }
 
     class HookConsumer implements Runnable {
-        private final NotificationConsumer<JSONArray> consumer;
+        private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
         private final AtlasClient client;
 
-        public HookConsumer(NotificationConsumer<JSONArray> consumer) {
-            this(atlasClient, consumer);
-        }
-
-        public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) {
-            this.client = client;
+        public HookConsumer(AtlasClient atlasClient,
+                            NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
+            this.client = atlasClient;
             this.consumer = consumer;
         }
 
@@ -106,14 +103,32 @@ public class NotificationHookConsumer implements Service {
             }
 
             while(consumer.hasNext()) {
-                JSONArray entityJson = consumer.next();
-                LOG.info("Processing message {}", entityJson);
+                HookNotification.HookNotificationMessage message = consumer.next();
+
                 try {
-                    JSONArray guids = atlasClient.createEntity(entityJson);
-                    LOG.info("Create entities with guid {}", guids);
+                    switch (message.getType()) {
+                        case ENTITY_CREATE:
+                            HookNotification.EntityCreateRequest createRequest =
+                                    (HookNotification.EntityCreateRequest) message;
+                            atlasClient.createEntity(createRequest.getEntities());
+                            break;
+
+                        case ENTITY_PARTIAL_UPDATE:
+                            HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
+                                    (HookNotification.EntityPartialUpdateRequest) message;
+                            atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+                                    partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(),
+                                    partialUpdateRequest.getEntity());
+                            break;
+
+                        case ENTITY_FULL_UPDATE:
+                            HookNotification.EntityUpdateRequest updateRequest =
+                                    (HookNotification.EntityUpdateRequest) message;
+                            atlasClient.updateEntities(updateRequest.getEntities());
+                            break;
+                    }
                 } catch (Exception e) {
-                    //todo handle failures
-                    LOG.warn("Error handling message {}", entityJson, e);
+                    LOG.debug("Error handling message {}", message, e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 3e68998..e4c4fd6 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -18,7 +18,7 @@
 package org.apache.atlas.notification;
 
 import org.apache.atlas.notification.entity.EntityNotification;
-import org.codehaus.jettison.json.JSONArray;
+import org.apache.atlas.notification.hook.HookNotification;
 
 import java.util.List;
 
@@ -28,7 +28,7 @@ public interface NotificationInterface {
     String PROPERTY_PREFIX = "atlas.notification";
 
     enum NotificationType {
-        HOOK(JSONArray.class), ENTITIES(EntityNotification.class);
+        HOOK(HookNotification.HookNotificationMessage.class), ENTITIES(EntityNotification.class);
 
         private final Class classType;
 
@@ -52,7 +52,9 @@ public interface NotificationInterface {
      */
     <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers);
 
-    void send(NotificationType type, String... messages) throws NotificationException;
+    <T> void send(NotificationType type, T... messages) throws NotificationException;
+
+    <T> void send(NotificationType type, List<T> messages) throws NotificationException;
 
     void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
index e2d16df..243f93e 100644
--- a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
@@ -17,12 +17,6 @@
 
 package org.apache.atlas.notification.entity;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.notification.NotificationInterface;
@@ -30,10 +24,8 @@ import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
 import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.atlas.typesystem.types.TypeSystem;
 
-import java.lang.reflect.Type;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -44,9 +36,6 @@ import java.util.List;
  */
 public class NotificationEntityChangeListener implements EntityChangeListener {
 
-    private static final Gson GSON = new GsonBuilder().
-        registerTypeAdapter(Referenceable.class, new ReferencableSerializer()).create();
-
     private final NotificationInterface notificationInterface;
     private final TypeSystem typeSystem;
 
@@ -93,7 +82,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
     // send notification of entity change
     private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions,
                                      EntityNotification.OperationType operationType) throws AtlasException {
-        List<String> messages = new LinkedList<>();
+        List<EntityNotification> messages = new LinkedList<>();
 
         for (IReferenceableInstance entityDefinition : entityDefinitions) {
             Referenceable entity = new Referenceable(entityDefinition);
@@ -101,24 +90,9 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
             EntityNotificationImpl notification =
                     new EntityNotificationImpl(entity, operationType, typeSystem);
 
-            messages.add(GSON.toJson(notification));
+            messages.add(notification);
         }
 
-        notificationInterface.send(NotificationInterface.NotificationType.ENTITIES,
-                messages.toArray(new String[messages.size()]));
-    }
-
-
-    // ----- inner class : ReferencableSerializer ---------------------------
-
-    private static class ReferencableSerializer implements JsonSerializer<Referenceable>  {
-
-        public static final JsonParser JSON_PARSER = new JsonParser();
-
-        @Override
-        public JsonElement serialize(Referenceable referenceable, Type type,
-                                     JsonSerializationContext jsonSerializationContext) {
-            return JSON_PARSER.parse(InstanceSerialization.toJson(referenceable, true)).getAsJsonObject();
-        }
+        notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
new file mode 100644
index 0000000..568f58b
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.hook;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class HookNotification implements JsonDeserializer<HookNotification.HookNotificationMessage> {
+
+    @Override
+    public HookNotificationMessage deserialize(JsonElement json, Type typeOfT,
+                                               JsonDeserializationContext context) throws JsonParseException {
+        if (json.isJsonArray()) {
+            JSONArray jsonArray = context.deserialize(json, JSONArray.class);
+            return new EntityCreateRequest(jsonArray);
+        } else {
+            HookNotificationType type =
+                    context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class);
+            switch (type) {
+                case ENTITY_CREATE:
+                    return context.deserialize(json, EntityCreateRequest.class);
+
+                case ENTITY_FULL_UPDATE:
+                    return context.deserialize(json, EntityUpdateRequest.class);
+
+                case ENTITY_PARTIAL_UPDATE:
+                    return context.deserialize(json, EntityPartialUpdateRequest.class);
+
+                case TYPE_CREATE:
+                case TYPE_UPDATE:
+                    return context.deserialize(json, TypeRequest.class);
+            }
+            throw new IllegalStateException("Unhandled type " + type);
+        }
+    }
+
+    public enum HookNotificationType {
+        TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE
+    }
+
+    public static class HookNotificationMessage {
+        protected HookNotificationType type;
+
+        private HookNotificationMessage() { }
+
+        public HookNotificationMessage(HookNotificationType type) {
+            this.type = type;
+        }
+
+        public HookNotificationType getType() {
+            return type;
+        }
+    }
+
+    public static class TypeRequest extends HookNotificationMessage {
+        private TypesDef typesDef;
+
+        private TypeRequest() { }
+
+        public TypeRequest(HookNotificationType type, TypesDef typesDef) {
+            super(type);
+            this.typesDef = typesDef;
+        }
+
+        public TypesDef getTypesDef() {
+            return typesDef;
+        }
+    }
+
+    public static class EntityCreateRequest extends HookNotificationMessage {
+        private List<Referenceable> entities;
+
+        private EntityCreateRequest() { }
+
+        public EntityCreateRequest(Referenceable... entities) {
+            super(HookNotificationType.ENTITY_CREATE);
+            this.entities = Arrays.asList(entities);
+        }
+
+        protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities) {
+            super(type);
+            this.entities = entities;
+        }
+
+        public EntityCreateRequest(JSONArray jsonArray) {
+            super(HookNotificationType.ENTITY_CREATE);
+            entities = new ArrayList<>();
+            for (int index = 0; index < jsonArray.length(); index++) {
+                try {
+                    entities.add(InstanceSerialization.fromJsonReferenceable(jsonArray.getString(index), true));
+                } catch (JSONException e) {
+                    throw new JsonParseException(e);
+                }
+            }
+        }
+
+        public List<Referenceable> getEntities() throws JSONException {
+            return entities;
+        }
+    }
+
+    public static class EntityUpdateRequest extends EntityCreateRequest {
+        public EntityUpdateRequest(Referenceable... entities) {
+            this(Arrays.asList(entities));
+        }
+
+        public EntityUpdateRequest(List<Referenceable> entities) {
+            super(HookNotificationType.ENTITY_FULL_UPDATE, entities);
+        }
+    }
+
+    public static class EntityPartialUpdateRequest extends HookNotificationMessage {
+        private String typeName;
+        private String attribute;
+        private Referenceable entity;
+        private String attributeValue;
+
+        private EntityPartialUpdateRequest() { }
+
+        public EntityPartialUpdateRequest(String typeName, String attribute, String attributeValue,
+                                          Referenceable entity) {
+            super(HookNotificationType.ENTITY_PARTIAL_UPDATE);
+            this.typeName = typeName;
+            this.attribute = attribute;
+            this.attributeValue = attributeValue;
+            this.entity = entity;
+        }
+
+        public String getTypeName() {
+            return typeName;
+        }
+
+        public String getAttribute() {
+            return attribute;
+        }
+
+        public Referenceable getEntity() {
+            return entity;
+        }
+
+        public String getAttributeValue() {
+            return attributeValue;
+        }
+    }
+}

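This message model is what makes the rename handling possible: a bare JSONArray of serialized entities is still accepted for backward compatibility and mapped to EntityCreateRequest, while a rename can now be expressed as an EntityPartialUpdateRequest keyed on the old qualified name. A sketch of the message the rename path builds, mirroring HiveHook.renameTable(); the type name, attribute names and qualified-name format are illustrative stand-ins for HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME, and the bridge's qualified names:

    import org.apache.atlas.notification.hook.HookNotification;
    import org.apache.atlas.typesystem.Referenceable;

    public class RenameMessageSketch {
        public static HookNotification.HookNotificationMessage renameMessage() {
            // Carries only the attributes that change; a partial update leaves the rest intact.
            Referenceable newEntity = new Referenceable("hive_table");   // illustrative type name
            newEntity.set("name", "default.new_table@primary");          // illustrative qualified name
            newEntity.set("tableName", "new_table");

            // Keyed on the unique attribute holding the *old* qualified name,
            // so the server updates the existing table entity in place.
            return new HookNotification.EntityPartialUpdateRequest(
                    "hive_table", "name", "default.old_table@primary", newEntity);
        }
    }
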
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 625a0b0..eb90f52 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -26,9 +26,7 @@ import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.RandomStringUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.codehaus.jettison.json.JSONArray;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
@@ -61,20 +59,6 @@ public class KafkaNotificationTest {
     }
 
     @Test
-    public void testSendReceiveMessage() throws Exception {
-        String msg1 = "[{\"message\": " + 123 + "}]";
-        String msg2 = "[{\"message\": " + 456 + "}]";
-        kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2);
-        List<NotificationConsumer<JSONArray>> consumers =
-                kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1);
-        NotificationConsumer<JSONArray> consumer = consumers.get(0);
-        assertTrue(consumer.hasNext());
-        assertEquals(new JSONArray(msg1), consumer.next());
-        assertTrue(consumer.hasNext());
-        assertEquals(new JSONArray(msg2), consumer.next());
-    }
-
-    @Test
     @SuppressWarnings("unchecked")
     public void testCreateConsumers() throws Exception {
         Configuration configuration = mock(Configuration.class);
@@ -119,10 +103,6 @@ public class KafkaNotificationTest {
         assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
     }
 
-    private String random() {
-        return RandomStringUtils.randomAlphanumeric(5);
-    }
-
     @AfterClass
     public void teardown() throws Exception {
         kafka.stop();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
new file mode 100644
index 0000000..4b9f81f
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.hook;
+
+import org.apache.atlas.notification.AbstractNotificationConsumer;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.codehaus.jettison.json.JSONArray;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class HookNotificationTest {
+
+    @Test
+    public void testMessageBackwardCompatibility() throws Exception {
+        JSONArray jsonArray = new JSONArray();
+        Referenceable entity = new Referenceable("sometype");
+        entity.set("name", "somename");
+        String entityJson = InstanceSerialization.toJson(entity, true);
+        jsonArray.put(entityJson);
+
+        HookNotification.HookNotificationMessage notification = AbstractNotificationConsumer.GSON.fromJson(
+                jsonArray.toString(), HookNotification.HookNotificationMessage.class);
+        assertNotNull(notification);
+        assertEquals(notification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
+        HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) notification;
+        assertEquals(createRequest.getEntities().size(), 1);
+        assertEquals(createRequest.getEntities().get(0).getTypeName(), entity.getTypeName());
+    }
+
+    @Test
+    public void testNewMessageSerDe() throws Exception {
+        Referenceable entity1 = new Referenceable("sometype");
+        entity1.set("attr", "value");
+        entity1.set("complex", new Referenceable("othertype"));
+        Referenceable entity2 = new Referenceable("newtype");
+        HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(entity1, entity2);
+
+        String notificationJson = AbstractNotificationConsumer.GSON.toJson(request);
+        HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson(
+                notificationJson, HookNotification.HookNotificationMessage.class);
+        assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
+        HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) actualNotification;
+        assertEquals(createRequest.getEntities().size(), 2);
+        Referenceable actualEntity1 = createRequest.getEntities().get(0);
+        assertEquals(actualEntity1.getTypeName(), "sometype");
+        assertEquals(((Referenceable)actualEntity1.get("complex")).getTypeName(), "othertype");
+        assertEquals(createRequest.getEntities().get(1).getTypeName(), "newtype");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1a324b7..3a368f3 100755
--- a/pom.xml
+++ b/pom.xml
@@ -354,7 +354,8 @@
 
         <!-- skips checkstyle and find bugs -->
         <skipCheck>false</skipCheck>
-        <skipTests>false</skipTests>
+        <skipUTs>false</skipUTs>
+        <skipITs>false</skipITs>
         <skipDocs>true</skipDocs>
         <skipSite>true</skipSite>
         <projectBaseDir>${project.basedir}</projectBaseDir>
@@ -1087,7 +1088,7 @@
             <dependency>
                 <groupId>com.google.code.gson</groupId>
                 <artifactId>gson</artifactId>
-                <version>2.3.1</version>
+                <version>2.5</version>
             </dependency>
 
             <dependency>
@@ -1394,6 +1395,7 @@
                 <configuration>
                     <systemProperties>
                         <user.dir>${project.basedir}</user.dir>
+                        <atlas.data>${project.build.directory}/data</atlas.data>
                     </systemProperties>
                     <!--<skipTests>true</skipTests>-->
                     <forkMode>always</forkMode>
@@ -1403,6 +1405,7 @@
                         -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml
                         -Djava.net.preferIPv4Stack=true
                     </argLine>
+                    <skip>${skipUTs}</skip>
                     <excludes>
                         <exclude>**/*Base*</exclude>
                     </excludes>
@@ -1423,12 +1426,14 @@
                 <configuration>
                     <systemPropertyVariables>
                         <projectBaseDir>${projectBaseDir}</projectBaseDir>
+                        <atlas.data>${project.build.directory}/data</atlas.data>
                     </systemPropertyVariables>
                     <redirectTestOutputToFile>true</redirectTestOutputToFile>
                     <argLine>-Djava.awt.headless=true -Dproject.version=${project.version}
                         -Dhadoop.tmp.dir="${project.build.directory}/tmp-hadoop-${user.name}"
                         -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml
                     </argLine>
+                    <skip>${skipITs}</skip>
                     <parallel>none</parallel>
                     <reuseForks>false</reuseForks>
                     <forkCount>1</forkCount>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f058148..64dc568 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
 ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
 
 ALL CHANGES:
+ATLAS-386 Handle hive rename Table (shwethags)
 ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemath via sumasai)
 ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown(yhemath via sumasai)
 ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index fb782a2..f605c26 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -304,7 +304,14 @@ public class DefaultMetadataService implements MetadataService {
                 ParamChecker.notEmpty(entityTypeName, "Entity type cannot be null");
 
                 ClassType entityType = typeSystem.getDataType(ClassType.class, entityTypeName);
+
+                //Both the assigned id and the attribute values are required for a full update.
+                //ClassType.convert() removes the values if the id is assigned, so set a temp id, convert and
+                // then replace with the original id
+                Id origId = entityInstance.getId();
+                entityInstance.replaceWithNewId(new Id(entityInstance.getTypeName()));
                 ITypedReferenceableInstance typedInstrance = entityType.convert(entityInstance, Multiplicity.REQUIRED);
+                ((ReferenceableInstance)typedInstrance).replaceWithNewId(origId);
                 instances[index] = typedInstrance;
             }
             return instances;
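
Pulled out of the surrounding loop, the workaround described in the comment looks roughly like this (a fragment, assuming entityType and entityInstance from the enclosing method and the Atlas typesystem classes on the classpath):

    // remember the id the caller assigned
    Id origId = entityInstance.getId();
    // swap in a temporary, unassigned id so that ClassType.convert() keeps the attribute values
    entityInstance.replaceWithNewId(new Id(entityInstance.getTypeName()));
    ITypedReferenceableInstance typedInstance = entityType.convert(entityInstance, Multiplicity.REQUIRED);
    // restore the original id so the full update still targets the existing entity
    ((ReferenceableInstance) typedInstance).replaceWithNewId(origId);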

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/test/java/org/apache/atlas/TestUtils.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/TestUtils.java b/repository/src/test/java/org/apache/atlas/TestUtils.java
index 12c47d4..2a45bc8 100755
--- a/repository/src/test/java/org/apache/atlas/TestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/TestUtils.java
@@ -251,12 +251,12 @@ public final class TestUtils {
                         new AttributeDefinition("columnsMap",
                                                         DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(),
                                                                 "column_type"),
-                                                        Multiplicity.COLLECTION, true, null),
+                                                        Multiplicity.OPTIONAL, true, null),
                          //map of structs
                         new AttributeDefinition("partitionsMap",
                                                         DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(),
                                                                 "partition_type"),
-                                                        Multiplicity.COLLECTION, true, null),
+                                                        Multiplicity.OPTIONAL, true, null),
                         // struct reference
                         new AttributeDefinition("serde1", "serdeType", Multiplicity.OPTIONAL, false, null),
                         new AttributeDefinition("serde2", "serdeType", Multiplicity.OPTIONAL, false, null),

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
index 6fc7008..d824b50 100644
--- a/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
@@ -19,12 +19,9 @@ package org.apache.atlas.repository.graph;
 
 import com.thinkaurelius.titan.core.TitanGraph;
 import com.thinkaurelius.titan.core.util.TitanCleanup;
-import com.thinkaurelius.titan.diskstorage.Backend;
-import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.commons.configuration.Configuration;
-import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeTest;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
index 0352ef3..0307fd4 100644
--- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
@@ -22,9 +22,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import com.thinkaurelius.titan.core.TitanGraph;
 import com.thinkaurelius.titan.core.util.TitanCleanup;
-import org.apache.atlas.typesystem.exception.TypeNotFoundException;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.utils.ParamChecker;
 import org.apache.atlas.RepositoryMetadataModule;
 import org.apache.atlas.TestUtils;
 import org.apache.atlas.repository.graph.GraphProvider;
@@ -32,12 +29,15 @@ import org.apache.atlas.services.MetadataService;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.typesystem.exception.TypeNotFoundException;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.atlas.typesystem.json.TypesSerialization;
 import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.atlas.typesystem.types.EnumValue;
 import org.apache.atlas.typesystem.types.TypeSystem;
 import org.apache.atlas.typesystem.types.ValueConversionException;
+import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.lang.RandomStringUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.testng.Assert;
@@ -47,11 +47,15 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 @Guice(modules = RepositoryMetadataModule.class)
 public class DefaultMetadataServiceTest {
     @Inject
@@ -296,8 +300,8 @@ public class DefaultMetadataServiceTest {
         Map<String, Object> values = new HashMap<>();
         values.put("name", "col1");
         values.put("type", "type");
-        Referenceable ref = new Referenceable("column_type", values);
-        columns.add(ref);
+        Referenceable col1 = new Referenceable("column_type", values);
+        columns.add(col1);
         Referenceable tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{
             put("columns", columns);
         }});
@@ -307,19 +311,18 @@ public class DefaultMetadataServiceTest {
             metadataService.getEntityDefinition(TestUtils.TABLE_TYPE, "name", (String) table.get("name"));
         Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
         final List<Referenceable> arrClsColumns = (List) tableDefinition.get("columns");
-        Assert.assertTrue(arrClsColumns.get(0).equalsContents(columns.get(0)));
+        assertReferenceables(arrClsColumns.get(0), columns.get(0));
 
         //Partial update. Add col5 But also update col1
         Map<String, Object> valuesCol5 = new HashMap<>();
         valuesCol5.put("name", "col5");
         valuesCol5.put("type", "type");
-        ref = new Referenceable("column_type", valuesCol5);
+        Referenceable col2 = new Referenceable("column_type", valuesCol5);
         //update col1
-        arrClsColumns.get(0).set("type", "type1");
+        col1.set("type", "type1");
 
         //add col5
-        final List<Referenceable> updateColumns = new ArrayList<>(arrClsColumns);
-        updateColumns.add(ref);
+        final List<Referenceable> updateColumns = Arrays.asList(col1, col2);
 
         tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{
             put("columns", updateColumns);
@@ -331,8 +334,8 @@ public class DefaultMetadataServiceTest {
         tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
         List<Referenceable> arrColumnsList = (List) tableDefinition.get("columns");
         Assert.assertEquals(arrColumnsList.size(), 2);
-        Assert.assertTrue(arrColumnsList.get(0).equalsContents(updateColumns.get(0)));
-        Assert.assertTrue(arrColumnsList.get(1).equalsContents(updateColumns.get(1)));
+        assertReferenceables(arrColumnsList.get(0), updateColumns.get(0));
+        assertReferenceables(arrColumnsList.get(1), updateColumns.get(1));
 
         //Complete update. Add  array elements - col3,4
         Map<String, Object> values1 = new HashMap<>();
@@ -355,9 +358,8 @@ public class DefaultMetadataServiceTest {
         tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
         arrColumnsList = (List) tableDefinition.get("columns");
         Assert.assertEquals(arrColumnsList.size(), columns.size());
-        Assert.assertTrue(arrColumnsList.get(1).equalsContents(columns.get(1)));
-        Assert.assertTrue(arrColumnsList.get(2).equalsContents(columns.get(2)));
-
+        assertReferenceables(arrColumnsList.get(1), columns.get(1));
+        assertReferenceables(arrColumnsList.get(2), columns.get(2));
 
         //Remove a class reference/Id and insert another reference
         //Also covers isComposite case since columns is a composite
@@ -366,8 +368,8 @@ public class DefaultMetadataServiceTest {
 
         values.put("name", "col2");
         values.put("type", "type");
-        ref = new Referenceable("column_type", values);
-        columns.add(ref);
+        col1 = new Referenceable("column_type", values);
+        columns.add(col1);
         table.set("columns", columns);
         updateInstance(table);
 
@@ -376,7 +378,7 @@ public class DefaultMetadataServiceTest {
         tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
         arrColumnsList = (List) tableDefinition.get("columns");
         Assert.assertEquals(arrColumnsList.size(), columns.size());
-        Assert.assertTrue(arrColumnsList.get(0).equalsContents(columns.get(0)));
+        assertReferenceables(arrColumnsList.get(0), columns.get(0));
 
         //Update array column to null
         table.setNull("columns");
@@ -389,6 +391,14 @@ public class DefaultMetadataServiceTest {
         Assert.assertNull(tableDefinition.get("columns"));
     }
 
+    private void assertReferenceables(Referenceable r1, Referenceable r2) {
+        assertEquals(r1.getTypeName(), r2.getTypeName());
+        assertTrue(r1.getTraits().equals(r2.getTraits()));
+        for (String attr : r1.getValuesMap().keySet()) {
+            assertTrue(r1.getValuesMap().get(attr).equals(r2.getValuesMap().get(attr)));
+        }
+        //TODO assert trait instances and complex attributes
+    }
 
     @Test
     public void testStructs() throws Exception {
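
The new assertReferenceables helper compares type name, trait names and attribute values field by field instead of relying on Referenceable.equalsContents; presumably this keeps the assertions independent of the ids the stored references now carry after the ClassType change below, and the TODO notes that trait instances and complex attributes are still unchecked.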

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
index b8dcc7e..aa1736d 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
@@ -33,7 +33,7 @@ import java.util.Map;
  */
 public class Referenceable extends Struct implements IReferenceableInstance {
 
-    private final Id id;
+    private Id id;
     private final ImmutableMap<String, IStruct> traits;
     private final ImmutableList<String> traitNames;
 
@@ -151,6 +151,10 @@ public class Referenceable extends Struct implements IReferenceableInstance {
             '}';
     }
 
+    public void replaceWithNewId(Id id) {
+        this.id = id;
+    }
+
     private static Map<String, IStruct> getTraits(IReferenceableInstance instance) throws AtlasException {
         Map<String, IStruct> traits = new HashMap<>();
         for (String traitName : instance.getTraits() ) {
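
Making id non-final and adding replaceWithNewId() is what enables the temp-id swap in DefaultMetadataService above; the trade-off is that a Referenceable's id is no longer immutable after construction.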

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java
index ac758fa..adf5f1c 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java
@@ -125,9 +125,9 @@ public class ClassType extends HierarchicalType<ClassType, IReferenceableInstanc
                         r != null ? createInstanceWithTraits(id, r, r.getTraits().toArray(new String[0])) :
                                 createInstance(id);
 
-//                if (id != null && id.isAssigned()) {
-//                    return tr;
-//                }
+                if (id != null && id.isAssigned()) {
+                    return tr;
+                }
 
                 for (Map.Entry<String, AttributeInfo> e : fieldMapping.fields.entrySet()) {
                     String attrKey = e.getKey();
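
Re-enabling this early return means ClassType.convert() now hands back just the reference when the id is already assigned, without walking the field mapping to convert attribute values; that is exactly why DefaultMetadataService (above) temporarily swaps in an unassigned id before converting for a full update.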

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
index 74d7f7c..a54dabc 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
@@ -25,8 +25,8 @@ public final class Multiplicity {
 
     public static final Multiplicity OPTIONAL = new Multiplicity(0, 1, false);
     public static final Multiplicity REQUIRED = new Multiplicity(1, 1, false);
-    public static final Multiplicity COLLECTION = new Multiplicity(0, Integer.MAX_VALUE, false);
-    public static final Multiplicity SET = new Multiplicity(0, Integer.MAX_VALUE, true);
+    public static final Multiplicity COLLECTION = new Multiplicity(1, Integer.MAX_VALUE, false);
+    public static final Multiplicity SET = new Multiplicity(1, Integer.MAX_VALUE, true);
 
     public final int lower;
     public final int upper;
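
With the lower bound raised from 0 to 1, an attribute declared COLLECTION or SET must now contain at least one element; multi-valued attributes that may legitimately be empty or absent are declared OPTIONAL instead, as the TestUtils.java change above does for its map attributes.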

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties
index 4a351e6..d475d7e 100644
--- a/typesystem/src/main/resources/application.properties
+++ b/typesystem/src/main/resources/application.properties
@@ -27,7 +27,7 @@ atlas.graph.storage.backend=${titan.storage.backend}
 atlas.graph.index.search.backend=${titan.index.backend}
 
 #Berkeley storage directory
-atlas.graph.storage.directory=target/data/berkley
+atlas.graph.storage.directory=${sys:atlas.data}/berkley
 
 #hbase
 #For standalone mode , specify localhost
@@ -38,7 +38,7 @@ atlas.graph.storage.hbase.regions-per-server=1
 atlas.graph.storage.lock.wait-time=10000
 
 #ElasticSearch
-atlas.graph.index.search.directory=target/data/es
+atlas.graph.index.search.directory=${sys:atlas.data}/es
 atlas.graph.index.search.elasticsearch.client-only=false
 atlas.graph.index.search.elasticsearch.local-mode=true
 atlas.graph.index.search.elasticsearch.create.sleep=2000
@@ -63,7 +63,7 @@ atlas.notification.embedded=true
 
 atlas.kafka.zookeeper.connect=localhost:19026
 atlas.kafka.bootstrap.servers=localhost:19027
-atlas.kafka.data=target/data/kafka
+atlas.kafka.data=${sys:atlas.data}/kafka
 atlas.kafka.zookeeper.session.timeout.ms=400
 atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=100
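
All on-disk data locations here (Berkeley storage, the ElasticSearch index and the embedded Kafka data) now resolve from the atlas.data system property instead of a hard-coded target/data path. The test plugin configuration, the webapp jetty plugin and Atlas.java elsewhere in this commit set that property (to ${project.build.directory}/data, or target/data as a fallback), so each module writes its test data under its own build directory.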

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 85a5f94..d893947 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -372,6 +372,10 @@
                             <name>atlas.home</name>
                             <value>${project.build.directory}</value>
                         </systemProperty>
+                        <systemProperty>
+                            <name>atlas.data</name>
+                            <value>${project.build.directory}/data</value>
+                        </systemProperty>
                     </systemProperties>
                     <stopKey>atlas-stop</stopKey>
                     <stopPort>31001</stopPort>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/src/main/java/org/apache/atlas/Atlas.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/Atlas.java b/webapp/src/main/java/org/apache/atlas/Atlas.java
index 5ae1e44..2d2d619 100755
--- a/webapp/src/main/java/org/apache/atlas/Atlas.java
+++ b/webapp/src/main/java/org/apache/atlas/Atlas.java
@@ -40,6 +40,7 @@ public final class Atlas {
     private static final String APP_PATH = "app";
     private static final String APP_PORT = "port";
     private static final String ATLAS_HOME = "atlas.home";
+    private static final String ATLAS_DATA = "atlas.data";
     private static final String ATLAS_LOG_DIR = "atlas.log.dir";
     public static final String ATLAS_SERVER_HTTPS_PORT = "atlas.server.https.port";
     public static final String ATLAS_SERVER_HTTP_PORT = "atlas.server.http.port";
@@ -110,6 +111,9 @@ public final class Atlas {
         if (System.getProperty(ATLAS_HOME) == null) {
             System.setProperty(ATLAS_HOME, "target");
         }
+        if (System.getProperty(ATLAS_DATA) == null) {
+            System.setProperty(ATLAS_DATA, "target/data");
+        }
         if (System.getProperty(ATLAS_LOG_DIR) == null) {
             System.setProperty(ATLAS_LOG_DIR, "target/logs");
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index e5af26c..3a4661c 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -19,22 +19,22 @@
 package org.apache.atlas.notification;
 
 import com.google.inject.Inject;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.atlas.web.resources.BaseResourceIT;
 import org.codehaus.jettison.json.JSONArray;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+
 @Guice(modules = NotificationModule.class)
 public class NotificationHookConsumerIT extends BaseResourceIT {
 
     @Inject
     private NotificationInterface kafka;
-    private String dbName;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -47,57 +47,106 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         kafka.close();
     }
 
-    private void sendHookMessage(Referenceable entity) throws NotificationException {
-        String entityJson = InstanceSerialization.toJson(entity, true);
-        JSONArray jsonArray = new JSONArray();
-        jsonArray.put(entityJson);
-        kafka.send(NotificationInterface.NotificationType.HOOK, jsonArray.toString());
+    private void sendHookMessage(HookNotification.HookNotificationMessage message) throws NotificationException {
+        kafka.send(NotificationInterface.NotificationType.HOOK, message);
     }
 
     @Test
-    public void testConsumeHookMessage() throws Exception {
-        Referenceable entity = new Referenceable(DATABASE_TYPE);
-        dbName = "db" + randomString();
-        entity.set("name", dbName);
+    public void testCreateEntity() throws Exception {
+        final Referenceable entity = new Referenceable(DATABASE_TYPE);
+        entity.set("name", "db" + randomString());
         entity.set("description", randomString());
 
-        sendHookMessage(entity);
+        sendHookMessage(new HookNotification.EntityCreateRequest(entity));
 
         waitFor(1000, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
-                JSONArray results =
-                        serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
+                JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
+                        entity.get("name")));
                 return results.length() == 1;
             }
         });
     }
 
-    @Test (dependsOnMethods = "testConsumeHookMessage")
-    public void testEnityDeduping() throws Exception {
-//        Referenceable db = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
-        Referenceable db = new Referenceable(DATABASE_TYPE);
-        db.set("name", dbName);
-        db.set("description", randomString());
+    @Test
+    public void testUpdateEntityPartial() throws Exception {
+        final Referenceable entity = new Referenceable(DATABASE_TYPE);
+        final String dbName = "db" + randomString();
+        entity.set("name", dbName);
+        entity.set("description", randomString());
+        serviceClient.createEntity(entity);
 
-        Referenceable table = new Referenceable(HIVE_TABLE_TYPE);
-        final String tableName = randomString();
-        table.set("name", tableName);
-        table.set("db", db);
+        final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
+        newEntity.set("owner", randomString());
+        sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
+        waitFor(1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                Referenceable localEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
+                return (localEntity.get("owner") != null && localEntity.get("owner").equals(newEntity.get("owner")));
+            }
+        });
+
+        //It's a partial update, so un-set fields are not updated
+        Referenceable actualEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
+        assertEquals(actualEntity.get("description"), entity.get("description"));
+    }
+
+    @Test
+    public void testUpdatePartialUpdatingQualifiedName() throws Exception {
+        final Referenceable entity = new Referenceable(DATABASE_TYPE);
+        final String dbName = "db" + randomString();
+        entity.set("name", dbName);
+        entity.set("description", randomString());
+        serviceClient.createEntity(entity);
 
-        sendHookMessage(table);
+        final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
+        final String newName = "db" + randomString();
+        newEntity.set("name", newName);
+
+        sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
         waitFor(1000, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
-                JSONArray results =
-                        serviceClient.searchByDSL(String.format("%s where name='%s'", HIVE_TABLE_TYPE, tableName));
+                JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
+                        newName));
                 return results.length() == 1;
             }
         });
 
-        JSONArray results =
-                serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
-        Assert.assertEquals(results.length(), 1);
+        //no entity with the old qualified name
+        JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
+        assertEquals(results.length(), 0);
+
     }
 
+    @Test
+    public void testUpdateEntityFullUpdate() throws Exception {
+        Referenceable entity = new Referenceable(DATABASE_TYPE);
+        final String dbName = "db" + randomString();
+        entity.set("name", dbName);
+        entity.set("description", randomString());
+        serviceClient.createEntity(entity);
+
+        final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
+        newEntity.set("name", dbName);
+        newEntity.set("description", randomString());
+        newEntity.set("owner", randomString());
+
+        //updating unique attribute
+        sendHookMessage(new HookNotification.EntityUpdateRequest(newEntity));
+        waitFor(1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
+                        dbName));
+                return results.length() == 1;
+            }
+        });
+
+        Referenceable actualEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
+        assertEquals(actualEntity.get("description"), newEntity.get("description"));
+        assertEquals(actualEntity.get("owner"), newEntity.get("owner"));
+    }
 }
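
The hook protocol here moves from sending a JSONArray of serialized Referenceables to sending typed HookNotification messages (create, partial update, full update), which is what lets a rename, i.e. a partial update of the unique attribute, be expressed at all. A rename sent from a hook would look roughly like this (a fragment inside a test such as the ones above; the table names are illustrative):

    // rename: a partial update keyed on the old unique name
    Referenceable updated = new Referenceable(HIVE_TABLE_TYPE);
    updated.set("name", "table_new");
    kafka.send(NotificationInterface.NotificationType.HOOK,
            new HookNotification.EntityPartialUpdateRequest(HIVE_TABLE_TYPE, "name", "table_old", updated));

(The entity deduping test that used to live in this class moves to EntityJerseyResourceIT, below.)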

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
index 7337eaf..f476af3 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
@@ -58,6 +58,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.testng.Assert.assertEquals;
+
 /**
  * Integration tests for Entity Jersey Resource.
  */
@@ -95,6 +97,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
     }
 
     @Test
+    //API should accept a single entity (or a JSON array of entities)
     public void testSubmitSingleEntity() throws Exception {
         Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
         databaseInstance.set("name", randomString());
@@ -115,6 +118,34 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
     }
 
     @Test
+    public void testEntityDeduping() throws Exception {
+        Referenceable db = new Referenceable(DATABASE_TYPE);
+        String dbName = "db" + randomString();
+        db.set("name", dbName);
+        db.set("description", randomString());
+
+        serviceClient.createEntity(db);
+        JSONArray results =
+                serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
+        assertEquals(results.length(), 1);
+
+        //creating the entity again shouldn't create another instance with the same unique attribute value
+        serviceClient.createEntity(db);
+        results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
+        assertEquals(results.length(), 1);
+
+        //Test the same across references
+        Referenceable table = new Referenceable(HIVE_TABLE_TYPE);
+        final String tableName = randomString();
+        table.set("name", tableName);
+        table.set("db", db);
+
+        serviceClient.createEntity(table);
+        results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
+        assertEquals(results.length(), 1);
+    }
+
+    @Test
     public void testEntityDefinitionAcrossTypeUpdate() throws Exception {
         //create type
         HierarchicalTypeDefinition<ClassType> typeDefinition = TypesUtil