You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2016/11/23 22:57:31 UTC

[2/2] hive git commit: HIVE-15180: Extend JSONMessageFactory to store additional information about metadata objects on different table events (Sushanth Sowmyan, Vaibhav Gumashta reviewed by Thejas Nair)

HIVE-15180: Extend JSONMessageFactory to store additional information about metadata objects on different table events (Sushanth Sowmyan, Vaibhav Gumashta reviewed by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3dd28fbb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3dd28fbb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3dd28fbb

Branch: refs/heads/master
Commit: 3dd28fbbb68f73562b92d49449d3dde0afd79ef1
Parents: bfad863
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Wed Nov 23 14:57:04 2016 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Wed Nov 23 14:57:04 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../listener/DbNotificationListener.java        | 134 +++++-----
 .../api/TestHCatClientNotification.java         |  59 +++--
 .../listener/TestDbNotificationListener.java    | 153 ++++++-----
 .../messaging/AddPartitionMessage.java          |  51 ++++
 .../metastore/messaging/AlterIndexMessage.java  |  27 ++
 .../messaging/AlterPartitionMessage.java        |  40 +++
 .../metastore/messaging/AlterTableMessage.java  |  34 +++
 .../messaging/CreateDatabaseMessage.java        |  28 ++
 .../messaging/CreateFunctionMessage.java        |  27 ++
 .../metastore/messaging/CreateIndexMessage.java |  27 ++
 .../metastore/messaging/CreateTableMessage.java |  40 +++
 .../messaging/DropDatabaseMessage.java          |  27 ++
 .../messaging/DropFunctionMessage.java          |  27 ++
 .../metastore/messaging/DropIndexMessage.java   |  27 ++
 .../messaging/DropPartitionMessage.java         |  42 +++
 .../metastore/messaging/DropTableMessage.java   |  40 +++
 .../hive/metastore/messaging/EventMessage.java  | 106 ++++++++
 .../hive/metastore/messaging/EventUtils.java    | 222 ++++++++++++++++
 .../hive/metastore/messaging/InsertMessage.java |  59 +++++
 .../messaging/MessageDeserializer.java          | 145 +++++++++++
 .../metastore/messaging/MessageFactory.java     | 241 +++++++++++++++++
 .../messaging/json/JSONAddPartitionMessage.java | 139 ++++++++++
 .../messaging/json/JSONAlterIndexMessage.java   |  88 +++++++
 .../json/JSONAlterPartitionMessage.java         | 124 +++++++++
 .../messaging/json/JSONAlterTableMessage.java   | 100 +++++++
 .../json/JSONCreateDatabaseMessage.java         |  71 +++++
 .../json/JSONCreateFunctionMessage.java         |  81 ++++++
 .../messaging/json/JSONCreateIndexMessage.java  |  82 ++++++
 .../messaging/json/JSONCreateTableMessage.java  | 100 +++++++
 .../messaging/json/JSONDropDatabaseMessage.java |  71 +++++
 .../messaging/json/JSONDropFunctionMessage.java |  81 ++++++
 .../messaging/json/JSONDropIndexMessage.java    |  82 ++++++
 .../json/JSONDropPartitionMessage.java          |  97 +++++++
 .../messaging/json/JSONDropTableMessage.java    |  86 +++++++
 .../messaging/json/JSONInsertMessage.java       |  98 +++++++
 .../messaging/json/JSONMessageDeserializer.java | 189 ++++++++++++++
 .../messaging/json/JSONMessageFactory.java      | 258 +++++++++++++++++++
 38 files changed, 3147 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9e4c590..9a5d604 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -250,6 +250,7 @@ public class HiveConf extends Configuration {
       HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
       HiveConf.ConfVars.METASTORE_EVENT_CLEAN_FREQ,
       HiveConf.ConfVars.METASTORE_EVENT_EXPIRY_DURATION,
+      HiveConf.ConfVars.METASTORE_EVENT_MESSAGE_FACTORY,
       HiveConf.ConfVars.METASTORE_FILTER_HOOK,
       HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
       HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS,
@@ -794,6 +795,9 @@ public class HiveConf extends Configuration {
     METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration", "0s",
         new TimeValidator(TimeUnit.SECONDS),
         "Duration after which events expire from events table"),
+    METASTORE_EVENT_MESSAGE_FACTORY("hive.metastore.event.message.factory",
+        "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory",
+        "Factory class for making encoding and decoding messages in the events generated."),
     METASTORE_EXECUTE_SET_UGI("hive.metastore.execute.setugi", true,
         "In unsecure mode, setting this property to true will cause the metastore to execute DFS operations using \n" +
         "the client's reported user and group permissions. Note that this property must be set on \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index ea7520d..494d01f 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -17,41 +17,42 @@
  */
 package org.apache.hive.hcatalog.listener;
 
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
-import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
-import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
-import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
-import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.RawStoreProxy;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
 import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
 import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
 import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
 import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.InsertEvent;
 import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
-import org.apache.hive.hcatalog.common.HCatConstants;
-import org.apache.hive.hcatalog.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * An implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that
  * stores events in the database.
@@ -121,10 +122,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param tableEvent table event.
    * @throws MetaException
    */
-  public void onCreateTable (CreateTableEvent tableEvent) throws MetaException {
+  public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
     Table t = tableEvent.getTable();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(), msgFactory
+            .buildCreateTableMessage(t).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
     enqueue(event);
@@ -134,10 +136,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param tableEvent table event.
    * @throws MetaException
    */
-  public void onDropTable (DropTableEvent tableEvent)  throws MetaException {
+  public void onDropTable(DropTableEvent tableEvent) throws MetaException {
     Table t = tableEvent.getTable();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_DROP_TABLE_EVENT, msgFactory.buildDropTableMessage(t).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory
+            .buildDropTableMessage(t).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
     enqueue(event);
@@ -147,12 +150,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param tableEvent alter table event
    * @throws MetaException
    */
-  public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
+  public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
     Table before = tableEvent.getOldTable();
     Table after = tableEvent.getNewTable();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_ALTER_TABLE_EVENT,
-        msgFactory.buildAlterTableMessage(before, after).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory
+            .buildAlterTableMessage(before, after).toString());
     event.setDbName(after.getDbName());
     event.setTableName(after.getTableName());
     enqueue(event);
@@ -162,12 +165,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param partitionEvent partition event
    * @throws MetaException
    */
-  public void onAddPartition (AddPartitionEvent partitionEvent)
-      throws MetaException {
+  public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
     Table t = partitionEvent.getTable();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_ADD_PARTITION_EVENT,
-        msgFactory.buildAddPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
+    String msg = msgFactory
+        .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator()).toString();
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
     enqueue(event);
@@ -177,11 +180,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param partitionEvent partition event
    * @throws MetaException
    */
-  public void onDropPartition (DropPartitionEvent partitionEvent)  throws MetaException {
+  public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
     Table t = partitionEvent.getTable();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_DROP_PARTITION_EVENT,
-        msgFactory.buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory
+            .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
     enqueue(event);
@@ -191,12 +194,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param partitionEvent partition event
    * @throws MetaException
    */
-  public void onAlterPartition (AlterPartitionEvent partitionEvent)  throws MetaException {
+  public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
     Partition before = partitionEvent.getOldPartition();
     Partition after = partitionEvent.getNewPartition();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_ALTER_PARTITION_EVENT,
-        msgFactory.buildAlterPartitionMessage(partitionEvent.getTable(),before, after).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory
+            .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString());
     event.setDbName(before.getDbName());
     event.setTableName(before.getTableName());
     enqueue(event);
@@ -206,11 +209,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param dbEvent database event
    * @throws MetaException
    */
-  public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws MetaException {
+  public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
     Database db = dbEvent.getDatabase();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_CREATE_DATABASE_EVENT,
-        msgFactory.buildCreateDatabaseMessage(db).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
+            .buildCreateDatabaseMessage(db).toString());
     event.setDbName(db.getName());
     enqueue(event);
   }
@@ -219,11 +222,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param dbEvent database event
    * @throws MetaException
    */
-  public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {
+  public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
     Database db = dbEvent.getDatabase();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_DROP_DATABASE_EVENT,
-        msgFactory.buildDropDatabaseMessage(db).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
+            .buildDropDatabaseMessage(db).toString());
     event.setDbName(db.getName());
     enqueue(event);
   }
@@ -232,11 +235,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param fnEvent function event
    * @throws MetaException
    */
-  public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException {
+  public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException {
     Function fn = fnEvent.getFunction();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_CREATE_FUNCTION_EVENT,
-        msgFactory.buildCreateFunctionMessage(fn).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
+            .buildCreateFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
     enqueue(event);
   }
@@ -245,11 +248,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param fnEvent function event
    * @throws MetaException
    */
-  public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException {
+  public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException {
     Function fn = fnEvent.getFunction();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_DROP_FUNCTION_EVENT,
-        msgFactory.buildDropFunctionMessage(fn).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
+            .buildDropFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
     enqueue(event);
   }
@@ -258,11 +261,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param indexEvent index event
    * @throws MetaException
    */
-  public void onAddIndex (AddIndexEvent indexEvent) throws MetaException {
+  public void onAddIndex(AddIndexEvent indexEvent) throws MetaException {
     Index index = indexEvent.getIndex();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_CREATE_INDEX_EVENT,
-        msgFactory.buildCreateIndexMessage(index).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory
+            .buildCreateIndexMessage(index).toString());
     event.setDbName(index.getDbName());
     enqueue(event);
   }
@@ -271,11 +274,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param indexEvent index event
    * @throws MetaException
    */
-  public void onDropIndex (DropIndexEvent indexEvent) throws MetaException {
+  public void onDropIndex(DropIndexEvent indexEvent) throws MetaException {
     Index index = indexEvent.getIndex();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_DROP_INDEX_EVENT,
-        msgFactory.buildDropIndexMessage(index).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory
+            .buildDropIndexMessage(index).toString());
     event.setDbName(index.getDbName());
     enqueue(event);
   }
@@ -284,21 +287,22 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param indexEvent index event
    * @throws MetaException
    */
-  public void onAlterIndex (AlterIndexEvent indexEvent)  throws MetaException {
+  public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException {
     Index before = indexEvent.getOldIndex();
     Index after = indexEvent.getNewIndex();
-    NotificationEvent event = new NotificationEvent(0, now(),
-        HCatConstants.HCAT_ALTER_INDEX_EVENT,
-        msgFactory.buildAlterIndexMessage(before, after).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory
+            .buildAlterIndexMessage(before, after).toString());
     event.setDbName(before.getDbName());
     enqueue(event);
   }
 
   @Override
   public void onInsert(InsertEvent insertEvent) throws MetaException {
-    NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_INSERT_EVENT,
-        msgFactory.buildInsertMessage(insertEvent.getDb(), insertEvent.getTable(),
-            insertEvent.getPartitionKeyValues(), insertEvent.getFiles()).toString());
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
+            insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(),
+            insertEvent.getFiles()).toString());
     event.setDbName(insertEvent.getDb());
     event.setTableName(insertEvent.getTable());
     enqueue(event);

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
index a661962..775c1d6 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
@@ -18,28 +18,31 @@
  */
 package org.apache.hive.hcatalog.api;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-import junit.framework.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hive.hcatalog.listener.DbNotificationListener;
-import org.apache.hive.hcatalog.messaging.HCatEventMessage;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -125,8 +128,14 @@ public class TestHCatClientNotification {
     assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
     assertEquals(dbName, event.getDbName());
     assertEquals("hcatcreatetable", event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"hcatcreatetable\",\"timestamp\":[0-9]+}"));
+
+    // Parse the message field
+    ObjectNode jsonTree = getJsonTree(event);
+    assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText());
+    assertEquals("default", jsonTree.get("db").asText());
+    assertEquals("hcatcreatetable", jsonTree.get("table").asText());
+    Table tableObj = JSONMessageFactory.getTableObj(jsonTree);
+    assertEquals(table.toHiveTable(), tableObj);
   }
 
   // TODO - Currently no way to test alter table, as this interface doesn't support alter table
@@ -166,11 +175,8 @@ public class TestHCatClientNotification {
     String partName = "testpart";
     Map<String, String> partSpec = new HashMap<String, String>(1);
     partSpec.put(partColName, partName);
-    hCatClient.addPartition(
-        HCatAddPartitionDesc.create(
-            new HCatPartition(table, partSpec, null)
-        ).build()
-    );
+    HCatPartition part = new HCatPartition(table, partSpec, null);
+    hCatClient.addPartition(HCatAddPartitionDesc.create(part).build());
 
     List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, events.size());
@@ -181,9 +187,20 @@ public class TestHCatClientNotification {
     assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
     assertEquals("default", event.getDbName());
     assertEquals(tableName, event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
-        "\"hcataddparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}"));
+
+    // Parse the message field
+    ObjectNode jsonTree = getJsonTree(event);
+    assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, jsonTree.get("eventType").asText());
+    assertEquals("default", jsonTree.get("db").asText());
+    assertEquals("hcataddparttable", jsonTree.get("table").asText());
+    List<Partition> partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree);
+    HCatPartition hcatPart = new HCatPartition(table, partitionObjList.get(0));
+    assertEquals(part.getDatabaseName(), hcatPart.getDatabaseName());
+    assertEquals(part.getTableName(), hcatPart.getTableName());
+    assertEquals(part.getValues(), hcatPart.getValues());
+    assertEquals(part.getColumns(), hcatPart.getColumns());
+    assertEquals(part.getPartColumns(), hcatPart.getPartColumns());
+    assertEquals(part.getLocation(), hcatPart.getLocation());
   }
 
   // TODO - currently no way to test alter partition, as HCatClient doesn't support it.
@@ -265,4 +282,10 @@ public class TestHCatClientNotification {
     assertEquals(1, events.size());
     assertEquals(firstEventId + 1, events.get(0).getEventId());
   }
+
+  private ObjectNode getJsonTree(HCatNotificationEvent event) throws Exception {
+    JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage());
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.readValue(jsonParser, ObjectNode.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 4f97cf4..690616d 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -18,25 +18,19 @@
 */
 package org.apache.hive.hcatalog.listener;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.FunctionType;
-import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.ResourceType;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
-import org.apache.htrace.fasterxml.jackson.core.JsonFactory;
-import org.apache.htrace.fasterxml.jackson.core.JsonParser;
-import org.apache.htrace.fasterxml.jackson.databind.JsonNode;
-import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.htrace.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -45,29 +39,37 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FireEventRequest;
 import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.ResourceType;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class TestDbNotificationListener {
   private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class.getName());
   private static final int EVENTS_TTL = 30;
@@ -183,26 +185,34 @@ public class TestDbNotificationListener {
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
     cols.add(new FieldSchema("col1", "int", "nocomment"));
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
-        serde, null, null, emptyParameters);
-    Table table = new Table("mytable", "default", "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null,
+            emptyParameters);
+    Table table =
+        new Table("mytable", "default", "me", startTime, startTime, 0, sd, null, emptyParameters,
+            null, null, null);
     msClient.createTable(table);
-
+    // Get the event
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
-
     NotificationEvent event = rsp.getEvents().get(0);
     assertEquals(firstEventId + 1, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
     assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
     assertEquals("default", event.getDbName());
     assertEquals("mytable", event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"mytable\",\"timestamp\":[0-9]+}"));
 
-    table = new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    // Parse the message field
+    ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+    assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText());
+    assertEquals("default", jsonTree.get("db").asText());
+    assertEquals("mytable", jsonTree.get("table").asText());
+    Table tableObj = JSONMessageFactory.getTableObj(jsonTree);
+    assertEquals(table, tableObj);
+
+    table =
+        new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, emptyParameters,
+            null, null, null);
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
       msClient.createTable(table);
@@ -223,11 +233,12 @@ public class TestDbNotificationListener {
         serde, null, null, emptyParameters);
     Table table = new Table("alttable", "default", "me", startTime, startTime, 0, sd,
         new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
+    // Event 1
     msClient.createTable(table);
-
     cols.add(new FieldSchema("col2", "int", ""));
     table = new Table("alttable", "default", "me", startTime, startTime, 0, sd,
         new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
+    // Event 2
     msClient.alter_table("default", "alttable", table);
 
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
@@ -239,9 +250,14 @@ public class TestDbNotificationListener {
     assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType());
     assertEquals("default", event.getDbName());
     assertEquals("alttable", event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"ALTER_TABLE\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alttable\"," +
-        "\"timestamp\":[0-9]+}"));
+
+    // Parse the message field
+    ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+    assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, jsonTree.get("eventType").asText());
+    assertEquals("default", jsonTree.get("db").asText());
+    assertEquals("alttable", jsonTree.get("table").asText());
+    Table tableObj = JSONMessageFactory.getTableObj(jsonTree);
+    assertEquals(table, tableObj);
 
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
@@ -319,9 +335,14 @@ public class TestDbNotificationListener {
     assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
     assertEquals("default", event.getDbName());
     assertEquals("addparttable", event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
-        "\"addparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}"));
+
+    // Parse the message field
+    ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+    assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, jsonTree.get("eventType").asText());
+    assertEquals("default", jsonTree.get("db").asText());
+    assertEquals("addparttable", jsonTree.get("table").asText());
+    List<Partition> partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree);
+    assertEquals(partition, partitionObjList.get(0));
 
     partition = new Partition(Arrays.asList("tomorrow"), "default", "tableDoesNotExist",
         startTime, startTime, sd, emptyParameters);
@@ -365,10 +386,14 @@ public class TestDbNotificationListener {
     assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
     assertEquals("default", event.getDbName());
     assertEquals("alterparttable", event.getTableName());
-    assertTrue(event.getMessage(),
-        event.getMessage().matches("\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alterparttable\"," +
-        "\"timestamp\":[0-9]+,\"keyValues\":\\{\"ds\":\"today\"}}"));
+
+    // Parse the message field
+    ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+    assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, jsonTree.get("eventType").asText());
+    assertEquals("default", jsonTree.get("db").asText());
+    assertEquals("alterparttable", jsonTree.get("table").asText());
+    List<Partition> partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree);
+    assertEquals(newPart, partitionObjList.get(0));
 
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
@@ -445,7 +470,7 @@ public class TestDbNotificationListener {
     assertTrue(event.getEventTime() >= startTime);
     assertEquals(HCatConstants.HCAT_CREATE_FUNCTION_EVENT, event.getEventType());
     assertEquals(dbName, event.getDbName());
-    Function funcObj = getFunctionObj(getJsonTree(event));
+    Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event));
     assertEquals(dbName, funcObj.getDbName());
     assertEquals(funcName, funcObj.getFunctionName());
     assertEquals(funcClass, funcObj.getClassName());
@@ -488,7 +513,7 @@ public class TestDbNotificationListener {
     assertTrue(event.getEventTime() >= startTime);
     assertEquals(HCatConstants.HCAT_DROP_FUNCTION_EVENT, event.getEventType());
     assertEquals(dbName, event.getDbName());
-    Function funcObj = getFunctionObj(getJsonTree(event));
+    Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event));
     assertEquals(dbName, funcObj.getDbName());
     assertEquals(funcName, funcObj.getFunctionName());
     assertEquals(funcClass, funcObj.getClassName());
@@ -542,7 +567,7 @@ public class TestDbNotificationListener {
     assertTrue(event.getEventTime() >= startTime);
     assertEquals(HCatConstants.HCAT_CREATE_INDEX_EVENT, event.getEventType());
     assertEquals(dbName, event.getDbName());
-    Index indexObj = getIndexObj(getJsonTree(event));
+    Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event));
     assertEquals(dbName, indexObj.getDbName());
     assertEquals(indexName, indexObj.getIndexName());
     assertEquals(tableName, indexObj.getOrigTableName());
@@ -593,7 +618,7 @@ public class TestDbNotificationListener {
     assertTrue(event.getEventTime() >= startTime);
     assertEquals(HCatConstants.HCAT_DROP_INDEX_EVENT, event.getEventType());
     assertEquals(dbName, event.getDbName());
-    Index indexObj = getIndexObj(getJsonTree(event));
+    Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event));
     assertEquals(dbName, indexObj.getDbName());
     assertEquals(indexName.toLowerCase(), indexObj.getIndexName());
     assertEquals(tableName.toLowerCase(), indexObj.getOrigTableName());
@@ -647,7 +672,7 @@ public class TestDbNotificationListener {
     assertTrue(event.getEventTime() >= startTime);
     assertEquals(HCatConstants.HCAT_ALTER_INDEX_EVENT, event.getEventType());
     assertEquals(dbName, event.getDbName());
-    Index indexObj = getIndexObj(getJsonTree(event), "afterIndexObjJson");
+    Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event), "afterIndexObjJson");
     assertEquals(dbName, indexObj.getDbName());
     assertEquals(indexName, indexObj.getIndexName());
     assertEquals(tableName, indexObj.getOrigTableName());
@@ -971,30 +996,4 @@ public class TestDbNotificationListener {
     LOG.info("second trigger done");
     assertEquals(0, rsp2.getEventsSize());
   }
-
-  private ObjectNode getJsonTree(NotificationEvent event) throws Exception {
-    JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage());
-    ObjectMapper mapper = new ObjectMapper();
-    return mapper.readValue(jsonParser, ObjectNode.class);
-  }
-
-  private Function getFunctionObj(JsonNode jsonTree) throws Exception {
-    TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
-    Function funcObj = new Function();
-    String tableJson = jsonTree.get("functionObjJson").asText();
-    deSerializer.deserialize(funcObj, tableJson, "UTF-8");
-    return funcObj;
-  }
-
-  private Index getIndexObj(JsonNode jsonTree) throws Exception {
-    return getIndexObj(jsonTree, "indexObjJson");
-  }
-
-  private Index getIndexObj(JsonNode jsonTree, String indexObjKey) throws Exception {
-    TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
-    Index indexObj = new Index();
-    String tableJson = jsonTree.get(indexObjKey).asText();
-    deSerializer.deserialize(indexObj, tableJson, "UTF-8");
-    return indexObj;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java
new file mode 100644
index 0000000..26898f2
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class AddPartitionMessage extends EventMessage {
+
+  protected AddPartitionMessage() {
+    super(EventType.ADD_PARTITION);
+  }
+
+  /**
+   * Getter for name of table (where partitions are added).
+   * @return Table-name (String).
+   */
+  public abstract String getTable();
+
+  /**
+   * Getter for list of partitions added.
+   * @return List of maps, where each map identifies values for each partition-key, for every added partition.
+   */
+  public abstract List<Map<String, String>> getPartitions ();
+
+  @Override
+  public EventMessage checkValid() {
+    if (getTable() == null)
+      throw new IllegalStateException("Table name unset.");
+    if (getPartitions() == null)
+      throw new IllegalStateException("Partition-list unset.");
+    return super.checkValid();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java
new file mode 100644
index 0000000..0fc7f9e
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class AlterIndexMessage extends EventMessage {
+
+  protected AlterIndexMessage() {
+    super(EventType.ALTER_INDEX);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
new file mode 100644
index 0000000..d89dba1
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging;
+
+import java.util.Map;
+
+public abstract class AlterPartitionMessage extends EventMessage {
+
+  protected AlterPartitionMessage() {
+    super(EventType.ALTER_PARTITION);
+  }
+
+  public abstract String getTable();
+
+  public abstract Map<String,String> getKeyValues();
+
+  @Override
+  public EventMessage checkValid() {
+    if (getTable() == null) throw new IllegalStateException("Table name unset.");
+    if (getKeyValues() == null) throw new IllegalStateException("Partition values unset");
+    return super.checkValid();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
new file mode 100644
index 0000000..99e678a
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class AlterTableMessage extends EventMessage {
+
+  protected AlterTableMessage() {
+    super(EventType.ALTER_TABLE);
+  }
+
+  public abstract String getTable();
+
+  @Override
+  public EventMessage checkValid() {
+    if (getTable() == null) throw new IllegalStateException("Table name unset.");
+    return super.checkValid();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java
new file mode 100644
index 0000000..7614298
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class CreateDatabaseMessage extends EventMessage {
+
+  protected CreateDatabaseMessage() {
+    super(EventType.CREATE_DATABASE);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java
new file mode 100644
index 0000000..867e8ec
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class CreateFunctionMessage extends EventMessage {
+
+  protected CreateFunctionMessage() {
+    super(EventType.CREATE_FUNCTION);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java
new file mode 100644
index 0000000..81676aa
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class CreateIndexMessage extends EventMessage {
+
+  protected CreateIndexMessage() {
+    super(EventType.CREATE_INDEX);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java
new file mode 100644
index 0000000..c88c59c
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class CreateTableMessage extends EventMessage {
+
+  protected CreateTableMessage() {
+    super(EventType.CREATE_TABLE);
+  }
+
+  /**
+   * Getter for the name of table created in HCatalog.
+   * @return Table-name (String).
+   */
+  public abstract String getTable();
+
+  @Override
+  public EventMessage checkValid() {
+    if (getTable() == null)
+      throw new IllegalStateException("Table name unset.");
+    return super.checkValid();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java
new file mode 100644
index 0000000..fa6da38
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class DropDatabaseMessage extends EventMessage {
+
+  protected DropDatabaseMessage() {
+    super(EventType.DROP_DATABASE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java
new file mode 100644
index 0000000..82cdc44
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class DropFunctionMessage extends EventMessage {
+
+  protected DropFunctionMessage() {
+    super(EventType.DROP_FUNCTION);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java
new file mode 100644
index 0000000..ce7b760
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class DropIndexMessage extends EventMessage {
+
+  protected DropIndexMessage() {
+    super(EventType.DROP_INDEX);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java
new file mode 100644
index 0000000..26aecb3
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class DropPartitionMessage extends EventMessage {
+
+  protected DropPartitionMessage() {
+    super(EventType.DROP_PARTITION);
+  }
+
+  public abstract String getTable();
+  public abstract List<Map<String, String>> getPartitions ();
+
+  @Override
+  public EventMessage checkValid() {
+    if (getTable() == null)
+      throw new IllegalStateException("Table name unset.");
+    if (getPartitions() == null)
+      throw new IllegalStateException("Partition-list unset.");
+    return super.checkValid();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java
new file mode 100644
index 0000000..64a8cc5
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class DropTableMessage extends EventMessage {
+
+  protected DropTableMessage() {
+    super(EventType.DROP_TABLE);
+  }
+
+  /**
+   * Getter for the name of the table being dropped.
+   * @return Table-name (String).
+   */
+  public abstract String getTable();
+
+  @Override
+  public EventMessage checkValid() {
+    if (getTable() == null)
+      throw new IllegalStateException("Table name unset.");
+    return super.checkValid();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
new file mode 100644
index 0000000..1ec0de0
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+/**
+ * Class representing messages emitted when Metastore operations are done.
+ * (E.g. Creation and deletion of databases, tables and partitions.)
+ */
+public abstract class EventMessage {
+
+  /**
+   * Enumeration of all supported types of Metastore operations.
+   */
+  public static enum EventType {
+
+    CREATE_DATABASE(MessageFactory.CREATE_DATABASE_EVENT),
+    DROP_DATABASE(MessageFactory.DROP_DATABASE_EVENT),
+    CREATE_TABLE(MessageFactory.CREATE_TABLE_EVENT),
+    DROP_TABLE(MessageFactory.DROP_TABLE_EVENT),
+    ADD_PARTITION(MessageFactory.ADD_PARTITION_EVENT),
+    DROP_PARTITION(MessageFactory.DROP_PARTITION_EVENT),
+    ALTER_TABLE(MessageFactory.ALTER_TABLE_EVENT),
+    ALTER_PARTITION(MessageFactory.ALTER_PARTITION_EVENT),
+    INSERT(MessageFactory.INSERT_EVENT),
+    CREATE_FUNCTION(MessageFactory.CREATE_FUNCTION_EVENT),
+    DROP_FUNCTION(MessageFactory.DROP_FUNCTION_EVENT),
+    CREATE_INDEX(MessageFactory.CREATE_INDEX_EVENT),
+    DROP_INDEX(MessageFactory.DROP_INDEX_EVENT),
+    ALTER_INDEX(MessageFactory.ALTER_INDEX_EVENT);
+
+    private String typeString;
+
+    EventType(String typeString) {
+      this.typeString = typeString;
+    }
+
+    @Override
+    public String toString() { return typeString; }
+  }
+
+  protected EventType eventType;
+
+  protected EventMessage(EventType eventType) {
+    this.eventType = eventType;
+  }
+
+  public EventType getEventType() {
+    return eventType;
+  }
+
+  /**
+   * Getter for HCatalog Server's URL.
+   * (This is where the event originates from.)
+   * @return HCatalog Server's URL (String).
+   */
+  public abstract String getServer();
+
+  /**
+   * Getter for the Kerberos principal of the HCatalog service.
+   * @return HCatalog Service Principal (String).
+   */
+  public abstract String getServicePrincipal();
+
+  /**
+   * Getter for the name of the Database on which the Metastore operation is done.
+   * @return Database-name (String).
+   */
+  public abstract String getDB();
+
+  /**
+   * Getter for the timestamp associated with the operation.
+   * @return Timestamp (Long - seconds since epoch).
+   */
+  public abstract Long   getTimestamp();
+
+  /**
+   * Class invariant. Checked after construction or deserialization.
+   */
+  public EventMessage checkValid() {
+    if (getServer() == null || getServicePrincipal() == null)
+      throw new IllegalStateException("Server-URL/Service-Principal shouldn't be null.");
+    if (getEventType() == null)
+      throw new IllegalStateException("Event-type unset.");
+    if (getDB() == null)
+      throw new IllegalArgumentException("DB-name unset.");
+
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
new file mode 100644
index 0000000..932af7e
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class EventUtils {
+
+  /**
+   * Utility function that constructs a notification filter to match a given db name and/or table name.
+   * If dbName == null, fetches all warehouse events.
+   * If dnName != null, but tableName == null, fetches all events for the db
+   * If dbName != null && tableName != null, fetches all events for the specified table
+   * @param dbName
+   * @param tableName
+   * @return
+   */
+  public static IMetaStoreClient.NotificationFilter getDbTblNotificationFilter(final String dbName, final String tableName){
+    return new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent event) {
+        if (event == null){
+          return false; // get rid of trivial case first, so that we can safely assume non-null
+        }
+        if (dbName == null){
+          return true; // if our dbName is null, we're interested in all wh events
+        }
+        if (dbName.equalsIgnoreCase(event.getDbName())){
+          if ( (tableName == null)
+              // if our dbName is equal, but tableName is blank, we're interested in this db-level event
+              || (tableName.equalsIgnoreCase(event.getTableName()))
+            // table level event that matches us
+              ){
+            return true;
+          }
+        }
+        return false;
+      }
+    };
+  }
+
+
+  public interface NotificationFetcher {
+    public int getBatchSize() throws IOException;
+    public long getCurrentNotificationEventId() throws IOException;
+    public List<NotificationEvent> getNextNotificationEvents(
+        long pos, IMetaStoreClient.NotificationFilter filter) throws IOException;
+  }
+
+  // MetaStoreClient-based impl of NotificationFetcher
+  public static class MSClientNotificationFetcher implements  NotificationFetcher{
+
+    private IMetaStoreClient msc = null;
+    private Integer batchSize = null;
+
+    public MSClientNotificationFetcher(IMetaStoreClient msc){
+      this.msc = msc;
+    }
+
+    @Override
+    public int getBatchSize() throws IOException {
+      if (batchSize == null){
+        try {
+          batchSize = Integer.parseInt(
+            msc.getConfigValue(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname, "50"));
+          // TODO: we're asking the metastore what its configuration for this var is - we may
+          // want to revisit to pull from client side instead. The reason I have it this way
+          // is because the metastore is more likely to have a reasonable config for this than
+          // an arbitrary client.
+        } catch (TException e) {
+          throw new IOException(e);
+        }
+      }
+      return batchSize;
+    }
+
+    @Override
+    public long getCurrentNotificationEventId() throws IOException {
+      try {
+        return msc.getCurrentNotificationEventId().getEventId();
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public List<NotificationEvent> getNextNotificationEvents(
+        long pos, IMetaStoreClient.NotificationFilter filter) throws IOException {
+      try {
+        return msc.getNextNotification(pos,getBatchSize(), filter).getEvents();
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  public static class NotificationEventIterator implements Iterator<NotificationEvent> {
+
+    private NotificationFetcher nfetcher;
+    private IMetaStoreClient.NotificationFilter filter;
+    private int maxEvents;
+
+    private Iterator<NotificationEvent> batchIter = null;
+    private List<NotificationEvent> batch = null;
+    private long pos;
+    private long maxPos;
+    private int eventCount;
+
+    public NotificationEventIterator(
+        NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+        String dbName, String tableName) throws IOException {
+      init(nfetcher, eventFrom, maxEvents, EventUtils.getDbTblNotificationFilter(dbName, tableName));
+      // using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter
+      // is an operation that needs to run before delegating to the other ctor, and this messes up chaining
+      // ctors
+    }
+
+    public NotificationEventIterator(
+        NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+        IMetaStoreClient.NotificationFilter filter) throws IOException {
+      init(nfetcher,eventFrom,maxEvents,filter);
+    }
+
+    private void init(
+        NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+        IMetaStoreClient.NotificationFilter filter) throws IOException {
+      this.nfetcher = nfetcher;
+      this.filter = filter;
+      this.pos = eventFrom;
+      if (maxEvents < 1){
+        // 0 or -1 implies fetch everything
+        this.maxEvents = Integer.MAX_VALUE;
+      } else {
+        this.maxEvents = maxEvents;
+      }
+
+      this.eventCount = 0;
+      this.maxPos = nfetcher.getCurrentNotificationEventId();
+    }
+
+    private void fetchNextBatch() throws IOException {
+      batch = nfetcher.getNextNotificationEvents(pos, filter);
+      int batchSize = nfetcher.getBatchSize();
+      while ( ((batch == null) || (batch.isEmpty())) && (pos < maxPos) ){
+        // no valid events this batch, but we're still not done processing events
+        pos += batchSize;
+        batch = nfetcher.getNextNotificationEvents(pos,filter);
+      }
+
+      if (batch == null){
+        batch = new ArrayList<NotificationEvent>();
+        // instantiate empty list so that we don't error out on iterator fetching.
+        // If we're here, then the next check of pos will show our caller that
+        // that we've exhausted our event supply
+      }
+      batchIter = batch.iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (eventCount >= maxEvents){
+        // If we've already satisfied the number of events we were supposed to deliver, we end it.
+        return false;
+      }
+      if ((batchIter != null) && (batchIter.hasNext())){
+        // If we have a valid batchIter and it has more elements, return them.
+        return true;
+      }
+      // If we're here, we want more events, and either batchIter is null, or batchIter
+      // has reached the end of the current batch. Let's fetch the next batch.
+      try {
+        fetchNextBatch();
+      } catch (IOException e) {
+        // Regrettable that we have to wrap the IOException into a RuntimeException,
+        // but throwing the exception is the appropriate result here, and hasNext()
+        // signature will only allow RuntimeExceptions. Iterator.hasNext() really
+        // should have allowed IOExceptions
+        throw new RuntimeException(e);
+      }
+      // New batch has been fetched. If it's not empty, we have more elements to process.
+      return !batch.isEmpty();
+    }
+
+    @Override
+    public NotificationEvent next() {
+      eventCount++;
+      NotificationEvent ev = batchIter.next();
+      pos = ev.getEventId();
+      return ev;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() not supported on NotificationEventIterator");
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
new file mode 100644
index 0000000..fe747df
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HCat message sent when an insert is done to a table or partition.
+ */
+public abstract class InsertMessage extends EventMessage {
+
+  protected InsertMessage() {
+    super(EventType.INSERT);
+  }
+
+  /**
+   * Getter for the name of the table being insert into.
+   * @return Table-name (String).
+   */
+  public abstract String getTable();
+
+  /**
+   * Get the map of partition keyvalues.  Will be null if this insert is to a table and not a
+   * partition.
+   * @return Map of partition keyvalues, or null.
+   */
+  public abstract Map<String,String> getPartitionKeyValues();
+
+  /**
+   * Get the list of files created as a result of this DML operation.  May be null.
+   * @return List of new files, or null.
+   */
+  public abstract List<String> getFiles();
+
+  @Override
+  public EventMessage checkValid() {
+    if (getTable() == null)
+      throw new IllegalStateException("Table name unset.");
+    return super.checkValid();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
new file mode 100644
index 0000000..515c455
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+/**
+ * Interface for converting HCat events from String-form back to EventMessage instances.
+ */
+public abstract class MessageDeserializer {
+
+  /**
+   * Method to construct EventMessage from string.
+   */
+  public EventMessage getEventMessage(String eventTypeString, String messageBody) {
+
+    switch (EventMessage.EventType.valueOf(eventTypeString)) {
+    case CREATE_DATABASE:
+      return getCreateDatabaseMessage(messageBody);
+    case DROP_DATABASE:
+      return getDropDatabaseMessage(messageBody);
+    case CREATE_TABLE:
+      return getCreateTableMessage(messageBody);
+    case ALTER_TABLE:
+      return getAlterTableMessage(messageBody);
+    case DROP_TABLE:
+      return getDropTableMessage(messageBody);
+    case ADD_PARTITION:
+      return getAddPartitionMessage(messageBody);
+    case ALTER_PARTITION:
+      return getAlterPartitionMessage(messageBody);
+    case DROP_PARTITION:
+      return getDropPartitionMessage(messageBody);
+    case CREATE_FUNCTION:
+      return getCreateFunctionMessage(messageBody);
+    case DROP_FUNCTION:
+      return getDropFunctionMessage(messageBody);
+    case CREATE_INDEX:
+      return getCreateIndexMessage(messageBody);
+    case DROP_INDEX:
+      return getDropIndexMessage(messageBody);
+    case ALTER_INDEX:
+      return getAlterIndexMessage(messageBody);
+    case INSERT:
+      return getInsertMessage(messageBody);
+
+    default:
+      throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
+    }
+  }
+
+  /**
+   * Method to de-serialize CreateDatabaseMessage instance.
+   */
+  public abstract CreateDatabaseMessage getCreateDatabaseMessage(String messageBody);
+
+  /**
+   * Method to de-serialize DropDatabaseMessage instance.
+   */
+  public abstract DropDatabaseMessage getDropDatabaseMessage(String messageBody);
+
+  /**
+   * Method to de-serialize CreateTableMessage instance.
+   */
+  public abstract CreateTableMessage getCreateTableMessage(String messageBody);
+
+  /**
+   * Method to de-serialize AlterTableMessge
+   * @param messageBody string message
+   * @return object message
+   */
+  public abstract AlterTableMessage getAlterTableMessage(String messageBody);
+
+  /**
+   * Method to de-serialize DropTableMessage instance.
+   */
+  public abstract DropTableMessage getDropTableMessage(String messageBody);
+
+  /**
+   * Method to de-serialize AddPartitionMessage instance.
+   */
+  public abstract AddPartitionMessage getAddPartitionMessage(String messageBody);
+
+  /**
+   * Method to deserialize AlterPartitionMessage
+   * @param messageBody the message in serialized form
+   * @return message in object form
+   */
+  public abstract AlterPartitionMessage getAlterPartitionMessage(String messageBody);
+
+  /**
+   * Method to de-serialize DropPartitionMessage instance.
+   */
+  public abstract DropPartitionMessage getDropPartitionMessage(String messageBody);
+
+  /**
+   * Method to de-serialize CreateFunctionMessage instance.
+   */
+  public abstract CreateFunctionMessage getCreateFunctionMessage(String messageBody);
+
+  /**
+   * Method to de-serialize DropFunctionMessage instance.
+   */
+  public abstract DropFunctionMessage getDropFunctionMessage(String messageBody);
+
+  /**
+   * Method to de-serialize CreateIndexMessage instance.
+   */
+  public abstract CreateIndexMessage getCreateIndexMessage(String messageBody);
+
+  /**
+   * Method to de-serialize DropIndexMessage instance.
+   */
+  public abstract DropIndexMessage getDropIndexMessage(String messageBody);
+
+  /**
+   * Method to de-serialize AlterIndexMessage instance.
+   */
+  public abstract AlterIndexMessage getAlterIndexMessage(String messageBody);
+
+  /**
+   * Method to deserialize InsertMessage
+   * @param messageBody the message in serialized form
+   * @return message in object form
+   */
+  public abstract InsertMessage getInsertMessage(String messageBody);
+
+  // Protection against construction.
+  protected MessageDeserializer() {}
+}