You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:04 UTC

[54/75] [abbrv] hive git commit: HIVE-20679: DDL operations on hive might create large messages for DBNotification (Anishek Agarwal, reviewed by Sankar Hariappan)

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
index 3fe8b58..314ca48 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
@@ -17,32 +17,19 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+
 import org.junit.rules.TestName;
 
-import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -50,13 +37,11 @@ import org.junit.BeforeClass;
 import org.junit.AfterClass;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import javax.annotation.Nullable;
-import java.util.Collections;
+import java.util.Map;
+
 import com.google.common.collect.Lists;
-import org.junit.Ignore;
 
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables
@@ -65,11 +50,9 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
   @Rule
   public final TestName testName = new TestName();
 
-  @Rule
-  public TestRule replV1BackwardCompat;
-
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenariosIncrementalLoadAcidTables.class);
-  private static WarehouseInstance primary, replica, replicaNonAcid;
+  static WarehouseInstance primary;
+  private static WarehouseInstance replica, replicaNonAcid;
   private static HiveConf conf;
   private String primaryDbName, replicatedDbName, primaryDbNameExtra;
   private enum OperationType {
@@ -80,12 +63,21 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
-    conf = new HiveConf(TestReplicationScenariosAcidTables.class);
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+
+    internalBeforeClassSetup(overrides, TestReplicationScenariosAcidTables.class);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz)
+      throws Exception {
+    conf = new HiveConf(clazz);
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
     MiniDFSCluster miniDFSCluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
+    HashMap<String, String> acidConfs = new HashMap<String, String>() {{
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
         put("hive.support.concurrency", "true");
         put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
@@ -97,9 +89,11 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
         put("mapred.input.dir.recursive", "true");
         put("hive.metastore.disallow.incompatible.col.type.changes", "false");
     }};
-    primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
-    replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
-    HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
+
+    acidConfs.putAll(overrides);
+    primary = new WarehouseInstance(LOG, miniDFSCluster, acidConfs);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, acidConfs);
+    Map<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
         put("hive.support.concurrency", "false");
         put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
@@ -116,7 +110,6 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
 
   @Before
   public void setup() throws Throwable {
-    replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
     primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
     replicatedDbName = "replicated_" + primaryDbName;
     primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 1e3478d..aae7bd7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -100,7 +100,7 @@ public class WarehouseInstance implements Closeable {
     initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
   }
 
-  public WarehouseInstance(Logger logger, MiniDFSCluster cluster,
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster,
       Map<String, String> overridesForHiveConf) throws Exception {
     this(logger, cluster, overridesForHiveConf, null);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 66f3b78..a51b7e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.Entity.Type;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -997,12 +997,12 @@ public final class QueryResultsCache {
       String tableName;
 
       switch (event.getEventType()) {
-      case MessageFactory.ADD_PARTITION_EVENT:
-      case MessageFactory.ALTER_PARTITION_EVENT:
-      case MessageFactory.DROP_PARTITION_EVENT:
-      case MessageFactory.ALTER_TABLE_EVENT:
-      case MessageFactory.DROP_TABLE_EVENT:
-      case MessageFactory.INSERT_EVENT:
+      case MessageBuilder.ADD_PARTITION_EVENT:
+      case MessageBuilder.ALTER_PARTITION_EVENT:
+      case MessageBuilder.DROP_PARTITION_EVENT:
+      case MessageBuilder.ALTER_TABLE_EVENT:
+      case MessageBuilder.DROP_TABLE_EVENT:
+      case MessageBuilder.INSERT_EVENT:
         dbName = event.getDbName();
         tableName = event.getTableName();
         break;

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 4872080..c75bde5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -31,11 +31,9 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -156,8 +154,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
 
     IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
         new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
-        new EventBoundaryFilter(work.eventFrom, work.eventTo),
-        new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
+        new EventBoundaryFilter(work.eventFrom, work.eventTo));
 
     EventUtils.MSClientNotificationFetcher evFetcher
         = new EventUtils.MSClientNotificationFetcher(hiveDb);

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
index d09b98c..8747727 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
 import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
 import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
@@ -63,7 +63,7 @@ public class LoadConstraint {
   private final ConstraintEvent event;
   private final String dbNameToLoadIn;
   private final TaskTracker tracker;
-  private final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
+  private final MessageDeserializer deserializer = JSONMessageEncoder.getInstance().getDeserializer();
 
   public LoadConstraint(Context context, ConstraintEvent event, String dbNameToLoadIn,
       TaskTracker existingTracker) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
index b9a5d21..5db3f26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
@@ -18,20 +18,26 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class AbortTxnHandler extends AbstractEventHandler {
+class AbortTxnHandler extends AbstractEventHandler<AbortTxnMessage> {
 
   AbortTxnHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  AbortTxnMessage eventMessage(String stringRepresentation) {
+    return deserializer.getAbortTxnMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} ABORT_TXN message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} ABORT_TXN message : {}", fromEventId(), eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
index 3ed005c..672f402 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
@@ -18,9 +18,10 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 
-abstract class AbstractConstraintEventHandler extends AbstractEventHandler {
+abstract class AbstractConstraintEventHandler<T extends EventMessage> extends AbstractEventHandler<T> {
   AbstractConstraintEventHandler(NotificationEvent event) {
     super(event);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index a70c673..b996703 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -18,20 +18,48 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class AbstractEventHandler implements EventHandler {
+abstract class AbstractEventHandler<T extends EventMessage> implements EventHandler {
   static final Logger LOG = LoggerFactory.getLogger(AbstractEventHandler.class);
+  static final MessageEncoder jsonMessageEncoder = JSONMessageEncoder.getInstance();
 
   final NotificationEvent event;
   final MessageDeserializer deserializer;
+  final String eventMessageAsJSON;
+  final T eventMessage;
 
   AbstractEventHandler(NotificationEvent event) {
     this.event = event;
-    deserializer = MessageFactory.getInstance().getDeserializer();
+    try {
+      deserializer = MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
+    } catch (Exception e) {
+      String message =
+          "could not create appropriate messageFactory for format " + event.getMessageFormat();
+      LOG.error(message, e);
+      throw new IllegalStateException(message, e);
+    }
+    eventMessage = eventMessage(event.getMessage());
+    eventMessageAsJSON = eventMessageAsJSON(eventMessage);
+  }
+
+  /**
+   * Converts the string representation of the message, in the format stored in the RDBMS backing the metastore, into its typed event message.
+   */
+  abstract T eventMessage(String stringRepresentation);
+
+  private String eventMessageAsJSON(T eventMessage) {
+    if (eventMessage == null) {
+      // this will only happen in case DefaultHandler is invoked
+      return null;
+    }
+    return jsonMessageEncoder.getSerializer().serialize(eventMessage);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java
index 8fdf2f1..736a162 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java
@@ -18,21 +18,27 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-public class AddForeignKeyHandler extends AbstractConstraintEventHandler {
+public class AddForeignKeyHandler extends AbstractConstraintEventHandler<AddForeignKeyMessage> {
   AddForeignKeyHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  AddForeignKeyMessage eventMessage(String stringRepresentation) {
+    return deserializer.getAddForeignKeyMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
     LOG.debug("Processing#{} ADD_FOREIGNKEY_MESSAGE message : {}", fromEventId(),
-        event.getMessage());
+        eventMessageAsJSON);
     if (shouldReplicate(withinContext)) {
       DumpMetaData dmd = withinContext.createDmd(this);
-      dmd.setPayload(event.getMessage());
+      dmd.setPayload(eventMessageAsJSON);
       dmd.write();
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java
index 335d4e6..c778198 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java
@@ -18,22 +18,28 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-public class AddNotNullConstraintHandler extends AbstractConstraintEventHandler {
+public class AddNotNullConstraintHandler extends AbstractConstraintEventHandler<AddNotNullConstraintMessage> {
   AddNotNullConstraintHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  AddNotNullConstraintMessage eventMessage(String stringRepresentation) {
+    return deserializer.getAddNotNullConstraintMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
     LOG.debug("Processing#{} ADD_NOTNULLCONSTRAINT_MESSAGE message : {}", fromEventId(),
-        event.getMessage());
+        eventMessageAsJSON);
 
     if (shouldReplicate(withinContext)) {
       DumpMetaData dmd = withinContext.createDmd(this);
-      dmd.setPayload(event.getMessage());
+      dmd.setPayload(eventMessageAsJSON);
       dmd.write();
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 973a65b..5c16887 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -42,10 +43,15 @@ class AddPartitionHandler extends AbstractEventHandler {
   }
 
   @Override
+  EventMessage eventMessage(String stringRepresentation) {
+    return deserializer.getAddPartitionMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), eventMessageAsJSON);
 
-    AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
+    AddPartitionMessage apm = (AddPartitionMessage) eventMessage;
     org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj();
     if (tobj == null) {
       LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed");

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java
index cf45c68..f9c08c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java
@@ -18,22 +18,28 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-public class AddPrimaryKeyHandler extends AbstractConstraintEventHandler {
+public class AddPrimaryKeyHandler extends AbstractConstraintEventHandler<AddPrimaryKeyMessage> {
   AddPrimaryKeyHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  AddPrimaryKeyMessage eventMessage(String stringRepresentation) {
+    return deserializer.getAddPrimaryKeyMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
     LOG.debug("Processing#{} ADD_PRIMARYKEY_MESSAGE message : {}", fromEventId(),
-        event.getMessage());
+        eventMessageAsJSON);
 
     if (shouldReplicate(withinContext)) {
       DumpMetaData dmd = withinContext.createDmd(this);
-      dmd.setPayload(event.getMessage());
+      dmd.setPayload(eventMessageAsJSON);
       dmd.write();
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java
index 58835a0..69caf08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java
@@ -18,22 +18,29 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-public class AddUniqueConstraintHandler extends AbstractConstraintEventHandler {
+public class AddUniqueConstraintHandler
+    extends AbstractConstraintEventHandler<AddUniqueConstraintMessage> {
   AddUniqueConstraintHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  AddUniqueConstraintMessage eventMessage(String stringRepresentation) {
+    return deserializer.getAddUniqueConstraintMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
     LOG.debug("Processing#{} ADD_UNIQUECONSTRAINT_MESSAGE message : {}", fromEventId(),
-        event.getMessage());
+        eventMessageAsJSON);
 
     if (shouldReplicate(withinContext)) {
       DumpMetaData dmd = withinContext.createDmd(this);
-      dmd.setPayload(event.getMessage());
+      dmd.setPayload(eventMessageAsJSON);
       dmd.write();
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
index 38efbd7..7602d1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
@@ -18,19 +18,25 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class AllocWriteIdHandler extends AbstractEventHandler {
+class AllocWriteIdHandler extends AbstractEventHandler<AllocWriteIdMessage> {
   AllocWriteIdHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  AllocWriteIdMessage eventMessage(String stringRepresentation) {
+    return deserializer.getAllocWriteIdMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} ALLOC_WRITE_ID message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} ALLOC_WRITE_ID message : {}", fromEventId(), eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java
index 3863c59..a31d1b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java
@@ -18,20 +18,26 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class AlterDatabaseHandler extends AbstractEventHandler {
+class AlterDatabaseHandler extends AbstractEventHandler<AlterDatabaseMessage> {
 
   AlterDatabaseHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  AlterDatabaseMessage eventMessage(String stringRepresentation) {
+    return deserializer.getAlterDatabaseMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} ALTER_DATABASE message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} ALTER_DATABASE message : {}", fromEventId(), eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
index cde4eed..d81408e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -23,17 +23,15 @@ import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-class AlterPartitionHandler extends AbstractEventHandler {
+class AlterPartitionHandler extends AbstractEventHandler<AlterPartitionMessage> {
   private final org.apache.hadoop.hive.metastore.api.Partition after;
   private final org.apache.hadoop.hive.metastore.api.Table tableObject;
   private final boolean isTruncateOp;
@@ -41,7 +39,7 @@ class AlterPartitionHandler extends AbstractEventHandler {
 
   AlterPartitionHandler(NotificationEvent event) throws Exception {
     super(event);
-    AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage());
+    AlterPartitionMessage apm = eventMessage;
     tableObject = apm.getTableObj();
     org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore();
     after = apm.getPtnObjAfter();
@@ -49,6 +47,11 @@ class AlterPartitionHandler extends AbstractEventHandler {
     scenario = scenarioType(before, after);
   }
 
+  @Override
+  AlterPartitionMessage eventMessage(String stringRepresentation) {
+    return deserializer.getAlterPartitionMessage(stringRepresentation);
+  }
+
   private enum Scenario {
     ALTER {
       @Override
@@ -86,7 +89,7 @@ class AlterPartitionHandler extends AbstractEventHandler {
 
   @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), eventMessageAsJSON);
 
     Table qlMdTable = new Table(tableObject);
     if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
@@ -107,7 +110,7 @@ class AlterPartitionHandler extends AbstractEventHandler {
           withinContext.hiveConf);
     }
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index 5f582b3..00fa370 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class AlterTableHandler extends AbstractEventHandler {
+class AlterTableHandler extends AbstractEventHandler<AlterTableMessage> {
   private final org.apache.hadoop.hive.metastore.api.Table before;
   private final org.apache.hadoop.hive.metastore.api.Table after;
   private final boolean isTruncateOp;
@@ -59,13 +59,17 @@ class AlterTableHandler extends AbstractEventHandler {
 
   AlterTableHandler(NotificationEvent event) throws Exception {
     super(event);
-    AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
-    before = atm.getTableObjBefore();
-    after = atm.getTableObjAfter();
-    isTruncateOp = atm.getIsTruncateOp();
+    before = eventMessage.getTableObjBefore();
+    after = eventMessage.getTableObjAfter();
+    isTruncateOp = eventMessage.getIsTruncateOp();
     scenario = scenarioType(before, after);
   }
 
+  @Override
+  AlterTableMessage eventMessage(String stringRepresentation) {
+    return deserializer.getAlterTableMessage(stringRepresentation);
+  }
+
   private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
       org.apache.hadoop.hive.metastore.api.Table after) {
     if (before.getDbName().equals(after.getDbName())
@@ -78,7 +82,7 @@ class AlterTableHandler extends AbstractEventHandler {
 
   @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), eventMessageAsJSON);
 
     Table qlMdTableBefore = new Table(before);
     if (!Utils
@@ -100,7 +104,7 @@ class AlterTableHandler extends AbstractEventHandler {
     }
  
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 82a722f..620263f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -40,12 +40,17 @@ import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.List;
 
-class CommitTxnHandler extends AbstractEventHandler {
+class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
 
   CommitTxnHandler(NotificationEvent event) {
     super(event);
   }
 
+  @Override
+  CommitTxnMessage eventMessage(String stringRepresentation) {
+    return deserializer.getCommitTxnMessage(stringRepresentation);
+  }
+
   private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
     Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
     FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
@@ -97,23 +102,22 @@ class CommitTxnHandler extends AbstractEventHandler {
 
   @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), event.getMessage());
-    String payload = event.getMessage();
+    LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), eventMessageAsJSON);
+    String payload = eventMessageAsJSON;
 
     if (!withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
-      CommitTxnMessage commitTxnMessage = deserializer.getCommitTxnMessage(event.getMessage());
 
       String contextDbName =  withinContext.dbName == null ? null :
               StringUtils.normalizeIdentifier(withinContext.dbName);
       String contextTableName =  withinContext.tableName == null ? null :
               StringUtils.normalizeIdentifier(withinContext.tableName);
       List<WriteEventInfo> writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf).
-              getAllWriteEventInfo(commitTxnMessage.getTxnId(), contextDbName, contextTableName);
+          getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, contextTableName);
       int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
       if (numEntry != 0) {
-        commitTxnMessage.addWriteEventInfo(writeEventInfoList);
-        payload = commitTxnMessage.toString();
-        LOG.debug("payload for commit txn event : " + payload);
+        eventMessage.addWriteEventInfo(writeEventInfoList);
+        payload = jsonMessageEncoder.getSerializer().serialize(eventMessage);
+        LOG.debug("payload for commit txn event : " + eventMessageAsJSON);
       }
 
       org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null;
@@ -128,7 +132,7 @@ class CommitTxnHandler extends AbstractEventHandler {
       // combination as primary key, so the entries with same table will come together. Only basic table metadata is
       // used during import, so we need not dump the latest table metadata.
       for (int idx = 0; idx < numEntry; idx++) {
-        qlMdTable = new org.apache.hadoop.hive.ql.metadata.Table(commitTxnMessage.getTableObj(idx));
+        qlMdTable = new org.apache.hadoop.hive.ql.metadata.Table(eventMessage.getTableObj(idx));
         if (qlMdTablePrev == null) {
           qlMdTablePrev = qlMdTable;
         }
@@ -141,13 +145,13 @@ class CommitTxnHandler extends AbstractEventHandler {
           qlMdTablePrev = qlMdTable;
         }
 
-        if (qlMdTable.isPartitioned() && (null != commitTxnMessage.getPartitionObj(idx))) {
+        if (qlMdTable.isPartitioned() && (null != eventMessage.getPartitionObj(idx))) {
           qlPtns.add(new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable,
-                  commitTxnMessage.getPartitionObj(idx)));
+              eventMessage.getPartitionObj(idx)));
         }
 
         filesTobeAdded.add(Lists.newArrayList(
-                ReplChangeManager.getListFromSeparatedString(commitTxnMessage.getFiles(idx))));
+            ReplChangeManager.getListFromSeparatedString(eventMessage.getFiles(idx))));
       }
 
       //Dump last table in the list

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java
index 21eb74b..7d64e49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java
@@ -24,19 +24,22 @@ import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
-class CreateDatabaseHandler extends AbstractEventHandler {
+class CreateDatabaseHandler extends AbstractEventHandler<CreateDatabaseMessage> {
   CreateDatabaseHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  CreateDatabaseMessage eventMessage(String stringRepresentation) {
+    return deserializer.getCreateDatabaseMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} CREATE_DATABASE message : {}", fromEventId(), event.getMessage());
-    CreateDatabaseMessage createDatabaseMsg =
-        deserializer.getCreateDatabaseMessage(event.getMessage());
+    LOG.info("Processing#{} CREATE_DATABASE message : {}", fromEventId(), eventMessageAsJSON);
     Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
     FileSystem fileSystem = metaDataPath.getFileSystem(withinContext.hiveConf);
-    EximUtil.createDbExportDump(fileSystem, metaDataPath, createDatabaseMsg.getDatabaseObject(),
+    EximUtil.createDbExportDump(fileSystem, metaDataPath, eventMessage.getDatabaseObject(),
         withinContext.replicationSpec);
     withinContext.createDmd(this).write();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index 5f0338e..5954e15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -27,21 +27,24 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 
-class CreateFunctionHandler extends AbstractEventHandler {
+class CreateFunctionHandler extends AbstractEventHandler<CreateFunctionMessage> {
   CreateFunctionHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  CreateFunctionMessage eventMessage(String stringRepresentation) {
+    return deserializer.getCreateFunctionMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    CreateFunctionMessage createFunctionMessage =
-        deserializer.getCreateFunctionMessage(event.getMessage());
-    LOG.info("Processing#{} CREATE_MESSAGE message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} CREATE_MESSAGE message : {}", fromEventId(), eventMessageAsJSON);
     Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
     FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
 
     try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
-      new FunctionSerializer(createFunctionMessage.getFunctionObj(), withinContext.hiveConf)
+      new FunctionSerializer(eventMessage.getFunctionObj(), withinContext.hiveConf)
           .writeTo(jsonWriter, withinContext.replicationSpec);
     }
     withinContext.createDmd(this).write();

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 897ea7f..550a82d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -25,23 +25,26 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 
-class CreateTableHandler extends AbstractEventHandler {
+class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> {
 
   CreateTableHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  CreateTableMessage eventMessage(String stringRepresentation) {
+    return deserializer.getCreateTableMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage());
-    LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage());
-    org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj();
+    LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), eventMessageAsJSON);
+    org.apache.hadoop.hive.metastore.api.Table tobj = eventMessage.getTableObj();
 
     if (tobj == null) {
       LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed");
@@ -68,7 +71,7 @@ class CreateTableHandler extends AbstractEventHandler {
         withinContext.hiveConf);
 
     Path dataPath = new Path(withinContext.eventRoot, "data");
-    Iterable<String> files = ctm.getFiles();
+    Iterable<String> files = eventMessage.getFiles();
     if (files != null) {
       // encoded filename/checksum of files, write into _files
       try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
index 8977f62..864cb98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
@@ -30,7 +31,15 @@ class DefaultHandler extends AbstractEventHandler {
   }
 
   @Override
+  EventMessage eventMessage(String stringRepresentation) {
+    return null;
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
+    // We specifically use the message string from the original event since we don't know what type of message
+    // to convert this message to. This handler should not be called, since with different message formats we need
+    // the ability to convert messages to a given message type.
     LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage());
     DumpMetaData dmd = withinContext.createDmd(this);
     dmd.setPayload(event.getMessage());

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
index 979e9a1..4c239e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
@@ -18,19 +18,26 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class DropConstraintHandler extends AbstractEventHandler {
+class DropConstraintHandler extends AbstractEventHandler<DropConstraintMessage> {
   DropConstraintHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  DropConstraintMessage eventMessage(String stringRepresentation) {
+    return deserializer.getDropConstraintMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} DROP_CONSTRAINT_MESSAGE message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} DROP_CONSTRAINT_MESSAGE message : {}", fromEventId(),
+        eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java
index 4eae778..f09f77d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java
@@ -18,19 +18,25 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class DropDatabaseHandler extends AbstractEventHandler {
+class DropDatabaseHandler extends AbstractEventHandler<DropDatabaseMessage> {
   DropDatabaseHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  DropDatabaseMessage eventMessage(String stringRepresentation) {
+    return deserializer.getDropDatabaseMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} DROP_DATABASE message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} DROP_DATABASE message : {}", fromEventId(), eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
index 352b0cc..6140c0c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
@@ -18,20 +18,26 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class DropFunctionHandler extends AbstractEventHandler {
+class DropFunctionHandler extends AbstractEventHandler<DropFunctionMessage> {
 
   DropFunctionHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  DropFunctionMessage eventMessage(String stringRepresentation) {
+    return deserializer.getDropFunctionMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
index 19b7044..e2a40d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
@@ -19,21 +19,27 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class DropPartitionHandler extends AbstractEventHandler {
+class DropPartitionHandler extends AbstractEventHandler<DropPartitionMessage> {
 
   DropPartitionHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  DropPartitionMessage eventMessage(String stringRepresentation) {
+    return deserializer.getDropPartitionMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
index cce0192..7d17de2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
@@ -19,21 +19,27 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class DropTableHandler extends AbstractEventHandler {
+class DropTableHandler extends AbstractEventHandler<DropTableMessage> {
 
   DropTableHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  DropTableMessage eventMessage(String stringRepresentation) {
+    return deserializer.getDropTableMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
index a1d61f9..2a0379e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -33,27 +33,27 @@ public class EventHandlerFactory {
   private static Map<String, Class<? extends EventHandler>> registeredHandlers = new HashMap<>();
 
   static {
-    register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class);
-    register(MessageFactory.ALTER_DATABASE_EVENT, AlterDatabaseHandler.class);
-    register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
-    register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class);
-    register(MessageFactory.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class);
-    register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class);
-    register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
-    register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
-    register(MessageFactory.INSERT_EVENT, InsertHandler.class);
-    register(MessageFactory.DROP_FUNCTION_EVENT, DropFunctionHandler.class);
-    register(MessageFactory.ADD_PRIMARYKEY_EVENT, AddPrimaryKeyHandler.class);
-    register(MessageFactory.ADD_FOREIGNKEY_EVENT, AddForeignKeyHandler.class);
-    register(MessageFactory.ADD_UNIQUECONSTRAINT_EVENT, AddUniqueConstraintHandler.class);
-    register(MessageFactory.ADD_NOTNULLCONSTRAINT_EVENT, AddNotNullConstraintHandler.class);
-    register(MessageFactory.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class);
-    register(MessageFactory.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class);
-    register(MessageFactory.DROP_DATABASE_EVENT, DropDatabaseHandler.class);
-    register(MessageFactory.OPEN_TXN_EVENT, OpenTxnHandler.class);
-    register(MessageFactory.COMMIT_TXN_EVENT, CommitTxnHandler.class);
-    register(MessageFactory.ABORT_TXN_EVENT, AbortTxnHandler.class);
-    register(MessageFactory.ALLOC_WRITE_ID_EVENT, AllocWriteIdHandler.class);
+    register(MessageBuilder.ADD_PARTITION_EVENT, AddPartitionHandler.class);
+    register(MessageBuilder.ALTER_DATABASE_EVENT, AlterDatabaseHandler.class);
+    register(MessageBuilder.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
+    register(MessageBuilder.ALTER_TABLE_EVENT, AlterTableHandler.class);
+    register(MessageBuilder.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class);
+    register(MessageBuilder.CREATE_TABLE_EVENT, CreateTableHandler.class);
+    register(MessageBuilder.DROP_PARTITION_EVENT, DropPartitionHandler.class);
+    register(MessageBuilder.DROP_TABLE_EVENT, DropTableHandler.class);
+    register(MessageBuilder.INSERT_EVENT, InsertHandler.class);
+    register(MessageBuilder.DROP_FUNCTION_EVENT, DropFunctionHandler.class);
+    register(MessageBuilder.ADD_PRIMARYKEY_EVENT, AddPrimaryKeyHandler.class);
+    register(MessageBuilder.ADD_FOREIGNKEY_EVENT, AddForeignKeyHandler.class);
+    register(MessageBuilder.ADD_UNIQUECONSTRAINT_EVENT, AddUniqueConstraintHandler.class);
+    register(MessageBuilder.ADD_NOTNULLCONSTRAINT_EVENT, AddNotNullConstraintHandler.class);
+    register(MessageBuilder.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class);
+    register(MessageBuilder.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class);
+    register(MessageBuilder.DROP_DATABASE_EVENT, DropDatabaseHandler.class);
+    register(MessageBuilder.OPEN_TXN_EVENT, OpenTxnHandler.class);
+    register(MessageBuilder.COMMIT_TXN_EVENT, CommitTxnHandler.class);
+    register(MessageBuilder.ABORT_TXN_EVENT, AbortTxnHandler.class);
+    register(MessageBuilder.ALLOC_WRITE_ID_EVENT, AllocWriteIdHandler.class);
   }
 
   static void register(String event, Class<? extends EventHandler> handlerClazz) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index cf3822a..842e20a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -36,19 +36,23 @@ import java.util.Collections;
 import java.util.List;
 
 
-class InsertHandler extends AbstractEventHandler {
+class InsertHandler extends AbstractEventHandler<InsertMessage> {
 
   InsertHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  InsertMessage eventMessage(String stringRepresentation) {
+    return deserializer.getInsertMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
     if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
       return;
     }
-    InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
-    org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(insertMsg);
+    org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(eventMessage);
 
     if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
       return;
@@ -58,18 +62,18 @@ class InsertHandler extends AbstractEventHandler {
     assert(!AcidUtils.isTransactionalTable(qlMdTable));
 
     List<Partition> qlPtns = null;
-    if (qlMdTable.isPartitioned() && (null != insertMsg.getPtnObj())) {
-      qlPtns = Collections.singletonList(partitionObject(qlMdTable, insertMsg));
+    if (qlMdTable.isPartitioned() && (null != eventMessage.getPtnObj())) {
+      qlPtns = Collections.singletonList(partitionObject(qlMdTable, eventMessage));
     }
     Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
 
     // Mark the replace type based on INSERT-INTO or INSERT_OVERWRITE operation
-    withinContext.replicationSpec.setIsReplace(insertMsg.isReplace());
+    withinContext.replicationSpec.setIsReplace(eventMessage.isReplace());
     EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
         qlMdTable, qlPtns,
         withinContext.replicationSpec,
         withinContext.hiveConf);
-    Iterable<String> files = insertMsg.getFiles();
+    Iterable<String> files = eventMessage.getFiles();
 
     if (files != null) {
       Path dataPath;
@@ -93,9 +97,9 @@ class InsertHandler extends AbstractEventHandler {
       }
     }
 
-    LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} INSERT message : {}", fromEventId(), eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
index fe81fe1..215e726 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
@@ -18,20 +18,26 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class OpenTxnHandler extends AbstractEventHandler {
+class OpenTxnHandler extends AbstractEventHandler<OpenTxnMessage> {
 
   OpenTxnHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
+  OpenTxnMessage eventMessage(String stringRepresentation) {
+    return deserializer.getOpenTxnMessage(stringRepresentation);
+  }
+
+  @Override
   public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), event.getMessage());
+    LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(eventMessageAsJSON);
     dmd.write();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java
index 2848212..ae3db9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 
@@ -52,16 +52,16 @@ public class ConstraintsSerializer implements JsonWriter.Serializer {
     String pksString, fksString, uksString, nnsString;
     pksString = fksString = uksString = nnsString = "";
     if (pks != null) {
-      pksString = MessageFactory.getInstance().buildAddPrimaryKeyMessage(pks).toString();
+      pksString = MessageBuilder.getInstance().buildAddPrimaryKeyMessage(pks).toString();
     }
     if (fks != null) {
-      fksString = MessageFactory.getInstance().buildAddForeignKeyMessage(fks).toString();
+      fksString = MessageBuilder.getInstance().buildAddForeignKeyMessage(fks).toString();
     }
     if (uks != null) {
-      uksString = MessageFactory.getInstance().buildAddUniqueConstraintMessage(uks).toString();
+      uksString = MessageBuilder.getInstance().buildAddUniqueConstraintMessage(uks).toString();
     }
     if (nns != null) {
-      nnsString = MessageFactory.getInstance().buildAddNotNullConstraintMessage(nns).toString();
+      nnsString = MessageBuilder.getInstance().buildAddNotNullConstraintMessage(nns).toString();
     }
     writer.jsonGenerator.writeStringField("pks", pksString);
     writer.jsonGenerator.writeStringField("uks", uksString);

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
index 5b26681..32ac6ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
@@ -30,7 +30,7 @@ abstract class AbstractMessageHandler implements MessageHandler {
   final HashSet<ReadEntity> readEntitySet = new HashSet<>();
   final HashSet<WriteEntity> writeEntitySet = new HashSet<>();
   final UpdatedMetaDataTracker updatedMetadata = new UpdatedMetaDataTracker();
-  final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
+  final MessageDeserializer deserializer = JSONMessageEncoder.getInstance().getDeserializer();
 
   @Override
   public Set<ReadEntity> readEntities() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
index 7057890..6a3c563 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.junit.Test;
 
@@ -53,9 +54,11 @@ public class TestEventHandlerFactory {
 
   @Test
   public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() {
+    NotificationEvent event = new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE,
+        "shouldGiveDefaultHandler", "s");
+    event.setMessageFormat(JSONMessageEncoder.FORMAT);
     EventHandler eventHandler =
-        EventHandlerFactory.handlerFor(new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE,
-            "shouldGiveDefaultHandler", "s"));
+        EventHandlerFactory.handlerFor(event);
     assertTrue(eventHandler instanceof DefaultHandler);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 1795ef7..1d64cce 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -514,7 +514,7 @@ public class MetastoreConf {
             "Alternatively, configure hive.metastore.transactional.event.listeners to ensure both are invoked in same JDO transaction."),
     EVENT_MESSAGE_FACTORY("metastore.event.message.factory",
         "hive.metastore.event.message.factory",
-        "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory",
+        "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder",
         "Factory class for making encoding and decoding messages in the events generated."),
     EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS("metastore.notification.parameters.exclude.patterns",
         "hive.metastore.notification.parameters.exclude.patterns", "",

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
index f24b419..1262c12 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
@@ -30,38 +30,38 @@ public abstract class EventMessage {
    */
   public enum EventType {
 
-    CREATE_DATABASE(MessageFactory.CREATE_DATABASE_EVENT),
-    DROP_DATABASE(MessageFactory.DROP_DATABASE_EVENT),
-    CREATE_TABLE(MessageFactory.CREATE_TABLE_EVENT),
-    DROP_TABLE(MessageFactory.DROP_TABLE_EVENT),
-    ADD_PARTITION(MessageFactory.ADD_PARTITION_EVENT),
-    DROP_PARTITION(MessageFactory.DROP_PARTITION_EVENT),
-    ALTER_DATABASE(MessageFactory.ALTER_DATABASE_EVENT),
-    ALTER_TABLE(MessageFactory.ALTER_TABLE_EVENT),
-    ALTER_PARTITION(MessageFactory.ALTER_PARTITION_EVENT),
-    INSERT(MessageFactory.INSERT_EVENT),
-    CREATE_FUNCTION(MessageFactory.CREATE_FUNCTION_EVENT),
-    DROP_FUNCTION(MessageFactory.DROP_FUNCTION_EVENT),
-
-    ADD_PRIMARYKEY(MessageFactory.ADD_PRIMARYKEY_EVENT),
-    ADD_FOREIGNKEY(MessageFactory.ADD_FOREIGNKEY_EVENT),
-    ADD_UNIQUECONSTRAINT(MessageFactory.ADD_UNIQUECONSTRAINT_EVENT),
-    ADD_NOTNULLCONSTRAINT(MessageFactory.ADD_NOTNULLCONSTRAINT_EVENT),
-    DROP_CONSTRAINT(MessageFactory.DROP_CONSTRAINT_EVENT),
-    CREATE_ISCHEMA(MessageFactory.CREATE_ISCHEMA_EVENT),
-    ALTER_ISCHEMA(MessageFactory.ALTER_ISCHEMA_EVENT),
-    DROP_ISCHEMA(MessageFactory.DROP_ISCHEMA_EVENT),
-    ADD_SCHEMA_VERSION(MessageFactory.ADD_SCHEMA_VERSION_EVENT),
-    ALTER_SCHEMA_VERSION(MessageFactory.ALTER_SCHEMA_VERSION_EVENT),
-    DROP_SCHEMA_VERSION(MessageFactory.DROP_SCHEMA_VERSION_EVENT),
-    CREATE_CATALOG(MessageFactory.CREATE_CATALOG_EVENT),
-    DROP_CATALOG(MessageFactory.DROP_CATALOG_EVENT),
-    OPEN_TXN(MessageFactory.OPEN_TXN_EVENT),
-    COMMIT_TXN(MessageFactory.COMMIT_TXN_EVENT),
-    ABORT_TXN(MessageFactory.ABORT_TXN_EVENT),
-    ALLOC_WRITE_ID(MessageFactory.ALLOC_WRITE_ID_EVENT),
-    ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT),
-    ACID_WRITE(MessageFactory.ACID_WRITE_EVENT);
+    CREATE_DATABASE(MessageBuilder.CREATE_DATABASE_EVENT),
+    DROP_DATABASE(MessageBuilder.DROP_DATABASE_EVENT),
+    CREATE_TABLE(MessageBuilder.CREATE_TABLE_EVENT),
+    DROP_TABLE(MessageBuilder.DROP_TABLE_EVENT),
+    ADD_PARTITION(MessageBuilder.ADD_PARTITION_EVENT),
+    DROP_PARTITION(MessageBuilder.DROP_PARTITION_EVENT),
+    ALTER_DATABASE(MessageBuilder.ALTER_DATABASE_EVENT),
+    ALTER_TABLE(MessageBuilder.ALTER_TABLE_EVENT),
+    ALTER_PARTITION(MessageBuilder.ALTER_PARTITION_EVENT),
+    INSERT(MessageBuilder.INSERT_EVENT),
+    CREATE_FUNCTION(MessageBuilder.CREATE_FUNCTION_EVENT),
+    DROP_FUNCTION(MessageBuilder.DROP_FUNCTION_EVENT),
+
+    ADD_PRIMARYKEY(MessageBuilder.ADD_PRIMARYKEY_EVENT),
+    ADD_FOREIGNKEY(MessageBuilder.ADD_FOREIGNKEY_EVENT),
+    ADD_UNIQUECONSTRAINT(MessageBuilder.ADD_UNIQUECONSTRAINT_EVENT),
+    ADD_NOTNULLCONSTRAINT(MessageBuilder.ADD_NOTNULLCONSTRAINT_EVENT),
+    DROP_CONSTRAINT(MessageBuilder.DROP_CONSTRAINT_EVENT),
+    CREATE_ISCHEMA(MessageBuilder.CREATE_ISCHEMA_EVENT),
+    ALTER_ISCHEMA(MessageBuilder.ALTER_ISCHEMA_EVENT),
+    DROP_ISCHEMA(MessageBuilder.DROP_ISCHEMA_EVENT),
+    ADD_SCHEMA_VERSION(MessageBuilder.ADD_SCHEMA_VERSION_EVENT),
+    ALTER_SCHEMA_VERSION(MessageBuilder.ALTER_SCHEMA_VERSION_EVENT),
+    DROP_SCHEMA_VERSION(MessageBuilder.DROP_SCHEMA_VERSION_EVENT),
+    CREATE_CATALOG(MessageBuilder.CREATE_CATALOG_EVENT),
+    DROP_CATALOG(MessageBuilder.DROP_CATALOG_EVENT),
+    OPEN_TXN(MessageBuilder.OPEN_TXN_EVENT),
+    COMMIT_TXN(MessageBuilder.COMMIT_TXN_EVENT),
+    ABORT_TXN(MessageBuilder.ABORT_TXN_EVENT),
+    ALLOC_WRITE_ID(MessageBuilder.ALLOC_WRITE_ID_EVENT),
+    ALTER_CATALOG(MessageBuilder.ALTER_CATALOG_EVENT),
+    ACID_WRITE(MessageBuilder.ACID_WRITE_EVENT);
 
     private String typeString;