You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2016/12/19 22:10:09 UTC

[8/8] hive git commit: HIVE-15294: Capture additional metadata to replicate a simple insert at destination (Vaibhav Gumashta reviewed by Thejas Nair)

HIVE-15294: Capture additional metadata to replicate a simple insert at destination (Vaibhav Gumashta reviewed by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bbd99ed6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bbd99ed6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bbd99ed6

Branch: refs/heads/master
Commit: bbd99ed60e5708af3dc329b097d4b024f73041bd
Parents: 12f5550
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Mon Dec 19 14:02:01 2016 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Mon Dec 19 14:02:01 2016 -0800

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |   55 +-
 .../listener/TestDbNotificationListener.java    |  955 +++++---
 metastore/if/hive_metastore.thrift              |    4 +-
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp  | 2054 ++++++++--------
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp |  821 ++++---
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |   13 +
 .../metastore/api/ClearFileMetadataRequest.java |   32 +-
 .../hive/metastore/api/ClientCapabilities.java  |   32 +-
 .../hive/metastore/api/FireEventRequest.java    |   32 +-
 .../metastore/api/GetAllFunctionsResponse.java  |   36 +-
 .../api/GetFileMetadataByExprRequest.java       |   32 +-
 .../api/GetFileMetadataByExprResult.java        |   48 +-
 .../metastore/api/GetFileMetadataRequest.java   |   32 +-
 .../metastore/api/GetFileMetadataResult.java    |   44 +-
 .../hive/metastore/api/GetTablesRequest.java    |   32 +-
 .../hive/metastore/api/GetTablesResult.java     |   36 +-
 .../metastore/api/InsertEventRequestData.java   |  181 +-
 .../metastore/api/PutFileMetadataRequest.java   |   64 +-
 .../hive/metastore/api/ThriftHiveMetastore.java | 2220 +++++++++---------
 .../gen-php/metastore/ThriftHiveMetastore.php   | 1250 +++++-----
 .../src/gen/thrift/gen-php/metastore/Types.php  |  291 ++-
 .../hive_metastore/ThriftHiveMetastore.py       |  842 +++----
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  185 +-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |    4 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    |   28 +-
 .../hive/metastore/events/InsertEvent.java      |   36 +-
 .../metastore/messaging/MessageFactory.java     |   22 +-
 .../messaging/json/JSONInsertMessage.java       |   54 +-
 .../messaging/json/JSONMessageFactory.java      |   36 +
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   56 +-
 .../apache/hadoop/fs/ProxyLocalFileSystem.java  |  104 +-
 31 files changed, 5202 insertions(+), 4429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 119801f..8d29bfc 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.metastore.events.InsertEvent;
 import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,6 +104,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param tableEvent table event.
    * @throws org.apache.hadoop.hive.metastore.api.MetaException
    */
+  @Override
   public void onConfigChange(ConfigChangeEvent tableEvent) throws MetaException {
     String key = tableEvent.getKey();
     if (key.equals(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.toString())) {
@@ -122,6 +122,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
    * @param tableEvent table event.
    * @throws MetaException
    */
+  @Override
   public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
     Table t = tableEvent.getTable();
     NotificationEvent event =
@@ -129,13 +130,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildCreateTableMessage(t).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param tableEvent table event.
    * @throws MetaException
    */
+  @Override
   public void onDropTable(DropTableEvent tableEvent) throws MetaException {
     Table t = tableEvent.getTable();
     NotificationEvent event =
@@ -143,13 +145,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildDropTableMessage(t).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param tableEvent alter table event
    * @throws MetaException
    */
+  @Override
   public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
     Table before = tableEvent.getOldTable();
     Table after = tableEvent.getNewTable();
@@ -158,13 +161,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildAlterTableMessage(before, after).toString());
     event.setDbName(after.getDbName());
     event.setTableName(after.getTableName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param partitionEvent partition event
    * @throws MetaException
    */
+  @Override
   public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
     Table t = partitionEvent.getTable();
     String msg = msgFactory
@@ -173,13 +177,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param partitionEvent partition event
    * @throws MetaException
    */
+  @Override
   public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
     Table t = partitionEvent.getTable();
     NotificationEvent event =
@@ -187,13 +192,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param partitionEvent partition event
    * @throws MetaException
    */
+  @Override
   public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
     Partition before = partitionEvent.getOldPartition();
     Partition after = partitionEvent.getNewPartition();
@@ -202,91 +208,98 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString());
     event.setDbName(before.getDbName());
     event.setTableName(before.getTableName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param dbEvent database event
    * @throws MetaException
    */
+  @Override
   public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
     Database db = dbEvent.getDatabase();
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
             .buildCreateDatabaseMessage(db).toString());
     event.setDbName(db.getName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param dbEvent database event
    * @throws MetaException
    */
+  @Override
   public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
     Database db = dbEvent.getDatabase();
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
             .buildDropDatabaseMessage(db).toString());
     event.setDbName(db.getName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param fnEvent function event
    * @throws MetaException
    */
+  @Override
   public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException {
     Function fn = fnEvent.getFunction();
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
             .buildCreateFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param fnEvent function event
    * @throws MetaException
    */
+  @Override
   public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException {
     Function fn = fnEvent.getFunction();
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
             .buildDropFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param indexEvent index event
    * @throws MetaException
    */
+  @Override
   public void onAddIndex(AddIndexEvent indexEvent) throws MetaException {
     Index index = indexEvent.getIndex();
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory
             .buildCreateIndexMessage(index).toString());
     event.setDbName(index.getDbName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param indexEvent index event
    * @throws MetaException
    */
+  @Override
   public void onDropIndex(DropIndexEvent indexEvent) throws MetaException {
     Index index = indexEvent.getIndex();
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory
             .buildDropIndexMessage(index).toString());
     event.setDbName(index.getDbName());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param indexEvent index event
    * @throws MetaException
    */
+  @Override
   public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException {
     Index before = indexEvent.getOldIndex();
     Index after = indexEvent.getNewIndex();
@@ -294,7 +307,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory
             .buildAlterIndexMessage(before, after).toString());
     event.setDbName(before.getDbName());
-    enqueue(event);
+    process(event);
   }
 
   @Override
@@ -302,20 +315,20 @@ public class DbNotificationListener extends MetaStoreEventListener {
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
             insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(),
-            insertEvent.getFiles()).toString());
+            insertEvent.getFiles(), insertEvent.getFileChecksums()).toString());
     event.setDbName(insertEvent.getDb());
     event.setTableName(insertEvent.getTable());
-    enqueue(event);
+    process(event);
   }
 
   /**
    * @param partSetDoneEvent
    * @throws MetaException
    */
+  @Override
   public void onLoadPartitionDone(LoadPartitionDoneEvent partSetDoneEvent) throws MetaException {
     // TODO, we don't support this, but we should, since users may create an empty partition and
     // then load data into it.
-
   }
 
   private int now() {
@@ -329,10 +342,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
     return (int)millis;
   }
 
-  private void enqueue(NotificationEvent event) {
+  // Process this notification by adding it to metastore DB
+  private void process(NotificationEvent event) {
     if (rs != null) {
-      synchronized(NOTIFICATION_TBL_LOCK) {
-        LOG.debug("DbNotif:Enqueueing : {}:{}",event.getEventId(),event.getMessage());
+      synchronized (NOTIFICATION_TBL_LOCK) {
+        LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
+            event.getMessage());
         rs.addNotificationEvent(event);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 0b691b1..9e6af8f 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -15,7 +15,7 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
-*/
+ */
 package org.apache.hive.hcatalog.listener;
 
 import static org.junit.Assert.assertEquals;
@@ -27,7 +27,7 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -53,16 +53,11 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hive.hcatalog.common.HCatConstants;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
 import org.codehaus.jackson.node.ObjectNode;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -70,8 +65,13 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Tests DbNotificationListener when used as a transactional event listener
+ * (hive.metastore.transactional.event.listeners)
+ */
 public class TestDbNotificationListener {
-  private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class
+      .getName());
   private static final int EVENTS_TTL = 30;
   private static final int CLEANUP_SLEEP_TIME = 10;
   private static Map<String, String> emptyParameters = new HashMap<String, String>();
@@ -86,12 +86,11 @@ public class TestDbNotificationListener {
     HiveConf conf = new HiveConf();
     conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
         DbNotificationListener.class.getName());
-    conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL)+"s");
+    conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL) + "s");
     conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
-    conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
-        DummyRawStoreFailEvent.class.getName());
+    conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, DummyRawStoreFailEvent.class.getName());
     Class dbNotificationListener =
         Class.forName("org.apache.hive.hcatalog.listener.DbNotificationListener");
     Class[] classes = dbNotificationListener.getDeclaredClasses();
@@ -102,8 +101,7 @@ public class TestDbNotificationListener {
         sleepTimeField.set(null, CLEANUP_SLEEP_TIME * 1000);
       }
     }
-    conf
-    .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
         "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
     SessionState.start(new CliSessionState(conf));
     msClient = new HiveMetaStoreClient(conf);
@@ -114,363 +112,488 @@ public class TestDbNotificationListener {
   public void setup() throws Exception {
     long now = System.currentTimeMillis() / 1000;
     startTime = 0;
-    if (now > Integer.MAX_VALUE) fail("Bummer, time has fallen over the edge");
-    else startTime = (int) now;
+    if (now > Integer.MAX_VALUE) {
+      fail("Bummer, time has fallen over the edge");
+    } else {
+      startTime = (int) now;
+    }
     firstEventId = msClient.getCurrentNotificationEventId().getEventId();
     DummyRawStoreFailEvent.setEventSucceed(true);
   }
 
   @Test
   public void createDatabase() throws Exception {
-    Database db = new Database("mydb", "no description", "file:/tmp", emptyParameters);
+    String dbName = "createdb";
+    String dbName2 = "createdb2";
+    String dbLocationUri = "file:/tmp";
+    String dbDescription = "no description";
+    Database db = new Database(dbName, dbDescription, dbLocationUri, emptyParameters);
     msClient.createDatabase(db);
 
+    // Read notification from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
 
+    // Read event from notification
     NotificationEvent event = rsp.getEvents().get(0);
     assertEquals(firstEventId + 1, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, event.getEventType());
-    assertEquals("mydb", event.getDbName());
+    assertEquals(EventType.CREATE_DATABASE.toString(), event.getEventType());
+    assertEquals(dbName, event.getDbName());
     assertNull(event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"mydb\",\"timestamp\":[0-9]+}"));
 
+    // Parse the message field
+    ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+    assertEquals(EventType.CREATE_DATABASE.toString(), jsonTree.get("eventType").asText());
+    assertEquals(dbName, jsonTree.get("db").asText());
+
+    // When hive.metastore.transactional.event.listeners is set,
+    // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
-    db = new Database("mydb2", "no description", "file:/tmp", emptyParameters);
+    db = new Database(dbName2, dbDescription, dbLocationUri, emptyParameters);
     try {
       msClient.createDatabase(db);
+      fail("Error: create database should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
   }
 
   @Test
   public void dropDatabase() throws Exception {
-    Database db = new Database("dropdb", "no description", "file:/tmp", emptyParameters);
+    String dbName = "dropdb";
+    String dbName2 = "dropdb2";
+    String dbLocationUri = "file:/tmp";
+    String dbDescription = "no description";
+    Database db = new Database(dbName, dbDescription, dbLocationUri, emptyParameters);
     msClient.createDatabase(db);
-    msClient.dropDatabase("dropdb");
+    msClient.dropDatabase(dbName);
 
+    // Read notification from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+
+    // Two events: one for create db and other for drop db
     assertEquals(2, rsp.getEventsSize());
 
+    // Read event from notification
     NotificationEvent event = rsp.getEvents().get(1);
     assertEquals(firstEventId + 2, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType());
-    assertEquals("dropdb", event.getDbName());
+    assertEquals(EventType.DROP_DATABASE.toString(), event.getEventType());
+    assertEquals(dbName, event.getDbName());
     assertNull(event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"dropdb\",\"timestamp\":[0-9]+}"));
 
-    db = new Database("dropdb", "no description", "file:/tmp", emptyParameters);
+    // Parse the message field
+    ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+    assertEquals(EventType.DROP_DATABASE.toString(), jsonTree.get("eventType").asText());
+    assertEquals(dbName, jsonTree.get("db").asText());
+
+    // When hive.metastore.transactional.event.listeners is set,
+    // a failed event should not create a new notification
+    db = new Database(dbName2, dbDescription, dbLocationUri, emptyParameters);
     msClient.createDatabase(db);
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
-      msClient.dropDatabase("dropdb");
+      msClient.dropDatabase(dbName2);
+      fail("Error: drop database should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
   }
 
   @Test
   public void createTable() throws Exception {
+    String defaultDbName = "default";
+    String tblName = "createtable";
+    String tblName2 = "createtable2";
+    String tblOwner = "me";
+    String serdeLocation = "file:/tmp";
+    FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    cols.add(col1);
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
     StorageDescriptor sd =
-        new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null,
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
             emptyParameters);
-    Table table = new Table("mytable", "default", "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    Table table =
+        new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, null,
+            emptyParameters, null, null, null);
     msClient.createTable(table);
-    // Get the event
+
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(0);
     assertEquals(firstEventId + 1, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
-    assertEquals("default", event.getDbName());
-    assertEquals("mytable", event.getTableName());
+    assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
+    assertEquals(defaultDbName, event.getDbName());
+    assertEquals(tblName, event.getTableName());
 
     // Parse the message field
     ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
-    assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText());
-    assertEquals("default", jsonTree.get("db").asText());
-    assertEquals("mytable", jsonTree.get("table").asText());
+    assertEquals(EventType.CREATE_TABLE.toString(), jsonTree.get("eventType").asText());
+    assertEquals(defaultDbName, jsonTree.get("db").asText());
+    assertEquals(tblName, jsonTree.get("table").asText());
     Table tableObj = JSONMessageFactory.getTableObj(jsonTree);
     assertEquals(table, tableObj);
 
+    // When hive.metastore.transactional.event.listeners is set,
+    // a failed event should not create a new notification
     table =
-        new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, emptyParameters,
-            null, null, null);
+        new Table(tblName2, defaultDbName, tblOwner, startTime, startTime, 0, sd, null,
+            emptyParameters, null, null, null);
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
       msClient.createTable(table);
+      fail("Error: create table should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
   }
 
   @Test
   public void alterTable() throws Exception {
+    String defaultDbName = "default";
+    String tblName = "altertabletbl";
+    String tblOwner = "me";
+    String serdeLocation = "file:/tmp";
+    FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
+    FieldSchema col2 = new FieldSchema("col2", "int", "no comment");
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    cols.add(col1);
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
-        serde, null, null, emptyParameters);
-    Table table = new Table("alttable", "default", "me", startTime, startTime, 0, sd,
-        new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+            emptyParameters);
+    Table table =
+        new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd,
+            new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
+
     // Event 1
     msClient.createTable(table);
-    cols.add(new FieldSchema("col2", "int", ""));
-    table = new Table("alttable", "default", "me", startTime, startTime, 0, sd,
-        new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
+    cols.add(col2);
+    table =
+        new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd,
+            new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
     // Event 2
-    msClient.alter_table("default", "alttable", table);
+    msClient.alter_table(defaultDbName, tblName, table);
 
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, rsp.getEventsSize());
-
     NotificationEvent event = rsp.getEvents().get(1);
     assertEquals(firstEventId + 2, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType());
-    assertEquals("default", event.getDbName());
-    assertEquals("alttable", event.getTableName());
+    assertEquals(EventType.ALTER_TABLE.toString(), event.getEventType());
+    assertEquals(defaultDbName, event.getDbName());
+    assertEquals(tblName, event.getTableName());
 
     // Parse the message field
     ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
-    assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, jsonTree.get("eventType").asText());
-    assertEquals("default", jsonTree.get("db").asText());
-    assertEquals("alttable", jsonTree.get("table").asText());
+    assertEquals(EventType.ALTER_TABLE.toString(), jsonTree.get("eventType").asText());
+    assertEquals(defaultDbName, jsonTree.get("db").asText());
+    assertEquals(tblName, jsonTree.get("table").asText());
     Table tableObj = JSONMessageFactory.getTableObj(jsonTree);
     assertEquals(table, tableObj);
 
+    // When hive.metastore.transactional.event.listeners is set,
+    // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
-      msClient.alter_table("default", "alttable", table);
+      msClient.alter_table(defaultDbName, tblName, table);
+      fail("Error: alter table should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, rsp.getEventsSize());
   }
 
   @Test
   public void dropTable() throws Exception {
+    String defaultDbName = "default";
+    String tblName = "droptbl";
+    String tblName2 = "droptbl2";
+    String tblOwner = "me";
+    String serdeLocation = "file:/tmp";
+    FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    cols.add(col1);
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
-        serde, null, null, emptyParameters);
-    Table table = new Table("droptable", "default", "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+            emptyParameters);
+    Table table =
+        new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, null,
+            emptyParameters, null, null, null);
+
+    // Event 1
     msClient.createTable(table);
-    msClient.dropTable("default", "droptable");
+    // Event 2
+    msClient.dropTable(defaultDbName, tblName);
 
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, rsp.getEventsSize());
-
     NotificationEvent event = rsp.getEvents().get(1);
     assertEquals(firstEventId + 2, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType());
-    assertEquals("default", event.getDbName());
-    assertEquals("droptable", event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
-        "\"droptable\",\"timestamp\":[0-9]+}"));
-
-    table = new Table("droptable2", "default", "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    assertEquals(EventType.DROP_TABLE.toString(), event.getEventType());
+    assertEquals(defaultDbName, event.getDbName());
+    assertEquals(tblName, event.getTableName());
+
+    // Parse the message field
+    ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+    assertEquals(EventType.DROP_TABLE.toString(), jsonTree.get("eventType").asText());
+    assertEquals(defaultDbName, jsonTree.get("db").asText());
+    assertEquals(tblName, jsonTree.get("table").asText());
+
+    // When hive.metastore.transactional.event.listeners is set,
+    // a failed event should not create a new notification
+    table =
+        new Table(tblName2, defaultDbName, tblOwner, startTime, startTime, 0, sd, null,
+            emptyParameters, null, null, null);
     msClient.createTable(table);
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
-      msClient.dropTable("default", "droptable2");
+      msClient.dropTable(defaultDbName, tblName2);
+      fail("Error: drop table should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
   }
 
   @Test
   public void addPartition() throws Exception {
+    String defaultDbName = "default";
+    String tblName = "addptn";
+    String tblName2 = "addptn2";
+    String tblOwner = "me";
+    String serdeLocation = "file:/tmp";
+    FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema("col1", "int", "nocomment"));
-    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
-    partCols.add(new FieldSchema("ds", "string", ""));
+    cols.add(col1);
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
-        serde, null, null, emptyParameters);
-    Table table = new Table("addPartTable", "default", "me", startTime, startTime, 0, sd, partCols,
-        emptyParameters, null, null, null);
-    msClient.createTable(table);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+            emptyParameters);
+    FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment");
+    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    List<String> partCol1Vals = Arrays.asList("today");
+    partCols.add(partCol1);
+    Table table =
+        new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols,
+            emptyParameters, null, null, null);
 
-    Partition partition = new Partition(Arrays.asList("today"), "default", "addPartTable",
-        startTime, startTime, sd, emptyParameters);
+    // Event 1: create the partitioned table
+    msClient.createTable(table);
+    Partition partition =
+        new Partition(partCol1Vals, defaultDbName, tblName, startTime, startTime, sd,
+            emptyParameters);
+    // Event 2: add partition ds=today (asserted below to be ADD_PARTITION)
     msClient.add_partition(partition);
 
+    // Get notifications from metastore; exactly the two events above are expected
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, rsp.getEventsSize());
-
     NotificationEvent event = rsp.getEvents().get(1);
     assertEquals(firstEventId + 2, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
-    assertEquals("default", event.getDbName());
-    assertEquals("addparttable", event.getTableName());
+    assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+    assertEquals(defaultDbName, event.getDbName());
+    assertEquals(tblName, event.getTableName());
 
     // Parse the message field
     ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
-    assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, jsonTree.get("eventType").asText());
-    assertEquals("default", jsonTree.get("db").asText());
-    assertEquals("addparttable", jsonTree.get("table").asText());
+    assertEquals(EventType.ADD_PARTITION.toString(), jsonTree.get("eventType").asText());
+    assertEquals(defaultDbName, jsonTree.get("db").asText());
+    assertEquals(tblName, jsonTree.get("table").asText());
     List<Partition> partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree);
     assertEquals(partition, partitionObjList.get(0));
 
-    partition = new Partition(Arrays.asList("tomorrow"), "default", "tableDoesNotExist",
-        startTime, startTime, sd, emptyParameters);
+    // When hive.metastore.transactional.event.listeners is set, a failed add_partition
+    // must not create a notification (count stays 2). NOTE(review): tblName2 is never created here.
+    partition =
+        new Partition(Arrays.asList("tomorrow"), defaultDbName, tblName2, startTime, startTime, sd,
+            emptyParameters);
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
       msClient.add_partition(partition);
+      fail("Error: add partition should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, rsp.getEventsSize());
   }
 
   @Test
   public void alterPartition() throws Exception {
+    String defaultDbName = "default";
+    String tblName = "alterptn";
+    String tblOwner = "me";
+    String serdeLocation = "file:/tmp";
+    FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema("col1", "int", "nocomment"));
-    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
-    partCols.add(new FieldSchema("ds", "string", ""));
+    cols.add(col1);
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
-        serde, null, null, emptyParameters);
-    Table table = new Table("alterparttable", "default", "me", startTime, startTime, 0, sd,
-        partCols, emptyParameters, null, null, null);
-    msClient.createTable(table);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+            emptyParameters);
+    FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment");
+    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    List<String> partCol1Vals = Arrays.asList("today");
+    partCols.add(partCol1);
+    Table table =
+        new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols,
+            emptyParameters, null, null, null);
 
-    Partition partition = new Partition(Arrays.asList("today"), "default", "alterparttable",
-        startTime, startTime, sd, emptyParameters);
+    // Event 1: create the partitioned table
+    msClient.createTable(table);
+    Partition partition =
+        new Partition(partCol1Vals, defaultDbName, tblName, startTime, startTime, sd,
+            emptyParameters);
+    // Event 2: add partition ds=today
     msClient.add_partition(partition);
+    Partition newPart =
+        new Partition(Arrays.asList("today"), defaultDbName, tblName, startTime, startTime + 1, sd,
+            emptyParameters);
+    // Event 3: alter that partition to newPart (it differs only by the startTime + 1 argument)
+    msClient.alter_partition(defaultDbName, tblName, newPart, null);
 
-    Partition newPart = new Partition(Arrays.asList("today"), "default", "alterparttable",
-        startTime, startTime + 1, sd, emptyParameters);
-    msClient.alter_partition("default", "alterparttable", newPart, null);
-
+    // Get notifications from metastore; the alter is expected as the third and last event
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(2);
     assertEquals(firstEventId + 3, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
-    assertEquals("default", event.getDbName());
-    assertEquals("alterparttable", event.getTableName());
+    assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
+    assertEquals(defaultDbName, event.getDbName());
+    assertEquals(tblName, event.getTableName());
 
     // Parse the message field
     ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
-    assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, jsonTree.get("eventType").asText());
-    assertEquals("default", jsonTree.get("db").asText());
-    assertEquals("alterparttable", jsonTree.get("table").asText());
+    assertEquals(EventType.ALTER_PARTITION.toString(), jsonTree.get("eventType").asText());
+    assertEquals(defaultDbName, jsonTree.get("db").asText());
+    assertEquals(tblName, jsonTree.get("table").asText());
     List<Partition> partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree);
     assertEquals(newPart, partitionObjList.get(0));
 
+    // When hive.metastore.transactional.event.listeners is set, a failed alter_partition
+    // must not create a new notification, so the total is expected to stay at 3
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
-      msClient.alter_partition("default", "alterparttable", newPart, null);
+      msClient.alter_partition(defaultDbName, tblName, newPart, null);
+      fail("Error: alter partition should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
   }
 
   @Test
   public void dropPartition() throws Exception {
+    String defaultDbName = "default";
+    String tblName = "dropptn";
+    String tblOwner = "me";
+    String serdeLocation = "file:/tmp";
+    FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema("col1", "int", "nocomment"));
-    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
-    partCols.add(new FieldSchema("ds", "string", ""));
+    cols.add(col1);
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
-        serde, null, null, emptyParameters);
-    Table table = new Table("dropPartTable", "default", "me", startTime, startTime, 0, sd, partCols,
-        emptyParameters, null, null, null);
-    msClient.createTable(table);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+            emptyParameters);
+    FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment");
+    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    List<String> partCol1Vals = Arrays.asList("today");
+    partCols.add(partCol1);
+    Table table =
+        new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols,
+            emptyParameters, null, null, null);
 
-    Partition partition = new Partition(Arrays.asList("today"), "default", "dropPartTable",
-        startTime, startTime, sd, emptyParameters);
+    // Event 1: create the partitioned table
+    msClient.createTable(table);
+    Partition partition =
+        new Partition(partCol1Vals, defaultDbName, tblName, startTime, startTime, sd,
+            emptyParameters);
+    // Event 2: add partition ds=today
     msClient.add_partition(partition);
+    // Event 3: drop that same partition
+    msClient.dropPartition(defaultDbName, tblName, partCol1Vals, false);
 
-    msClient.dropPartition("default", "dropparttable", Arrays.asList("today"), false);
-
+    // Get notifications from metastore; the drop is expected as the third and last event
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
-
     NotificationEvent event = rsp.getEvents().get(2);
     assertEquals(firstEventId + 3, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType());
-    assertEquals("default", event.getDbName());
-    assertEquals("dropparttable", event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_PARTITION\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
-        "\"dropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}"));
-
-    partition = new Partition(Arrays.asList("tomorrow"), "default", "dropPartTable",
-        startTime, startTime, sd, emptyParameters);
-      msClient.add_partition(partition);
+    assertEquals(EventType.DROP_PARTITION.toString(), event.getEventType());
+    assertEquals(defaultDbName, event.getDbName());
+    assertEquals(tblName, event.getTableName());
+
+    // Parse the message field and check it names the same event type, db and table
+    ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+    assertEquals(EventType.DROP_PARTITION.toString(), jsonTree.get("eventType").asText());
+    assertEquals(defaultDbName, jsonTree.get("db").asText());
+    assertEquals(tblName, jsonTree.get("table").asText());
+
+    // When hive.metastore.transactional.event.listeners is set, a failed drop must not create
+    // a new notification: the add_partition below is event 4 and the failed drop adds none
+    List<String> newpartCol1Vals = Arrays.asList("tomorrow");
+    partition =
+        new Partition(newpartCol1Vals, defaultDbName, tblName, startTime, startTime, sd,
+            emptyParameters);
+    msClient.add_partition(partition);
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
-      msClient.dropPartition("default", "dropparttable", Arrays.asList("tomorrow"), false);
+      msClient.dropPartition(defaultDbName, tblName, newpartCol1Vals, false);
+      fail("Error: drop partition should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(4, rsp.getEventsSize());
   }
 
   @Test
   public void createFunction() throws Exception {
-    String funcName = "createFunction";
-    String dbName = "default";
+    String defaultDbName = "default";
+    String funcName = "createfunction";
+    String funcName2 = "createfunction2";
     String ownerName = "me";
-    String funcClass = "o.a.h.h.myfunc";
+    String funcClass = "o.a.h.h.createfunc";
+    String funcClass2 = "o.a.h.h.createfunc2";
     String funcResource = "file:/tmp/somewhere";
-    Function func = new Function(funcName, dbName, funcClass, ownerName, PrincipalType.USER,
-        startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR,
-        funcResource)));
+    String funcResource2 = "file:/tmp/somewhere2";
+    Function func =
+        new Function(funcName, defaultDbName, funcClass, ownerName, PrincipalType.USER, startTime,
+            FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR, funcResource)));
+    // Event 1
     msClient.createFunction(func);
+
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(0);
     assertEquals(firstEventId + 1, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_CREATE_FUNCTION_EVENT, event.getEventType());
-    assertEquals(dbName, event.getDbName());
+    assertEquals(EventType.CREATE_FUNCTION.toString(), event.getEventType());
+    assertEquals(defaultDbName, event.getDbName());
+
+    // Parse the message field
     Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event));
-    assertEquals(dbName, funcObj.getDbName());
+    assertEquals(defaultDbName, funcObj.getDbName());
     assertEquals(funcName, funcObj.getFunctionName());
     assertEquals(funcClass, funcObj.getClassName());
     assertEquals(ownerName, funcObj.getOwnerName());
@@ -479,41 +602,53 @@ public class TestDbNotificationListener {
     assertEquals(ResourceType.JAR, funcObj.getResourceUris().get(0).getResourceType());
     assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri());
 
+    // When hive.metastore.transactional.event.listeners is set,
+    // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
-    func = new Function("createFunction2", dbName, "o.a.h.h.myfunc2", "me", PrincipalType.USER,
-        startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR,
-        "file:/tmp/somewhere2")));
+    func =
+        new Function(funcName2, defaultDbName, funcClass2, ownerName, PrincipalType.USER,
+            startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR,
+                funcResource2)));
     try {
       msClient.createFunction(func);
+      fail("Error: create function should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
   }
 
   @Test
   public void dropFunction() throws Exception {
-    String funcName = "dropfunctiontest";
-    String dbName = "default";
+    String defaultDbName = "default";
+    String funcName = "dropfunction";
+    String funcName2 = "dropfunction2";
     String ownerName = "me";
-    String funcClass = "o.a.h.h.dropFunctionTest";
+    String funcClass = "o.a.h.h.dropfunction";
+    String funcClass2 = "o.a.h.h.dropfunction2";
     String funcResource = "file:/tmp/somewhere";
-    Function func = new Function(funcName, dbName, funcClass, ownerName, PrincipalType.USER,
-        startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR,
-        funcResource)));
+    String funcResource2 = "file:/tmp/somewhere2";
+    Function func =
+        new Function(funcName, defaultDbName, funcClass, ownerName, PrincipalType.USER, startTime,
+            FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR, funcResource)));
+    // Event 1
     msClient.createFunction(func);
-    msClient.dropFunction(dbName, funcName);
+    // Event 2
+    msClient.dropFunction(defaultDbName, funcName);
+
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(1);
     assertEquals(firstEventId + 2, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_DROP_FUNCTION_EVENT, event.getEventType());
-    assertEquals(dbName, event.getDbName());
+    assertEquals(EventType.DROP_FUNCTION.toString(), event.getEventType());
+    assertEquals(defaultDbName, event.getDbName());
+
+    // Parse the message field
     Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event));
-    assertEquals(dbName, funcObj.getDbName());
+    assertEquals(defaultDbName, funcObj.getDbName());
     assertEquals(funcName, funcObj.getFunctionName());
     assertEquals(funcClass, funcObj.getClassName());
     assertEquals(ownerName, funcObj.getOwnerName());
@@ -522,17 +657,20 @@ public class TestDbNotificationListener {
     assertEquals(ResourceType.JAR, funcObj.getResourceUris().get(0).getResourceType());
     assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri());
 
-    func = new Function("dropfunctiontest2", dbName, "o.a.h.h.dropFunctionTest2", "me",
-        PrincipalType.USER,  startTime, FunctionType.JAVA, Arrays.asList(
-        new ResourceUri(ResourceType.JAR, "file:/tmp/somewhere2")));
+    // When hive.metastore.transactional.event.listeners is set,
+    // a failed event should not create a new notification
+    func =
+        new Function(funcName2, defaultDbName, funcClass2, ownerName, PrincipalType.USER,
+            startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR,
+                funcResource2)));
     msClient.createFunction(func);
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
-      msClient.dropFunction(dbName, "dropfunctiontest2");
+      msClient.dropFunction(defaultDbName, funcName2);
+      fail("Error: drop function should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
   }
@@ -543,46 +681,60 @@ public class TestDbNotificationListener {
     String dbName = "default";
     String tableName = "createIndexTable";
     String indexTableName = tableName + "__" + indexName + "__";
-    int startTime = (int)(System.currentTimeMillis() / 1000);
+    int startTime = (int) (System.currentTimeMillis() / 1000);
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
     cols.add(new FieldSchema("col1", "int", ""));
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
     Map<String, String> params = new HashMap<String, String>();
     params.put("key", "value");
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17,
-        serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
-    Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, serde,
+            Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
+    Table table =
+        new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+            null, null, null);
+    // Event 1
     msClient.createTable(table);
-    Index index = new Index(indexName, null, "default", tableName, startTime, startTime,
-        indexTableName, sd, emptyParameters, false);
-    Table indexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    Index index =
+        new Index(indexName, null, "default", tableName, startTime, startTime, indexTableName, sd,
+            emptyParameters, false);
+    Table indexTable =
+        new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+            null, null, null);
+    // Event 2, 3 (index table and index)
     msClient.createIndex(index, indexTable);
+
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(2);
     assertEquals(firstEventId + 3, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_CREATE_INDEX_EVENT, event.getEventType());
+    assertEquals(EventType.CREATE_INDEX.toString(), event.getEventType());
     assertEquals(dbName, event.getDbName());
+
+    // Parse the message field
     Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event));
     assertEquals(dbName, indexObj.getDbName());
     assertEquals(indexName, indexObj.getIndexName());
     assertEquals(tableName, indexObj.getOrigTableName());
     assertEquals(indexTableName, indexObj.getIndexTableName());
 
+    // When hive.metastore.transactional.event.listeners is set,
+    // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
-    index = new Index("createIndexTable2", null, "default", tableName, startTime, startTime,
-        "createIndexTable2__createIndexTable2__", sd, emptyParameters, false);
-    Table indexTable2 = new Table("createIndexTable2__createIndexTable2__", dbName, "me",
-        startTime, startTime, 0, sd, null, emptyParameters, null, null, null);
+    index =
+        new Index("createIndexTable2", null, "default", tableName, startTime, startTime,
+            "createIndexTable2__createIndexTable2__", sd, emptyParameters, false);
+    Table indexTable2 =
+        new Table("createIndexTable2__createIndexTable2__", dbName, "me", startTime, startTime, 0,
+            sd, null, emptyParameters, null, null, null);
     try {
       msClient.createIndex(index, indexTable2);
+      fail("Error: create index should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
   }
@@ -593,44 +745,61 @@ public class TestDbNotificationListener {
     String dbName = "default";
     String tableName = "dropIndexTable";
     String indexTableName = tableName + "__" + indexName + "__";
-    int startTime = (int)(System.currentTimeMillis() / 1000);
+    int startTime = (int) (System.currentTimeMillis() / 1000);
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
     cols.add(new FieldSchema("col1", "int", ""));
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
     Map<String, String> params = new HashMap<String, String>();
     params.put("key", "value");
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17,
-        serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
-    Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, serde,
+            Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
+    Table table =
+        new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+            null, null, null);
+    // Event 1
     msClient.createTable(table);
-    Index index = new Index(indexName, null, "default", tableName, startTime, startTime,
-        indexTableName, sd, emptyParameters, false);
-    Table indexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    Index index =
+        new Index(indexName, null, "default", tableName, startTime, startTime, indexTableName, sd,
+            emptyParameters, false);
+    Table indexTable =
+        new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+            null, null, null);
+    // Event 2, 3 (index table and index)
     msClient.createIndex(index, indexTable);
-    msClient.dropIndex(dbName, tableName, indexName, true); // drops index and indexTable
+    // Event 4 (drops index and indexTable)
+    msClient.dropIndex(dbName, tableName, indexName, true);
+
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(4, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(3);
     assertEquals(firstEventId + 4, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_DROP_INDEX_EVENT, event.getEventType());
+    assertEquals(EventType.DROP_INDEX.toString(), event.getEventType());
     assertEquals(dbName, event.getDbName());
+
+    // Parse the message field
     Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event));
     assertEquals(dbName, indexObj.getDbName());
     assertEquals(indexName.toLowerCase(), indexObj.getIndexName());
     assertEquals(tableName.toLowerCase(), indexObj.getOrigTableName());
     assertEquals(indexTableName.toLowerCase(), indexObj.getIndexTableName());
 
-    index = new Index("dropIndexTable2", null, "default", tableName, startTime, startTime,
-        "dropIndexTable__dropIndexTable2__", sd, emptyParameters, false);
-    Table indexTable2 = new Table("dropIndexTable__dropIndexTable2__", dbName, "me", startTime,
-        startTime, 0, sd, null, emptyParameters, null, null, null);
+    // When hive.metastore.transactional.event.listeners is set,
+    // a failed event should not create a new notification
+    index =
+        new Index("dropIndexTable2", null, "default", tableName, startTime, startTime,
+            "dropIndexTable__dropIndexTable2__", sd, emptyParameters, false);
+    Table indexTable2 =
+        new Table("dropIndexTable__dropIndexTable2__", dbName, "me", startTime, startTime, 0, sd,
+            null, emptyParameters, null, null, null);
     msClient.createIndex(index, indexTable2);
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
-      msClient.dropIndex(dbName, tableName, "dropIndex2", true); // drops index and indexTable
+      // drops index and indexTable
+      msClient.dropIndex(dbName, tableName, "dropIndex2", true);
+      fail("Error: drop index should've failed");
     } catch (Exception ex) {
       // expected
     }
@@ -645,127 +814,165 @@ public class TestDbNotificationListener {
     String dbName = "default";
     String tableName = "alterIndexTable";
     String indexTableName = tableName + "__" + indexName + "__";
-    int startTime = (int)(System.currentTimeMillis() / 1000);
+    int startTime = (int) (System.currentTimeMillis() / 1000);
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
     cols.add(new FieldSchema("col1", "int", ""));
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
     Map<String, String> params = new HashMap<String, String>();
     params.put("key", "value");
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17,
-        serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
-    Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, serde,
+            Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
+    Table table =
+        new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+            null, null, null);
+    // Event 1
     msClient.createTable(table);
-    Index oldIndex = new Index(indexName, null, "default", tableName, startTime, startTime,
-        indexTableName, sd, emptyParameters, false);
-    Table oldIndexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    Index oldIndex =
+        new Index(indexName, null, "default", tableName, startTime, startTime, indexTableName, sd,
+            emptyParameters, false);
+    Table oldIndexTable =
+        new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+            null, null, null);
+    // Event 2, 3
     msClient.createIndex(oldIndex, oldIndexTable); // creates index and index table
-    Index newIndex = new Index(indexName, null, "default", tableName, startTime, startTime + 1,
-        indexTableName, sd, emptyParameters, false);
+    Index newIndex =
+        new Index(indexName, null, "default", tableName, startTime, startTime + 1, indexTableName,
+            sd, emptyParameters, false);
+    // Event 4
     msClient.alter_index(dbName, tableName, indexName, newIndex);
+
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(4, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(3);
     assertEquals(firstEventId + 4, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_ALTER_INDEX_EVENT, event.getEventType());
+    assertEquals(EventType.ALTER_INDEX.toString(), event.getEventType());
     assertEquals(dbName, event.getDbName());
-    Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event), "afterIndexObjJson");
+
+    // Parse the message field
+    Index indexObj =
+        JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event), "afterIndexObjJson");
     assertEquals(dbName, indexObj.getDbName());
     assertEquals(indexName, indexObj.getIndexName());
     assertEquals(tableName, indexObj.getOrigTableName());
     assertEquals(indexTableName, indexObj.getIndexTableName());
     assertTrue(indexObj.getCreateTime() < indexObj.getLastAccessTime());
 
+    // When hive.metastore.transactional.event.listeners is set,
+    // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
     try {
       msClient.alter_index(dbName, tableName, indexName, newIndex);
+      fail("Error: alter index should've failed");
     } catch (Exception ex) {
       // expected
     }
-
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(4, rsp.getEventsSize());
   }
 
   @Test
   public void insertTable() throws Exception {
+    String defaultDbName = "default";
+    String tblName = "inserttbl";
+    String tblOwner = "me";
+    String serdeLocation = "file:/tmp";
+    String fileAdded = "/warehouse/mytable/b1";
+    FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    cols.add(col1);
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
-        serde, null, null, emptyParameters);
-    Table table = new Table("insertTable", "default", "me", startTime, startTime, 0, sd, null,
-        emptyParameters, null, null, null);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+            emptyParameters);
+    Table table =
+        new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, null,
+            emptyParameters, null, null, null);
+    // Event 1
     msClient.createTable(table);
 
     FireEventRequestData data = new FireEventRequestData();
     InsertEventRequestData insertData = new InsertEventRequestData();
     data.setInsertData(insertData);
-    insertData.addToFilesAdded("/warehouse/mytable/b1");
+    insertData.addToFilesAdded(fileAdded);
     FireEventRequest rqst = new FireEventRequest(true, data);
-    rqst.setDbName("default");
-    rqst.setTableName("insertTable");
+    rqst.setDbName(defaultDbName);
+    rqst.setTableName(tblName);
+    // Event 2
     msClient.fireListenerEvent(rqst);
 
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, rsp.getEventsSize());
-
     NotificationEvent event = rsp.getEvents().get(1);
     assertEquals(firstEventId + 2, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    assertEquals("default", event.getDbName());
-    assertEquals("insertTable", event.getTableName());
-    assertTrue(event.getMessage(),
-        event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
-        "\"insertTable\",\"timestamp\":[0-9]+,\"files\":\\[\"/warehouse/mytable/b1\"]," +
-        "\"partKeyVals\":\\{},\"partitionKeyValues\":\\{}}"));
+    assertEquals(EventType.INSERT.toString(), event.getEventType());
+    assertEquals(defaultDbName, event.getDbName());
+    assertEquals(tblName, event.getTableName());
+    // Parse the message field
+    verifyInsertJSON(event, defaultDbName, tblName, false);
   }
 
   @Test
   public void insertPartition() throws Exception {
+    String defaultDbName = "default";
+    String tblName = "insertptn";
+    String tblOwner = "me";
+    String serdeLocation = "file:/tmp";
+    String fileAdded = "/warehouse/mytable/b1";
+    FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema("col1", "int", "nocomment"));
-    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
-    partCols.add(new FieldSchema("ds", "string", ""));
+    cols.add(col1);
     SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
-        serde, null, null, emptyParameters);
-    Table table = new Table("insertPartition", "default", "me", startTime, startTime, 0, sd,
-        partCols, emptyParameters, null, null, null);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+            emptyParameters);
+    FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment");
+    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    List<String> partCol1Vals = Arrays.asList("today");
+    LinkedHashMap<String, String> partKeyVals = new LinkedHashMap<String, String>();
+    partKeyVals.put("ds", "today");
+    partCols.add(partCol1);
+    Table table =
+        new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols,
+            emptyParameters, null, null, null);
+    // Event 1
     msClient.createTable(table);
-    Partition partition = new Partition(Arrays.asList("today"), "default", "insertPartition",
-        startTime, startTime, sd, emptyParameters);
+    Partition partition =
+        new Partition(partCol1Vals, defaultDbName, tblName, startTime, startTime, sd,
+            emptyParameters);
+    // Event 2
     msClient.add_partition(partition);
-
     FireEventRequestData data = new FireEventRequestData();
     InsertEventRequestData insertData = new InsertEventRequestData();
     data.setInsertData(insertData);
-    insertData.addToFilesAdded("/warehouse/mytable/today/b1");
+    insertData.addToFilesAdded(fileAdded);
     FireEventRequest rqst = new FireEventRequest(true, data);
-    rqst.setDbName("default");
-    rqst.setTableName("insertPartition");
-    rqst.setPartitionVals(Arrays.asList("today"));
+    rqst.setDbName(defaultDbName);
+    rqst.setTableName(tblName);
+    rqst.setPartitionVals(partCol1Vals);
+    // Event 3
     msClient.fireListenerEvent(rqst);
 
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
-
     NotificationEvent event = rsp.getEvents().get(2);
     assertEquals(firstEventId + 3, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
-    assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    assertEquals("default", event.getDbName());
-    assertEquals("insertPartition", event.getTableName());
-    assertTrue(event.getMessage(),
-        event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
-        "\"insertPartition\",\"timestamp\":[0-9]+," +
-        "\"files\":\\[\"/warehouse/mytable/today/b1\"],\"partKeyVals\":\\{\"ds\":\"today\"}," +
-        "\"partitionKeyValues\":\\{\"ds\":\"today\"}}"));
+    assertEquals(EventType.INSERT.toString(), event.getEventType());
+    assertEquals(defaultDbName, event.getDbName());
+    assertEquals(tblName, event.getTableName());
+    // Parse the message field
+    verifyInsertJSON(event, defaultDbName, tblName, false);
+    ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+    LinkedHashMap<String, String> partKeyValsFromNotif =
+        JSONMessageFactory.getAsMap((ObjectNode) jsonTree.get("partKeyVals"),
+            new LinkedHashMap<String, String>());
+    assertEquals(partKeyVals, partKeyValsFromNotif);
   }
 
   @Test
@@ -777,6 +984,7 @@ public class TestDbNotificationListener {
     db = new Database("db3", "no description", "file:/tmp", emptyParameters);
     msClient.createDatabase(db);
 
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 2, null);
     assertEquals(2, rsp.getEventsSize());
     assertEquals(firstEventId + 1, rsp.getEvents().get(0).getEventId());
@@ -794,10 +1002,11 @@ public class TestDbNotificationListener {
     IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() {
       @Override
       public boolean accept(NotificationEvent event) {
-        return event.getEventType().equals(HCatConstants.HCAT_DROP_DATABASE_EVENT);
+        return event.getEventType().equals(EventType.DROP_DATABASE.toString());
       }
     };
 
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, filter);
     assertEquals(1, rsp.getEventsSize());
     assertEquals(firstEventId + 3, rsp.getEvents().get(0).getEventId());
@@ -814,10 +1023,11 @@ public class TestDbNotificationListener {
     IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() {
       @Override
       public boolean accept(NotificationEvent event) {
-        return event.getEventType().equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT);
+        return event.getEventType().equals(EventType.CREATE_DATABASE.toString());
       }
     };
 
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 1, filter);
     assertEquals(1, rsp.getEventsSize());
     assertEquals(firstEventId + 1, rsp.getEvents().get(0).getEventId());
@@ -825,161 +1035,223 @@ public class TestDbNotificationListener {
 
   @Test
   public void sqlInsertTable() throws Exception {
-
-    driver.run("create table sit (c int)");
-    driver.run("insert into table sit values (1)");
-    driver.run("alter table sit add columns (c2 int)");
-    driver.run("drop table sit");
-
+    String defaultDbName = "default";
+    String tblName = "sqlins";
+    // Event 1
+    driver.run("create table " + tblName + " (c int)");
+    // Event 2 (alter: marker stats event), 3 (insert), 4 (alter: stats update event)
+    driver.run("insert into table " + tblName + " values (1)");
+    // Event 5
+    driver.run("alter table " + tblName + " add columns (c2 int)");
+    // Event 6
+    driver.run("drop table " + tblName);
+
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-    // For reasons not clear to me there's an alter after the create table and one after the
-    // insert.  I think the one after the insert is a stats calculation.
     assertEquals(6, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(0);
     assertEquals(firstEventId + 1, event.getEventId());
-    assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
+    assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
+
     event = rsp.getEvents().get(2);
     assertEquals(firstEventId + 3, event.getEventId());
-    assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    // Make sure the files are listed in the insert
-    assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+    assertEquals(EventType.INSERT.toString(), event.getEventType());
+    // Parse the message field
+    verifyInsertJSON(event, defaultDbName, tblName, true);
+
     event = rsp.getEvents().get(4);
     assertEquals(firstEventId + 5, event.getEventId());
-    assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType());
+    assertEquals(EventType.ALTER_TABLE.toString(), event.getEventType());
+
     event = rsp.getEvents().get(5);
     assertEquals(firstEventId + 6, event.getEventId());
-    assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType());
+    assertEquals(EventType.DROP_TABLE.toString(), event.getEventType());
   }
 
   @Test
   public void sqlCTAS() throws Exception {
+    String sourceTblName = "sqlctasins1";
+    String targetTblName = "sqlctasins2";
+    // Event 1
+    driver.run("create table " + sourceTblName + " (c int)");
+    // Event 2 (alter: marker stats event), 3 (insert), 4 (alter: stats update event)
+    driver.run("insert into table " + sourceTblName + " values (1)");
+    // Event 5, 6 (alter: stats update event)
+    driver.run("create table " + targetTblName + " as select c from " + sourceTblName);
 
-    driver.run("create table ctas_source (c int)");
-    driver.run("insert into table ctas_source values (1)");
-    driver.run("create table ctas_target as select c from ctas_source");
-
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-
     assertEquals(6, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(0);
     assertEquals(firstEventId + 1, event.getEventId());
-    assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
+    assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
+
     event = rsp.getEvents().get(2);
     assertEquals(firstEventId + 3, event.getEventId());
-    assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    // Make sure the files are listed in the insert
-    assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+    assertEquals(EventType.INSERT.toString(), event.getEventType());
+    // Parse the message field
+    verifyInsertJSON(event, null, sourceTblName, true);
+
     event = rsp.getEvents().get(4);
     assertEquals(firstEventId + 5, event.getEventId());
-    assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
+    assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
   }
 
   @Test
   public void sqlTempTable() throws Exception {
+    String tempTblName = "sqltemptbl";
+    driver.run("create temporary table " + tempTblName + "  (c int)");
+    driver.run("insert into table " + tempTblName + " values (1)");
 
-    LOG.info("XXX Starting temp table");
-    driver.run("create temporary table tmp1 (c int)");
-    driver.run("insert into table tmp1 values (1)");
-
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-
     assertEquals(0, rsp.getEventsSize());
   }
 
   @Test
   public void sqlDb() throws Exception {
+    String dbName = "sqldb";
+    // Event 1
+    driver.run("create database " + dbName);
+    // Event 2
+    driver.run("drop database " + dbName);
 
-    driver.run("create database sd");
-    driver.run("drop database sd");
-
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(0);
     assertEquals(firstEventId + 1, event.getEventId());
-    assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, event.getEventType());
+    assertEquals(EventType.CREATE_DATABASE.toString(), event.getEventType());
     event = rsp.getEvents().get(1);
     assertEquals(firstEventId + 2, event.getEventId());
-    assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType());
+    assertEquals(EventType.DROP_DATABASE.toString(), event.getEventType());
   }
 
   @Test
   public void sqlInsertPartition() throws Exception {
-
-    driver.run("create table sip (c int) partitioned by (ds string)");
-    driver.run("insert into table sip partition (ds = 'today') values (1)");
-    driver.run("insert into table sip partition (ds = 'today') values (2)");
-    driver.run("insert into table sip partition (ds) values (3, 'today')");
-    driver.run("alter table sip add partition (ds = 'yesterday')");
-    driver.run("insert into table sip partition (ds = 'yesterday') values (2)");
-
-    driver.run("insert into table sip partition (ds) values (3, 'yesterday')");
-    driver.run("insert into table sip partition (ds) values (3, 'tomorrow')");
-    driver.run("alter table sip drop partition (ds = 'tomorrow')");
-
-    driver.run("insert into table sip partition (ds) values (42, 'todaytwo')");
-    driver.run("insert overwrite table sip partition(ds='todaytwo') select c from sip where 'ds'='today'");
-
+    String tblName = "sqlinsptn";
+    // Event 1
+    driver.run("create table " + tblName + " (c int) partitioned by (ds string)");
+    // Event 2 (add partition), 3
+    driver.run("insert into table " + tblName + " partition (ds = 'today') values (1)");
+    // Event 4 (insert), 5, 6
+    driver.run("insert into table " + tblName + " partition (ds = 'today') values (2)");
+    // Event 7 (insert), 8, 9
+    driver.run("insert into table " + tblName + " partition (ds) values (3, 'today')");
+    // Event 10 (add partition)
+    driver.run("alter table " + tblName + " add partition (ds = 'yesterday')");
+    // Event 11 (insert), 12, 13
+    driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (2)");
+    // Event 14 (insert), 15, 16
+    driver.run("insert into table " + tblName + " partition (ds) values (3, 'yesterday')");
+    // Event 17 (add partition), 18
+    driver.run("insert into table " + tblName + " partition (ds) values (3, 'tomorrow')");
+    // Event 19 (drop partition)
+    driver.run("alter table " + tblName + " drop partition (ds = 'tomorrow')");
+    // Event 20 (add partition), 21 (alter partition)
+    driver.run("insert into table " + tblName + " partition (ds) values (42, 'todaytwo')");
+    // Event 22, 23, 24
+    driver.run("insert overwrite table " + tblName + " partition(ds='todaytwo') select c from "
+        + tblName + " where 'ds'='today'");
+
+    // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-
-    for (NotificationEvent ne : rsp.getEvents()) LOG.debug("EVENT: " + ne.getMessage());
-    // For reasons not clear to me there's one or more alter partitions after add partition and
-    // insert.
     assertEquals(24, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(1);
     assertEquals(firstEventId + 2, event.getEventId());
-    assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+    assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+
     event = rsp.getEvents().get(3);
     assertEquals(firstEventId + 4, event.getEventId());
-    assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    // Make sure the files are listed in the insert
-    assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+    assertEquals(EventType.INSERT.toString(), event.getEventType());
+    // Parse the message field
+    verifyInsertJSON(event, null, tblName, true);
+
     event = rsp.getEvents().get(6);
     assertEquals(firstEventId + 7, event.getEventId());
-    assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+    assertEquals(EventType.INSERT.toString(), event.getEventType());
+    // Parse the message field
+    verifyInsertJSON(event, null, tblName, true);
+
     event = rsp.getEvents().get(9);
     assertEquals(firstEventId + 10, event.getEventId());
-    assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+    assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+
     event = rsp.getEvents().get(10);
     assertEquals(firstEventId + 11, event.getEventId());
-    assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+    assertEquals(EventType.INSERT.toString(), event.getEventType());
+    // Parse the message field
+    verifyInsertJSON(event, null, tblName, true);
+
     event = rsp.getEvents().get(13);
     assertEquals(firstEventId + 14, event.getEventId());
-    assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+    assertEquals(EventType.INSERT.toString(), event.getEventType());
+    // Parse the message field
+    verifyInsertJSON(event, null, tblName, true);
+
     event = rsp.getEvents().get(16);
     assertEquals(firstEventId + 17, event.getEventId());
-    assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+    assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+
     event = rsp.getEvents().get(18);
     assertEquals(firstEventId + 19, event.getEventId());
-    assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType());
+    assertEquals(EventType.DROP_PARTITION.toString(), event.getEventType());
 
     event = rsp.getEvents().get(19);
     assertEquals(firstEventId + 20, event.getEventId());
-    assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+    assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+
     event = rsp.getEvents().get(20);
     assertEquals(firstEventId + 21, event.getEventId());
-    assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
+    assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
     assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
 
     event = rsp.getEvents().get(21);
     assertEquals(firstEventId + 22, event.getEventId());
-    assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    assertTrue(event.getMessage().matches(".*\"files\":\\[\\].*")); // replace-overwrite introduces no new files
+    assertEquals(EventType.INSERT.toString(), event.getEventType());
+    // replace-overwrite introduces no new files
+    assertTrue(event.getMessage().matches(".*\"files\":\\[\\].*"));
+
     event = rsp.getEvents().get(22);
     assertEquals(firstEventId + 23, event.getEventId());
-    assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
+    assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
     assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
+
     event = rsp.getEvents().get(23);
     assertEquals(firstEventId + 24, event.getEventId());
-    assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
+    assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
     assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
-   }
+  }
+
+  private void verifyInsertJSON(NotificationEvent event, String dbName, String tblName,
+      boolean verifyChecksums) throws Exception {
+    // Parse the message field
+    ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+    System.out.println("JSONInsertMessage: " + jsonTree.toString());
+    assertEquals(EventType.INSERT.toString(), jsonTree.get("eventType").asText());
+    if (dbName != null) {
+      assertEquals(dbName, jsonTree.get("db").asText());
+    }
+    if (tblName != null) {
+      assertEquals(tblName, jsonTree.get("table").asText());
+    }
+    // Should have list of files
+    List<String> files =
+        JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("files"), new ArrayList<String>());
+    assertTrue(files.size() > 0);
+    if (verifyChecksums) {
+      // Should have list of file checksums
+      List<String> fileChecksums =
+          JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("fileChecksums"),
+              new ArrayList<String>());
+      assertTrue(fileChecksums.size() > 0);
+
+    }
+  }
 
   @Test
   public void cleanupNotifs() throws Exception {
-    Database db = new Database("cleanup1","no description","file:/tmp", emptyParameters);
+    Database db = new Database("cleanup1", "no description", "file:/tmp", emptyParameters);
     msClient.createDatabase(db);
     msClient.dropDatabase("cleanup1");
 
@@ -988,7 +1260,8 @@ public class TestDbNotificationListener {
     assertEquals(2, rsp.getEventsSize());
 
     // sleep for expiry time, and then fetch again
-    Thread.sleep(EVENTS_TTL * 2 * 1000); // sleep twice the TTL interval - things should have been cleaned by then.
+    // sleep twice the TTL interval - things should have been cleaned by then.
+    Thread.sleep(EVENTS_TTL * 2 * 1000);
 
     LOG.info("Pulling events again after cleanup");
     NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index 6f77156..79592ea 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -811,7 +811,9 @@ struct CurrentNotificationEventId {
 }
 
 struct InsertEventRequestData {
-    1: required list<string> filesAdded
+    1: required list<string> filesAdded,
+    // Checksum of files (UTF8 encoded string) added during this insert event (at the time they were added)
+    2: optional list<binary> filesAddedChecksum,
 }
 
 union FireEventRequestData {