You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:05 UTC

[55/75] [abbrv] hive git commit: HIVE-20679: DDL operations on hive might create large messages for DBNotification (Anishek Agarwal, reviewed by Sankar Hariappan)

HIVE-20679: DDL operations on hive might create large messages for DBNotification (Anishek Agarwal, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b4302bb7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b4302bb7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b4302bb7

Branch: refs/heads/master-tez092
Commit: b4302bb7ad967f15ca1b708685b2ac669e3cf037
Parents: b829955
Author: Anishek Agarwal <an...@gmail.com>
Authored: Mon Oct 22 13:51:43 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Mon Oct 22 13:51:43 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 .../listener/DbNotificationListener.java        | 182 +++++---
 .../json/JSONCreateFunctionMessage.java         |   3 +-
 .../messaging/json/JSONDropFunctionMessage.java |   3 +-
 .../messaging/json/JSONMessageFactory.java      |  39 +-
 .../listener/TestDbNotificationListener.java    |  14 +-
 .../TestReplAcidTablesWithJsonMessage.java      |  43 ++
 ...eplAcrossInstancesWithJsonMessageFormat.java |  45 ++
 ...ncrementalLoadAcidTablesWithJsonMessage.java |  46 ++
 .../ql/parse/TestReplWithJsonMessageFormat.java |  39 ++
 .../hive/ql/parse/TestReplicationScenarios.java |  82 ++--
 .../TestReplicationScenariosAcidTables.java     |  61 +--
 ...TestReplicationScenariosAcrossInstances.java | 103 +++--
 ...ationScenariosIncrementalLoadAcidTables.java |  55 ++-
 .../hadoop/hive/ql/parse/WarehouseInstance.java |   2 +-
 .../ql/cache/results/QueryResultsCache.java     |  14 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   5 +-
 .../repl/bootstrap/load/LoadConstraint.java     |   4 +-
 .../parse/repl/dump/events/AbortTxnHandler.java |  12 +-
 .../events/AbstractConstraintEventHandler.java  |   3 +-
 .../repl/dump/events/AbstractEventHandler.java  |  32 +-
 .../repl/dump/events/AddForeignKeyHandler.java  |  12 +-
 .../events/AddNotNullConstraintHandler.java     |  12 +-
 .../repl/dump/events/AddPartitionHandler.java   |  10 +-
 .../repl/dump/events/AddPrimaryKeyHandler.java  |  12 +-
 .../dump/events/AddUniqueConstraintHandler.java |  13 +-
 .../repl/dump/events/AllocWriteIdHandler.java   |  12 +-
 .../repl/dump/events/AlterDatabaseHandler.java  |  12 +-
 .../repl/dump/events/AlterPartitionHandler.java |  21 +-
 .../repl/dump/events/AlterTableHandler.java     |  18 +-
 .../repl/dump/events/CommitTxnHandler.java      |  28 +-
 .../repl/dump/events/CreateDatabaseHandler.java |  13 +-
 .../repl/dump/events/CreateFunctionHandler.java |  13 +-
 .../repl/dump/events/CreateTableHandler.java    |  15 +-
 .../parse/repl/dump/events/DefaultHandler.java  |   9 +
 .../repl/dump/events/DropConstraintHandler.java |  13 +-
 .../repl/dump/events/DropDatabaseHandler.java   |  12 +-
 .../repl/dump/events/DropFunctionHandler.java   |  12 +-
 .../repl/dump/events/DropPartitionHandler.java  |  12 +-
 .../repl/dump/events/DropTableHandler.java      |  12 +-
 .../repl/dump/events/EventHandlerFactory.java   |  44 +-
 .../parse/repl/dump/events/InsertHandler.java   |  22 +-
 .../parse/repl/dump/events/OpenTxnHandler.java  |  12 +-
 .../repl/dump/io/ConstraintsSerializer.java     |  10 +-
 .../load/message/AbstractMessageHandler.java    |   4 +-
 .../dump/events/TestEventHandlerFactory.java    |   7 +-
 .../hive/metastore/conf/MetastoreConf.java      |   2 +-
 .../hive/metastore/messaging/EventMessage.java  |  64 +--
 .../metastore/messaging/MessageBuilder.java     | 425 ++++++++++++++++++
 .../metastore/messaging/MessageEncoder.java     |  27 ++
 .../metastore/messaging/MessageFactory.java     | 367 +++-------------
 .../metastore/messaging/MessageSerializer.java  |  24 ++
 .../event/filters/DatabaseAndTableFilter.java   |   8 +-
 .../messaging/json/JSONAcidWriteMessage.java    |   9 +-
 .../json/JSONAddForeignKeyMessage.java          |   5 +-
 .../json/JSONAddNotNullConstraintMessage.java   |   5 +-
 .../messaging/json/JSONAddPartitionMessage.java |  11 +-
 .../json/JSONAddPrimaryKeyMessage.java          |   5 +-
 .../json/JSONAddUniqueConstraintMessage.java    |   5 +-
 .../messaging/json/JSONAlterCatalogMessage.java |   9 +-
 .../json/JSONAlterDatabaseMessage.java          |   9 +-
 .../json/JSONAlterPartitionMessage.java         |  15 +-
 .../messaging/json/JSONAlterTableMessage.java   |   9 +-
 .../messaging/json/JSONCommitTxnMessage.java    |   5 +-
 .../json/JSONCreateDatabaseMessage.java         |   5 +-
 .../json/JSONCreateFunctionMessage.java         |   5 +-
 .../messaging/json/JSONCreateTableMessage.java  |   5 +-
 .../json/JSONDropPartitionMessage.java          |   5 +-
 .../messaging/json/JSONDropTableMessage.java    |   5 +-
 .../messaging/json/JSONInsertMessage.java       |   9 +-
 .../messaging/json/JSONMessageEncoder.java      |  70 +++
 .../messaging/json/JSONMessageFactory.java      | 432 -------------------
 .../messaging/json/gzip/DeSerializer.java       | 181 ++++++++
 .../json/gzip/GzipJSONMessageEncoder.java       |  68 +++
 .../messaging/json/gzip/Serializer.java         |  32 ++
 .../hive/metastore/MetaStoreTestUtils.java      |  11 +
 .../ptest2/conf/deployed/master-mr2.properties  |   2 +-
 77 files changed, 1781 insertions(+), 1202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bcf1e9e..ed6d3d8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1176,7 +1176,7 @@ public class HiveConf extends Configuration {
      */
     @Deprecated
     METASTORE_EVENT_MESSAGE_FACTORY("hive.metastore.event.message.factory",
-        "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory",
+        "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder",
         "Factory class for making encoding and decoding messages in the events generated."),
     /**
      * @deprecated Use MetastoreConf.EXECUTE_SET_UGI

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 4313e12..c23aab2 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -78,9 +78,32 @@ import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
 import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
 import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
@@ -110,7 +133,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   private static CleanerThread cleaner = null;
 
   private Configuration conf;
-  private MessageFactory msgFactory;
+  private MessageEncoder msgEncoder;
 
   //cleaner is a static object, use static synchronized to make sure its thread-safe
   private static synchronized void init(Configuration conf) throws MetaException {
@@ -126,7 +149,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     super(config);
     conf = config;
     DbNotificationListener.init(conf);
-    msgFactory = MessageFactory.getInstance();
+    msgEncoder = MessageFactory.getDefaultInstance(conf);
   }
 
   /**
@@ -172,9 +195,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     Table t = tableEvent.getTable();
     FileIterator fileIter = MetaStoreUtils.isExternalTable(t)
                               ? null : new FileIterator(t.getSd().getLocation());
+    CreateTableMessage msg =
+        MessageBuilder.getInstance().buildCreateTableMessage(t, fileIter);
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(),
-                msgFactory.buildCreateTableMessage(t, fileIter).toString());
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
@@ -188,9 +213,10 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onDropTable(DropTableEvent tableEvent) throws MetaException {
     Table t = tableEvent.getTable();
+    DropTableMessage msg = MessageBuilder.getInstance().buildDropTableMessage(t);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory
-            .buildDropTableMessage(t).toString());
+        new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
@@ -205,9 +231,13 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
     Table before = tableEvent.getOldTable();
     Table after = tableEvent.getNewTable();
+    AlterTableMessage msg = MessageBuilder.getInstance()
+        .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp(),
+            tableEvent.getWriteId());
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory
-            .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp(), tableEvent.getWriteId()).toString());
+        new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(),
+            msgEncoder.getSerializer().serialize(msg)
+        );
     event.setCatName(after.isSetCatName() ? after.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(after.getDbName());
     event.setTableName(after.getTableName());
@@ -320,10 +350,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     Table t = partitionEvent.getTable();
     PartitionFilesIterator fileIter = MetaStoreUtils.isExternalTable(t)
             ? null : new PartitionFilesIterator(partitionEvent.getPartitionIterator(), t);
-    String msg = msgFactory
-        .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), fileIter).toString();
-    NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
+    EventMessage msg = MessageBuilder.getInstance()
+        .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), fileIter);
+    MessageSerializer serializer = msgEncoder.getSerializer();
+
+    NotificationEvent event = new NotificationEvent(0, now(),
+        EventType.ADD_PARTITION.toString(), serializer.serialize(msg));
     event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
@@ -337,9 +369,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
     Table t = partitionEvent.getTable();
-    NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory
-            .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
+    DropPartitionMessage msg =
+        MessageBuilder.getInstance()
+            .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator());
+    NotificationEvent event = new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(),
+        msgEncoder.getSerializer().serialize(msg));
     event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
@@ -354,10 +388,13 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
     Partition before = partitionEvent.getOldPartition();
     Partition after = partitionEvent.getNewPartition();
+    AlterPartitionMessage msg = MessageBuilder.getInstance()
+        .buildAlterPartitionMessage(partitionEvent.getTable(), before, after,
+            partitionEvent.getIsTruncateOp(),
+            partitionEvent.getWriteId());
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory
-            .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp(),
-                    partitionEvent.getWriteId()).toString());
+        new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(before.isSetCatName() ? before.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(before.getDbName());
     event.setTableName(before.getTableName());
@@ -371,9 +408,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
     Database db = dbEvent.getDatabase();
+    CreateDatabaseMessage msg = MessageBuilder.getInstance()
+        .buildCreateDatabaseMessage(db);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
-            .buildCreateDatabaseMessage(db).toString());
+        new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(db.isSetCatalogName() ? db.getCatalogName() : DEFAULT_CATALOG_NAME);
     event.setDbName(db.getName());
     process(event, dbEvent);
@@ -386,9 +425,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
     Database db = dbEvent.getDatabase();
+    DropDatabaseMessage msg = MessageBuilder.getInstance()
+        .buildDropDatabaseMessage(db);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
-            .buildDropDatabaseMessage(db).toString());
+        new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(db.isSetCatalogName() ? db.getCatalogName() : DEFAULT_CATALOG_NAME);
     event.setDbName(db.getName());
     process(event, dbEvent);
@@ -402,9 +443,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
     Database oldDb = dbEvent.getOldDatabase();
     Database newDb = dbEvent.getNewDatabase();
+    AlterDatabaseMessage msg = MessageBuilder.getInstance()
+        .buildAlterDatabaseMessage(oldDb, newDb);
     NotificationEvent event =
-            new NotificationEvent(0, now(), EventType.ALTER_DATABASE.toString(), msgFactory
-                    .buildAlterDatabaseMessage(oldDb, newDb).toString());
+        new NotificationEvent(0, now(), EventType.ALTER_DATABASE.toString(),
+            msgEncoder.getSerializer().serialize(msg)
+        );
     event.setCatName(oldDb.isSetCatalogName() ? oldDb.getCatalogName() : DEFAULT_CATALOG_NAME);
     event.setDbName(oldDb.getName());
     process(event, dbEvent);
@@ -417,9 +461,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException {
     Function fn = fnEvent.getFunction();
+    CreateFunctionMessage msg = MessageBuilder.getInstance()
+        .buildCreateFunctionMessage(fn);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
-            .buildCreateFunctionMessage(fn).toString());
+        new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(fn.isSetCatName() ? fn.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(fn.getDbName());
     process(event, fnEvent);
@@ -432,9 +478,10 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException {
     Function fn = fnEvent.getFunction();
+    DropFunctionMessage msg = MessageBuilder.getInstance().buildDropFunctionMessage(fn);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
-            .buildDropFunctionMessage(fn).toString());
+        new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(fn.isSetCatName() ? fn.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(fn.getDbName());
     process(event, fnEvent);
@@ -481,11 +528,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onInsert(InsertEvent insertEvent) throws MetaException {
     Table tableObj = insertEvent.getTableObj();
+    InsertMessage msg = MessageBuilder.getInstance().buildInsertMessage(tableObj,
+        insertEvent.getPartitionObj(), insertEvent.isReplace(),
+        new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()));
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(tableObj,
-                insertEvent.getPartitionObj(), insertEvent.isReplace(),
-            new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
-                .toString());
+        new NotificationEvent(0, now(), EventType.INSERT.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(tableObj.isSetCatName() ? tableObj.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(tableObj.getDbName());
     event.setTableName(tableObj.getTableName());
@@ -495,10 +543,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
     int lastTxnIdx = openTxnEvent.getTxnIds().size() - 1;
-    OpenTxnMessage msg = msgFactory.buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
+    OpenTxnMessage msg =
+        MessageBuilder.getInstance().buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
             openTxnEvent.getTxnIds().get(lastTxnIdx));
     NotificationEvent event =
-            new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(), msg.toString());
+        new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(),
+            msgEncoder.getSerializer().serialize(msg));
 
     try {
       addNotificationLog(event, openTxnEvent, dbConn, sqlGenerator);
@@ -510,10 +560,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
           throws MetaException {
+    CommitTxnMessage msg =
+        MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId());
+
     NotificationEvent event =
-            new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(), msgFactory.buildCommitTxnMessage(
-                    commitTxnEvent.getTxnId())
-                    .toString());
+        new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(),
+            msgEncoder.getSerializer().serialize(msg));
 
     try {
       addNotificationLog(event, commitTxnEvent, dbConn, sqlGenerator);
@@ -525,10 +577,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
           throws MetaException {
+    AbortTxnMessage msg =
+        MessageBuilder.getInstance().buildAbortTxnMessage(abortTxnEvent.getTxnId());
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(), msgFactory.buildAbortTxnMessage(
-            abortTxnEvent.getTxnId())
-            .toString());
+        new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(),
+            msgEncoder.getSerializer().serialize(msg));
 
     try {
       addNotificationLog(event, abortTxnEvent, dbConn, sqlGenerator);
@@ -555,9 +608,10 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAddPrimaryKey(AddPrimaryKeyEvent addPrimaryKeyEvent) throws MetaException {
     List<SQLPrimaryKey> cols = addPrimaryKeyEvent.getPrimaryKeyCols();
     if (cols.size() > 0) {
-      NotificationEvent event =
-          new NotificationEvent(0, now(), EventType.ADD_PRIMARYKEY.toString(), msgFactory
-              .buildAddPrimaryKeyMessage(addPrimaryKeyEvent.getPrimaryKeyCols()).toString());
+      AddPrimaryKeyMessage msg = MessageBuilder.getInstance()
+          .buildAddPrimaryKeyMessage(addPrimaryKeyEvent.getPrimaryKeyCols());
+      NotificationEvent event = new NotificationEvent(0, now(), EventType.ADD_PRIMARYKEY.toString(),
+          msgEncoder.getSerializer().serialize(msg));
       event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
       event.setDbName(cols.get(0).getTable_db());
       event.setTableName(cols.get(0).getTable_name());
@@ -573,9 +627,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAddForeignKey(AddForeignKeyEvent addForeignKeyEvent) throws MetaException {
     List<SQLForeignKey> cols = addForeignKeyEvent.getForeignKeyCols();
     if (cols.size() > 0) {
+      AddForeignKeyMessage msg = MessageBuilder.getInstance()
+          .buildAddForeignKeyMessage(addForeignKeyEvent.getForeignKeyCols());
       NotificationEvent event =
-          new NotificationEvent(0, now(), EventType.ADD_FOREIGNKEY.toString(), msgFactory
-              .buildAddForeignKeyMessage(addForeignKeyEvent.getForeignKeyCols()).toString());
+          new NotificationEvent(0, now(), EventType.ADD_FOREIGNKEY.toString(),
+              msgEncoder.getSerializer().serialize(msg));
       event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
       event.setDbName(cols.get(0).getPktable_db());
       event.setTableName(cols.get(0).getPktable_name());
@@ -591,9 +647,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAddUniqueConstraint(AddUniqueConstraintEvent addUniqueConstraintEvent) throws MetaException {
     List<SQLUniqueConstraint> cols = addUniqueConstraintEvent.getUniqueConstraintCols();
     if (cols.size() > 0) {
+      AddUniqueConstraintMessage msg = MessageBuilder.getInstance()
+          .buildAddUniqueConstraintMessage(addUniqueConstraintEvent.getUniqueConstraintCols());
       NotificationEvent event =
-          new NotificationEvent(0, now(), EventType.ADD_UNIQUECONSTRAINT.toString(), msgFactory
-              .buildAddUniqueConstraintMessage(addUniqueConstraintEvent.getUniqueConstraintCols()).toString());
+          new NotificationEvent(0, now(), EventType.ADD_UNIQUECONSTRAINT.toString(),
+              msgEncoder.getSerializer().serialize(msg)
+          );
       event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
       event.setDbName(cols.get(0).getTable_db());
       event.setTableName(cols.get(0).getTable_name());
@@ -609,9 +668,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAddNotNullConstraint(AddNotNullConstraintEvent addNotNullConstraintEvent) throws MetaException {
     List<SQLNotNullConstraint> cols = addNotNullConstraintEvent.getNotNullConstraintCols();
     if (cols.size() > 0) {
+      AddNotNullConstraintMessage msg = MessageBuilder.getInstance()
+          .buildAddNotNullConstraintMessage(addNotNullConstraintEvent.getNotNullConstraintCols());
       NotificationEvent event =
-          new NotificationEvent(0, now(), EventType.ADD_NOTNULLCONSTRAINT.toString(), msgFactory
-              .buildAddNotNullConstraintMessage(addNotNullConstraintEvent.getNotNullConstraintCols()).toString());
+          new NotificationEvent(0, now(), EventType.ADD_NOTNULLCONSTRAINT.toString(),
+              msgEncoder.getSerializer().serialize(msg)
+          );
       event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
       event.setDbName(cols.get(0).getTable_db());
       event.setTableName(cols.get(0).getTable_name());
@@ -628,9 +690,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     String dbName = dropConstraintEvent.getDbName();
     String tableName = dropConstraintEvent.getTableName();
     String constraintName = dropConstraintEvent.getConstraintName();
+    DropConstraintMessage msg = MessageBuilder.getInstance()
+        .buildDropConstraintMessage(dbName, tableName, constraintName);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.DROP_CONSTRAINT.toString(), msgFactory
-            .buildDropConstraintMessage(dbName, tableName, constraintName).toString());
+        new NotificationEvent(0, now(), EventType.DROP_CONSTRAINT.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(dropConstraintEvent.getCatName());
     event.setDbName(dbName);
     event.setTableName(tableName);
@@ -646,9 +710,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
           throws MetaException {
     String tableName = allocWriteIdEvent.getTableName();
     String dbName = allocWriteIdEvent.getDbName();
+    AllocWriteIdMessage msg = MessageBuilder.getInstance()
+        .buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), dbName, tableName);
     NotificationEvent event =
-            new NotificationEvent(0, now(), EventType.ALLOC_WRITE_ID.toString(), msgFactory
-                    .buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), dbName, tableName).toString());
+        new NotificationEvent(0, now(), EventType.ALLOC_WRITE_ID.toString(),
+            msgEncoder.getSerializer().serialize(msg)
+        );
     event.setTableName(tableName);
     event.setDbName(dbName);
     try {
@@ -661,11 +728,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onAcidWrite(AcidWriteEvent acidWriteEvent, Connection dbConn, SQLGenerator sqlGenerator)
           throws MetaException {
-    AcidWriteMessage msg = msgFactory.buildAcidWriteMessage(acidWriteEvent,
+    AcidWriteMessage msg = MessageBuilder.getInstance().buildAcidWriteMessage(acidWriteEvent,
             new FileChksumIterator(acidWriteEvent.getFiles(), acidWriteEvent.getChecksums(),
                     acidWriteEvent.getSubDirs()));
-    NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(), msg.toString());
-    event.setMessageFormat(msgFactory.getMessageFormat());
+    NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(),
+        msgEncoder.getSerializer().serialize(msg));
+    event.setMessageFormat(msgEncoder.getMessageFormat());
     event.setDbName(acidWriteEvent.getDatabase());
     event.setTableName(acidWriteEvent.getTable());
     try {
@@ -848,7 +916,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     ResultSet rs = null;
     try {
       stmt = dbConn.createStatement();
-      event.setMessageFormat(msgFactory.getMessageFormat());
+      event.setMessageFormat(msgEncoder.getMessageFormat());
 
       if (sqlGenerator.getDbProduct() == MYSQL) {
         stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
@@ -910,7 +978,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
    *                      DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners.
    */
   private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException {
-    event.setMessageFormat(msgFactory.getMessageFormat());
+    event.setMessageFormat(msgEncoder.getMessageFormat());
     LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
         event.getMessage());
     HMSHandler.getMSForConf(conf).addNotificationEvent(event);

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
index 4707d0e..17d3b73 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
@@ -20,6 +20,7 @@
 package org.apache.hive.hcatalog.messaging.json;
 
 import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hive.hcatalog.messaging.CreateFunctionMessage;
 import org.apache.thrift.TException;
 import org.codehaus.jackson.annotate.JsonProperty;
@@ -46,7 +47,7 @@ public class JSONCreateFunctionMessage extends CreateFunctionMessage {
     this.db = fn.getDbName();
     this.timestamp = timestamp;
     try {
-      this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn);
+      this.functionObjJson = MessageBuilder.createFunctionObjJson(fn);
     } catch (TException ex) {
       throw new IllegalArgumentException("Could not serialize Function object", ex);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
index 010c4a6..7fb7d1c 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
@@ -20,6 +20,7 @@
 package org.apache.hive.hcatalog.messaging.json;
 
 import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hive.hcatalog.messaging.DropFunctionMessage;
 import org.apache.thrift.TException;
 import org.codehaus.jackson.annotate.JsonProperty;
@@ -46,7 +47,7 @@ public class JSONDropFunctionMessage extends DropFunctionMessage {
     this.db = fn.getDbName();
     this.timestamp = timestamp;
     try {
-      this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn);
+      this.functionObjJson = MessageBuilder.createFunctionObjJson(fn);
     } catch (TException ex) {
       throw new IllegalArgumentException("Could not serialize Function object", ex);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
index ec573a3..770dd1e 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
@@ -20,16 +20,14 @@
 package org.apache.hive.hcatalog.messaging.json;
 
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import javax.annotation.Nullable;
-
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
 import org.apache.hive.hcatalog.messaging.AlterPartitionMessage;
 import org.apache.hive.hcatalog.messaging.AlterTableMessage;
@@ -43,15 +41,9 @@ import org.apache.hive.hcatalog.messaging.DropTableMessage;
 import org.apache.hive.hcatalog.messaging.InsertMessage;
 import org.apache.hive.hcatalog.messaging.MessageDeserializer;
 import org.apache.hive.hcatalog.messaging.MessageFactory;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
 /**
  * The JSON implementation of the MessageFactory. Constructs JSON implementations of
  * each message-type.
@@ -111,7 +103,7 @@ public class JSONMessageFactory extends MessageFactory {
   public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator<Partition> partitionsIterator) {
     return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
         table.getTableName(), table.getTableType(),
-        getPartitionKeyValues(table, partitionsIterator), now());
+        MessageBuilder.getPartitionKeyValues(table, partitionsIterator), now());
   }
 
   @Override
@@ -119,14 +111,14 @@ public class JSONMessageFactory extends MessageFactory {
                                                           Long writeId) {
     return new JSONAlterPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL,
         before.getDbName(), before.getTableName(), table.getTableType(),
-        getPartitionKeyValues(table,before), writeId, now());
+        MessageBuilder.getPartitionKeyValues(table,before), writeId, now());
   }
 
   @Override
   public DropPartitionMessage buildDropPartitionMessage(Table table, Iterator<Partition> partitions) {
     return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
         table.getTableName(), table.getTableType(),
-        getPartitionKeyValues(table, partitions), now());
+        MessageBuilder.getPartitionKeyValues(table, partitions), now());
   }
 
   @Override
@@ -159,27 +151,4 @@ public class JSONMessageFactory extends MessageFactory {
     return System.currentTimeMillis() / 1000;
   }
 
-  private static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
-    Map<String, String> partitionKeys = new LinkedHashMap<String, String>();
-    for (int i=0; i<table.getPartitionKeysSize(); ++i) {
-      partitionKeys.put(table.getPartitionKeys().get(i).getName(),
-          partition.getValues().get(i));
-    }
-    return partitionKeys;
-  }
-
-  private static List<Map<String, String>> getPartitionKeyValues(final Table table, Iterator<Partition> iterator) {
-    return Lists.newArrayList(Iterators.transform(iterator, new com.google.common.base.Function<Partition, Map<String, String>>() {
-      @Override
-      public Map<String, String> apply(@Nullable Partition partition) {
-        return getPartitionKeyValues(table, partition);
-      }
-    }));
-  }
-
-  static String createFunctionObjJson(Function functionObj) throws TException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(functionObj, "UTF-8");
-  }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index a00ea17..dc555a4 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -93,6 +93,7 @@ import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -119,7 +120,16 @@ public class TestDbNotificationListener {
   private static Map<String, String> emptyParameters = new HashMap<String, String>();
   private static IMetaStoreClient msClient;
   private static IDriver driver;
-  private static MessageDeserializer md = null;
+  private static MessageDeserializer md;
+
+  static {
+    try {
+      md = MessageFactory.getInstance(JSONMessageEncoder.FORMAT).getDeserializer();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private int startTime;
   private long firstEventId;
   private final String testTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "testDbNotif").toString();
@@ -267,7 +277,7 @@ public class TestDbNotificationListener {
     SessionState.start(new CliSessionState(conf));
     msClient = new HiveMetaStoreClient(conf);
     driver = DriverFactory.newDriver(conf);
-    md = MessageFactory.getInstance().getDeserializer();
+    md = JSONMessageEncoder.getInstance().getDeserializer();
 
     bcompat = new ReplicationV1CompatRule(msClient, conf, testsToSkipForReplV1BackwardCompatTesting );
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java
new file mode 100644
index 0000000..c16799d
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.Collections;
+
+public class TestReplAcidTablesWithJsonMessage extends TestReplicationScenariosAcidTables {
+
+  @Rule
+  public TestRule replV1BackwardCompat;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    internalBeforeClassSetup(Collections.emptyMap(), TestReplAcidTablesWithJsonMessage.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    replV1BackwardCompat = primary.getReplivationV1CompatRule(Collections.emptyList());
+    super.setup();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
new file mode 100644
index 0000000..0ec0275
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class TestReplAcrossInstancesWithJsonMessageFormat
+    extends TestReplicationScenariosAcrossInstances {
+  @Rule
+  public TestRule replV1BackwardCompat;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    internalBeforeClassSetup(Collections.emptyMap(),
+        TestReplAcrossInstancesWithJsonMessageFormat.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
+    super.setup();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java
new file mode 100644
index 0000000..792ec1c
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.Collections;
+
+public class TestReplIncrementalLoadAcidTablesWithJsonMessage
+    extends TestReplicationScenariosIncrementalLoadAcidTables {
+
+  @Rule
+  public TestRule replV1BackwardCompat;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    internalBeforeClassSetup(Collections.emptyMap(),
+        TestReplIncrementalLoadAcidTablesWithJsonMessage.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    replV1BackwardCompat = primary.getReplivationV1CompatRule(Collections.emptyList());
+    super.setup();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
new file mode 100644
index 0000000..faf1ced
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class TestReplWithJsonMessageFormat extends TestReplicationScenarios {
+  @Rule
+  public TestRule replV1BackwardCompatibleRule =
+      new ReplicationV1CompatRule(metaStoreClient, hconf,
+          new ArrayList<>(Collections.singletonList("testEventFilters")));
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    internalBeforeClassSetup(Collections.emptyMap());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 9c35aa6..75cd68a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -18,11 +18,11 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
 import org.apache.hadoop.hive.metastore.ObjectStore;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -47,13 +46,18 @@ import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.MoveTask;
@@ -62,45 +66,42 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 
 import javax.annotation.Nullable;
-
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.junit.Assert;
+import static org.junit.Assert.assertTrue;
 
 public class TestReplicationScenarios {
 
@@ -115,18 +116,14 @@ public class TestReplicationScenarios {
   private final static String TEST_PATH =
       System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
 
-  private static HiveConf hconf;
+  static HiveConf hconf;
+  static HiveMetaStoreClient metaStoreClient;
   private static IDriver driver;
-  private static HiveMetaStoreClient metaStoreClient;
   private static String proxySettingName;
-  static HiveConf hconfMirror;
-  static IDriver driverMirror;
-  static HiveMetaStoreClient metaStoreClientMirror;
+  private static HiveConf hconfMirror;
+  private static IDriver driverMirror;
+  private static HiveMetaStoreClient metaStoreClientMirror;
 
-  @Rule
-  public TestRule replV1BackwardCompatibleRule =
-      new ReplicationV1CompatRule(metaStoreClient, hconf,
-          new ArrayList<>(Arrays.asList("testEventFilters")));
   // Make sure we skip backward-compat checking for those tests that don't generate events
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
@@ -141,23 +138,30 @@ public class TestReplicationScenarios {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    HashMap<String, String> overrideProperties = new HashMap<>();
+    overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    internalBeforeClassSetup(overrideProperties);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> additionalProperties)
+      throws Exception {
     hconf = new HiveConf(TestReplicationScenarios.class);
-    String metastoreUri = System.getProperty("test."+HiveConf.ConfVars.METASTOREURIS.varname);
+    String metastoreUri = System.getProperty("test."+MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
     if (metastoreUri != null) {
-      hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUri);
+      hconf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri);
       return;
     }
 
-    hconf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
+    hconf.set(MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getHiveName(),
         DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
     hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
     hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
     proxySettingName = "hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts";
     hconf.set(proxySettingName, "*");
-    MetaStoreTestUtils.startMetaStoreWithRetry(hconf);
     hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
-    hconf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+    hconf.set(MetastoreConf.ConfVars.THRIFT_CONNECTION_RETRIES.getHiveName(), "3");
     hconf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hconf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
     hconf.set(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
@@ -166,11 +170,17 @@ public class TestReplicationScenarios {
     hconf.set(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
         "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
     hconf.set(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL.varname,
-              "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore");
+        "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore");
     hconf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true);
     System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
     System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
 
+    additionalProperties.forEach((key, value) -> {
+      hconf.set(key, value);
+    });
+
+    MetaStoreTestUtils.startMetaStoreWithRetry(hconf);
+
     Path testPath = new Path(TEST_PATH);
     FileSystem fs = FileSystem.get(testPath.toUri(),hconf);
     fs.mkdirs(testPath);
@@ -3077,12 +3087,12 @@ public class TestReplicationScenarios {
     // that match a provided message format
 
     IMetaStoreClient.NotificationFilter restrictByDefaultMessageFormat =
-        new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat());
+        new MessageFormatFilter(JSONMessageEncoder.FORMAT);
     IMetaStoreClient.NotificationFilter restrictByArbitraryMessageFormat =
-        new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat() + "_bogus");
+        new MessageFormatFilter(JSONMessageEncoder.FORMAT + "_bogus");
     NotificationEvent dummyEvent = createDummyEvent(dbname,tblname,0);
 
-    assertEquals(MessageFactory.getInstance().getMessageFormat(),dummyEvent.getMessageFormat());
+    assertEquals(JSONMessageEncoder.FORMAT,dummyEvent.getMessageFormat());
 
     assertFalse(restrictByDefaultMessageFormat.accept(null));
     assertTrue(restrictByDefaultMessageFormat.accept(dummyEvent));
@@ -3431,19 +3441,25 @@ public class TestReplicationScenarios {
   }
 
   private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) {
-    MessageFactory msgFactory = MessageFactory.getInstance();
+    MessageEncoder msgEncoder = null;
+    try {
+      msgEncoder = MessageFactory.getInstance(JSONMessageEncoder.FORMAT);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
     Table t = new Table();
     t.setDbName(dbname);
     t.setTableName(tblname);
     NotificationEvent event = new NotificationEvent(
         evid,
         (int)System.currentTimeMillis(),
-        MessageFactory.CREATE_TABLE_EVENT,
-        msgFactory.buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator()).toString()
+        MessageBuilder.CREATE_TABLE_EVENT,
+        MessageBuilder.getInstance().buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator())
+            .toString()
     );
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    event.setMessageFormat(msgFactory.getMessageFormat());
+    event.setMessageFormat(msgEncoder.getMessageFormat());
     return event;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index e043e54..4ceb9fa 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -17,16 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
 
 import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,13 +53,11 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Collections;
-import com.google.common.collect.Lists;
-import org.junit.Ignore;
+import java.util.Map;
 
 import static org.junit.Assert.assertTrue;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
@@ -72,12 +69,10 @@ public class TestReplicationScenariosAcidTables {
   @Rule
   public final TestName testName = new TestName();
 
-  @Rule
-  public TestRule replV1BackwardCompat;
-
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
-  private static WarehouseInstance primary, replica, replicaNonAcid;
-  private static HiveConf conf;
+  static WarehouseInstance primary;
+  private static WarehouseInstance replica, replicaNonAcid;
+  static HiveConf conf;
   private String primaryDbName, replicatedDbName, primaryDbNameExtra;
   private enum OperationType {
     REPL_TEST_ACID_INSERT, REPL_TEST_ACID_INSERT_SELECT, REPL_TEST_ACID_CTAS,
@@ -87,25 +82,38 @@ public class TestReplicationScenariosAcidTables {
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
-    conf = new HiveConf(TestReplicationScenariosAcidTables.class);
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    // Hand the overrides to the shared setup, which merges them into the warehouse conf.
+    internalBeforeClassSetup(overrides, TestReplicationScenariosAcidTables.class);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> overrides,
+      Class clazz) throws Exception {
+
+    conf = new HiveConf(clazz);
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
     MiniDFSCluster miniDFSCluster =
-           new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
-        put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
-        put("hive.support.concurrency", "true");
-        put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-        put("hive.metastore.client.capability.check", "false");
-        put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
-        put("hive.exec.dynamic.partition.mode", "nonstrict");
-        put("hive.strict.checks.bucketing", "false");
-        put("hive.mapred.mode", "nonstrict");
-        put("mapred.input.dir.recursive", "true");
-        put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    HashMap<String, String> acidEnableConf = new HashMap<String, String>() {{
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
     }};
-    primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
-    replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+
+    acidEnableConf.putAll(overrides);
+
+    primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
     HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
         put("hive.support.concurrency", "false");
@@ -123,7 +131,6 @@ public class TestReplicationScenariosAcidTables {
 
   @Before
   public void setup() throws Throwable {
-    replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
     primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
     replicatedDbName = "replicated_" + primaryDbName;
     primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 61473a8..7e8caf0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -19,12 +19,13 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
@@ -43,7 +44,6 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
@@ -53,7 +53,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -72,33 +71,40 @@ import static org.junit.Assert.assertTrue;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.junit.Assert;
 
 public class TestReplicationScenariosAcrossInstances {
   @Rule
   public final TestName testName = new TestName();
 
-  @Rule
-  public TestRule replV1BackwardCompat;
-
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
-  private static WarehouseInstance primary, replica;
+  static WarehouseInstance primary;
+  private static WarehouseInstance replica;
   private String primaryDbName, replicatedDbName;
   private static HiveConf conf;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
-    conf = new HiveConf(TestReplicationScenarios.class);
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    // Hand the overrides to the shared setup, which merges them into the warehouse conf.
+    internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> overrides, Class<?> clazz)
+      throws Exception {
+    conf = new HiveConf(clazz);
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
     MiniDFSCluster miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
+    Map<String, String> localOverrides = new HashMap<String, String>() {{
       put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
       put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
     }};
-    primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
-    replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+    localOverrides.putAll(overrides); // caller-supplied entries replace the defaults above
+    primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
   }
 
   @AfterClass
@@ -109,7 +115,6 @@ public class TestReplicationScenariosAcrossInstances {
 
   @Before
   public void setup() throws Throwable {
-    replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
     primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
     replicatedDbName = "replicated_" + primaryDbName;
     primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
@@ -323,7 +328,8 @@ public class TestReplicationScenariosAcrossInstances {
             "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
         .run("create table table1 (i int, j int)")
         .run("insert into table1 values (1,2)")
-        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump(primaryDbName, null,
+            Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, tuple.dumpLocation)
         .run("use " + replicatedDbName)
@@ -419,7 +425,7 @@ public class TestReplicationScenariosAcrossInstances {
             .run("create table table2 (a int, city string) partitioned by (country string)")
             .run("create table table3 (i int, j int)")
             .run("insert into table1 values (1,2)")
-            .dump(dbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump(dbName, null, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, tuple.dumpLocation)
             .run("use " + replicatedDbName)
@@ -433,7 +439,8 @@ public class TestReplicationScenariosAcrossInstances {
             .run("alter table table1 rename to renamed_table1")
             .run("insert into table2 partition(country='india') values (1,'mumbai') ")
             .run("create table table4 (i int, j int)")
-            .dump(dbName, tuple.lastReplicationId, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump(dbName, tuple.lastReplicationId,
+            Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, tuple.dumpLocation)
             .run("use " + replicatedDbName)
@@ -467,7 +474,7 @@ public class TestReplicationScenariosAcrossInstances {
                 SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("use " + dbTwo)
         .run("create table t1 (i int, j int)")
-        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump("`*`", null, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     /*
       Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
@@ -526,7 +533,7 @@ public class TestReplicationScenariosAcrossInstances {
         .run("use " + dbOne)
         .run("create table t1 (i int, j int) partitioned by (load_date date) "
             + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
-        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump("`*`", null, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     String dbTwo = primaryDbName + randomTwo;
     WarehouseInstance.Tuple incrementalTuple = primary
@@ -905,15 +912,20 @@ public class TestReplicationScenariosAcrossInstances {
 
     // Incremental load to non existing db should return database not exist error.
     tuple = primary.dump("someJunkDB", tuple.lastReplicationId);
-    CommandProcessorResponse response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation);
-    response.getErrorMessage().toLowerCase().contains("org.apache.hadoop.hive.ql.metadata.hiveException: " +
-            "database does not exist");
+    CommandProcessorResponse response =
+        replica.runCommand("REPL LOAD someJunkDB from '" + tuple.dumpLocation + "'");
+    assertTrue(response.getErrorMessage().toLowerCase()
+        .contains("org.apache.hadoop.hive.ql.exec.DDLTask. Database does not exist: someJunkDB"
+            .toLowerCase()));
 
     // Bootstrap load from an empty dump directory should return empty load directory error.
     tuple = primary.dump("someJunkDB", null);
-    response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation);
-    response.getErrorMessage().toLowerCase().contains("org.apache.hadoop.hive.ql.parse.semanticException:" +
-            " no data to load in path");
+    response = replica.runCommand("REPL LOAD someJunkDB from '" + tuple.dumpLocation+"'");
+    assertTrue(response.getErrorMessage().toLowerCase()
+        .contains(
+            "semanticException no data to load in path"
+                .toLowerCase())
+    );
 
     primary.run(" drop database if exists " + testDbName + " cascade");
   }
@@ -935,7 +947,8 @@ public class TestReplicationScenariosAcrossInstances {
             .run("insert into table3 partition(country='india') values(3)")
             .dump(primaryDbName, bootstrapTuple.lastReplicationId);
 
-    replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='10'"))
+    replica.load(replicatedDbName, incremental.dumpLocation,
+        Collections.singletonList("'hive.repl.approx.max.load.tasks'='10'"))
             .status(replicatedDbName)
             .verifyResult(incremental.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -959,7 +972,8 @@ public class TestReplicationScenariosAcrossInstances {
     FileStatus[] fileStatus = fs.listStatus(path);
     int numEvents = fileStatus.length - 1; //one is metadata file
 
-    replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"))
+    replica.load(replicatedDbName, incremental.dumpLocation,
+        Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'"))
             .run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(new String[] {"table1", "table2", "table3", "table4", "table5" })
@@ -1112,7 +1126,7 @@ public class TestReplicationScenariosAcrossInstances {
             .run("show tables")
             .verifyResults(new String[] { "t1", "t2" })
             .run("select id from t1")
-            .verifyResults(Arrays.asList("10"))
+        .verifyResults(Collections.singletonList("10"))
             .run("select country from t2 order by country")
             .verifyResults(Arrays.asList("india", "uk", "us"));
     verifyIfCkptSet(replica, replicatedDbName, tuple.dumpLocation);
@@ -1154,9 +1168,8 @@ public class TestReplicationScenariosAcrossInstances {
     // also not loaded.
     BehaviourInjection<CallerArguments, Boolean> callerVerifier
             = new BehaviourInjection<CallerArguments, Boolean>() {
-      @Nullable
       @Override
-      public Boolean apply(@Nullable CallerArguments args) {
+      public Boolean apply(CallerArguments args) {
         injectionPathCalled = true;
         if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) {
           LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
@@ -1197,9 +1210,8 @@ public class TestReplicationScenariosAcrossInstances {
     // Verify if create table is not called on table t1 but called for t2 and t3.
     // Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails.
     callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
-      @Nullable
       @Override
-      public Boolean apply(@Nullable CallerArguments args) {
+      public Boolean apply(CallerArguments args) {
         injectionPathCalled = true;
         if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.funcName != null)) {
           LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " + String.valueOf(args.funcName));
@@ -1235,9 +1247,8 @@ public class TestReplicationScenariosAcrossInstances {
 
     // Verify if no create table/function calls. Only add foreign key constraints on table t2.
     callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
-      @Nullable
       @Override
-      public Boolean apply(@Nullable CallerArguments args) {
+      public Boolean apply(CallerArguments args) {
         injectionPathCalled = true;
         if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName != null)) {
           LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
@@ -1307,7 +1318,7 @@ public class TestReplicationScenariosAcrossInstances {
     };
     InjectableBehaviourObjectStore.setGetPartitionBehaviour(getPartitionStub);
 
-    List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
+    List<String> withConfigs = Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'");
     replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
     InjectableBehaviourObjectStore.resetGetPartitionBehaviour(); // reset the behaviour
     getPartitionStub.assertInjectionsPerformed(true, false);
@@ -1318,7 +1329,7 @@ public class TestReplicationScenariosAcrossInstances {
             .run("show tables")
             .verifyResults(new String[] {"t2" })
             .run("select country from t2 order by country")
-            .verifyResults(Arrays.asList("india"))
+        .verifyResults(Collections.singletonList("india"))
             .run("show functions like '" + replicatedDbName + "*'")
             .verifyResult(replicatedDbName + ".testFunctionOne");
 
@@ -1378,7 +1389,8 @@ public class TestReplicationScenariosAcrossInstances {
 
   @Test
   public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Throwable {
-    List<String> withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'");
+    List<String> withConfigs =
+        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
     String replicatedDbName_CM = replicatedDbName + "_CM";
     WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
             .run("create table t2 (place string) partitioned by (country string)")
@@ -1399,7 +1411,8 @@ public class TestReplicationScenariosAcrossInstances {
 
   @Test
   public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable {
-    List<String> withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'");
+    List<String> withConfigs =
+        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
     String replicatedDbName_CM = replicatedDbName + "_CM";
     WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
             .run("create table t2 (place string) partitioned by (country string)")
@@ -1417,16 +1430,16 @@ public class TestReplicationScenariosAcrossInstances {
     testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT", tuple);
   }
 
-  private void testMoveOptimization(String primarydb, String replicadb, String replicatedDbName_CM,
+  private void testMoveOptimization(String primaryDb, String replicaDb, String replicatedDbName_CM,
                                     String tbl,  String eventType, WarehouseInstance.Tuple tuple) throws Throwable {
-    List<String> withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'");
+    List<String> withConfigs =
+        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
 
     // fail add notification for given event type.
     BehaviourInjection<NotificationEvent, Boolean> callerVerifier
             = new BehaviourInjection<NotificationEvent, Boolean>() {
-      @Nullable
       @Override
-      public Boolean apply(@Nullable NotificationEvent entry) {
+      public Boolean apply(NotificationEvent entry) {
         if (entry.getEventType().equalsIgnoreCase(eventType) && entry.getTableName().equalsIgnoreCase(tbl)) {
           injectionPathCalled = true;
           LOG.warn("Verifier - DB: " + String.valueOf(entry.getDbName())
@@ -1440,19 +1453,19 @@ public class TestReplicationScenariosAcrossInstances {
 
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
     try {
-      replica.loadFailure(replicadb, tuple.dumpLocation, withConfigs);
+      replica.loadFailure(replicaDb, tuple.dumpLocation, withConfigs);
     } finally {
       InjectableBehaviourObjectStore.resetAddNotificationModifier();
     }
 
     callerVerifier.assertInjectionsPerformed(true, false);
-    replica.load(replicadb, tuple.dumpLocation, withConfigs);
+    replica.load(replicaDb, tuple.dumpLocation, withConfigs);
 
-    replica.run("use " + replicadb)
+    replica.run("use " + replicaDb)
             .run("select country from " + tbl + " where country == 'india'")
             .verifyResults(Arrays.asList("india"));
 
-    primary.run("use " + primarydb)
+    primary.run("use " + primaryDb)
             .run("drop table " + tbl);
 
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);