You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2018/10/22 08:22:00 UTC

[2/4] hive git commit: HIVE-20679: DDL operations on hive might create large messages for DBNotification (Anishek Agarwal, reviewed by Sankar Hariappan)

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
new file mode 100644
index 0000000..787b9b2
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.regex.PatternSyntaxException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAbortTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAcidWriteMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterCatalogMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateCatalogMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropCatalogMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONInsertMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONOpenTxnMessage;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.filterMapkeys;
+
+public class MessageBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(MessageBuilder.class);
+
+  public static final String ADD_PARTITION_EVENT = "ADD_PARTITION";
+  public static final String ALTER_PARTITION_EVENT = "ALTER_PARTITION";
+  public static final String DROP_PARTITION_EVENT = "DROP_PARTITION";
+  public static final String CREATE_TABLE_EVENT = "CREATE_TABLE";
+  public static final String ALTER_TABLE_EVENT = "ALTER_TABLE";
+  public static final String DROP_TABLE_EVENT = "DROP_TABLE";
+  public static final String CREATE_DATABASE_EVENT = "CREATE_DATABASE";
+  public static final String ALTER_DATABASE_EVENT = "ALTER_DATABASE";
+  public static final String DROP_DATABASE_EVENT = "DROP_DATABASE";
+  public static final String INSERT_EVENT = "INSERT";
+  public static final String CREATE_FUNCTION_EVENT = "CREATE_FUNCTION";
+  public static final String DROP_FUNCTION_EVENT = "DROP_FUNCTION";
+  public static final String ADD_PRIMARYKEY_EVENT = "ADD_PRIMARYKEY";
+  public static final String ADD_FOREIGNKEY_EVENT = "ADD_FOREIGNKEY";
+  public static final String ADD_UNIQUECONSTRAINT_EVENT = "ADD_UNIQUECONSTRAINT";
+  public static final String ADD_NOTNULLCONSTRAINT_EVENT = "ADD_NOTNULLCONSTRAINT";
+  public static final String DROP_CONSTRAINT_EVENT = "DROP_CONSTRAINT";
+  public static final String CREATE_ISCHEMA_EVENT = "CREATE_ISCHEMA";
+  public static final String ALTER_ISCHEMA_EVENT = "ALTER_ISCHEMA";
+  public static final String DROP_ISCHEMA_EVENT = "DROP_ISCHEMA";
+  public static final String ADD_SCHEMA_VERSION_EVENT = "ADD_SCHEMA_VERSION";
+  public static final String ALTER_SCHEMA_VERSION_EVENT = "ALTER_SCHEMA_VERSION";
+  public static final String DROP_SCHEMA_VERSION_EVENT = "DROP_SCHEMA_VERSION";
+  public static final String CREATE_CATALOG_EVENT = "CREATE_CATALOG";
+  public static final String DROP_CATALOG_EVENT = "DROP_CATALOG";
+  public static final String OPEN_TXN_EVENT = "OPEN_TXN";
+  public static final String COMMIT_TXN_EVENT = "COMMIT_TXN";
+  public static final String ABORT_TXN_EVENT = "ABORT_TXN";
+  public static final String ALLOC_WRITE_ID_EVENT = "ALLOC_WRITE_ID_EVENT";
+  public static final String ALTER_CATALOG_EVENT = "ALTER_CATALOG";
+  public static final String ACID_WRITE_EVENT = "ACID_WRITE_EVENT";
+
+  protected static final Configuration conf = MetastoreConf.newMetastoreConf();
+
+  private static final String MS_SERVER_URL = MetastoreConf
+      .getVar(conf, MetastoreConf.ConfVars.THRIFT_URIS, "");
+  private static final String MS_SERVICE_PRINCIPAL =
+      MetastoreConf.getVar(conf, MetastoreConf.ConfVars.KERBEROS_PRINCIPAL, "");
+
+  private static volatile MessageBuilder instance;
+  private static final Object lock = new Object();
+
+  public static MessageBuilder getInstance() {
+    if (instance == null) {
+      synchronized (lock) {
+        if (instance == null) {
+          instance = new MessageBuilder();
+          instance.init();
+        }
+      }
+    }
+    return instance;
+  }
+
+  private static List<Predicate<String>> paramsFilter;
+
+  public void init() {
+    List<String> excludePatterns = Arrays.asList(MetastoreConf
+        .getTrimmedStringsVar(conf,
+            MetastoreConf.ConfVars.EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS));
+    try {
+      paramsFilter = MetaStoreUtils.compilePatternsToPredicates(excludePatterns);
+    } catch (PatternSyntaxException e) {
+      LOG.error("Regex pattern compilation failed. Verify that "
+          + "metastore.notification.parameters.exclude.patterns has valid patterns.");
+      throw new IllegalStateException("Regex pattern compilation failed. " + e.getMessage());
+    }
+  }
+
+  public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
+    return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, now());
+  }
+
+  public AlterDatabaseMessage buildAlterDatabaseMessage(Database beforeDb, Database afterDb) {
+    return new JSONAlterDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
+        beforeDb, afterDb, now());
+  }
+
+  public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
+    return new JSONDropDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now());
+  }
+
+  public CreateTableMessage buildCreateTableMessage(Table table, Iterator<String> fileIter) {
+    return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, fileIter, now());
+  }
+
+  public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp,
+      Long writeId) {
+    return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after,
+        isTruncateOp, writeId, now());
+  }
+
+  public DropTableMessage buildDropTableMessage(Table table) {
+    return new JSONDropTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, now());
+  }
+
+  public AddPartitionMessage buildAddPartitionMessage(Table table,
+      Iterator<Partition> partitionsIterator, Iterator<PartitionFiles> partitionFileIter) {
+    return new JSONAddPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
+        partitionsIterator, partitionFileIter, now());
+  }
+
+  public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
+      Partition after, boolean isTruncateOp, Long writeId) {
+    return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
+        table, before, after, isTruncateOp, writeId, now());
+  }
+
+  public DropPartitionMessage buildDropPartitionMessage(Table table,
+      Iterator<Partition> partitionsIterator) {
+    return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
+        getPartitionKeyValues(table, partitionsIterator), now());
+  }
+
+  public CreateFunctionMessage buildCreateFunctionMessage(Function fn) {
+    return new JSONCreateFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now());
+  }
+
+  public DropFunctionMessage buildDropFunctionMessage(Function fn) {
+    return new JSONDropFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now());
+  }
+
+  public InsertMessage buildInsertMessage(Table tableObj, Partition partObj,
+      boolean replace, Iterator<String> fileIter) {
+    return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
+        tableObj, partObj, replace, fileIter, now());
+  }
+
+  public AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List<SQLPrimaryKey> pks) {
+    return new JSONAddPrimaryKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, pks, now());
+  }
+
+  public AddForeignKeyMessage buildAddForeignKeyMessage(List<SQLForeignKey> fks) {
+    return new JSONAddForeignKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fks, now());
+  }
+
+  public AddUniqueConstraintMessage buildAddUniqueConstraintMessage(List<SQLUniqueConstraint> uks) {
+    return new JSONAddUniqueConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, uks, now());
+  }
+
+  public AddNotNullConstraintMessage buildAddNotNullConstraintMessage(
+      List<SQLNotNullConstraint> nns) {
+    return new JSONAddNotNullConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, nns, now());
+  }
+
+  public DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName,
+      String constraintName) {
+    return new JSONDropConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, dbName, tableName,
+        constraintName, now());
+  }
+
+  public CreateCatalogMessage buildCreateCatalogMessage(Catalog catalog) {
+    return new JSONCreateCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(),
+        now());
+  }
+
+  public AlterCatalogMessage buildAlterCatalogMessage(Catalog beforeCat, Catalog afterCat) {
+    return new JSONAlterCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
+        beforeCat, afterCat, now());
+  }
+
+  public DropCatalogMessage buildDropCatalogMessage(Catalog catalog) {
+    return new JSONDropCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(),
+        now());
+  }
+
+  public OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId) {
+    return new JSONOpenTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fromTxnId, toTxnId, now());
+  }
+
+  public CommitTxnMessage buildCommitTxnMessage(Long txnId) {
+    return new JSONCommitTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
+  }
+
+  public AbortTxnMessage buildAbortTxnMessage(Long txnId) {
+    return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
+  }
+
+  public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList,
+      String dbName, String tableName) {
+    return new JSONAllocWriteIdMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnToWriteIdList,
+        dbName, tableName, now());
+  }
+
+  public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent,
+      Iterator<String> files) {
+    return new JSONAcidWriteMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), acidWriteEvent,
+        files);
+  }
+
+  private long now() {
+    return System.currentTimeMillis() / 1000;
+  }
+
+  public static String createPrimaryKeyObjJson(SQLPrimaryKey primaryKeyObj) throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(primaryKeyObj, "UTF-8");
+  }
+
+  public static String createForeignKeyObjJson(SQLForeignKey foreignKeyObj) throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(foreignKeyObj, "UTF-8");
+  }
+
+  public static String createUniqueConstraintObjJson(SQLUniqueConstraint uniqueConstraintObj)
+      throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(uniqueConstraintObj, "UTF-8");
+  }
+
+  public static String createNotNullConstraintObjJson(SQLNotNullConstraint notNullConstaintObj)
+      throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(notNullConstaintObj, "UTF-8");
+  }
+
+  public static String createDatabaseObjJson(Database dbObj) throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(dbObj, "UTF-8");
+  }
+
+  public static String createCatalogObjJson(Catalog catObj) throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(catObj, "UTF-8");
+  }
+
+  public static String createTableObjJson(Table tableObj) throws TException {
+    //Note: The parameters of the Table object will be removed in the filter if it matches
+    // any pattern provided through EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS
+    filterMapkeys(tableObj.getParameters(), paramsFilter);
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(tableObj, "UTF-8");
+  }
+
+  public static String createPartitionObjJson(Partition partitionObj) throws TException {
+    //Note: The parameters of the Partition object will be removed in the filter if it matches
+    // any pattern provided through EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS
+    filterMapkeys(partitionObj.getParameters(), paramsFilter);
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(partitionObj, "UTF-8");
+  }
+
+  public static String createFunctionObjJson(Function functionObj) throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(functionObj, "UTF-8");
+  }
+
+  public static Table getTableObj(ObjectNode jsonTree) throws Exception {
+    TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
+    Table tableObj = new Table();
+    String tableJson = jsonTree.get("tableObjJson").asText();
+    deSerializer.deserialize(tableObj, tableJson, "UTF-8");
+    return tableObj;
+  }
+
+  /*
+   * TODO: Some thoughts here : We have a current todo to move some of these methods over to
+   * MessageFactory instead of being here, so we can override them, but before we move them over,
+   * we should keep the following in mind:
+   *
+   * a) We should return Iterables, not Lists. That makes sure that we can be memory-safe when
+   * implementing it rather than forcing ourselves down a path wherein returning List is part of
+   * our interface, and then people use .size() or somesuch which makes us need to materialize
+   * the entire list and not change. Also, returning Iterables allows us to do things like
+   * Iterables.transform for some of these.
+   * b) We should not have "magic" names like "tableObjJson", because that breaks expectation of a
+   * couple of things - firstly, that of serialization format, although that is fine for this
+   * JSONMessageEncoder, and secondly, that makes us just have a number of mappings, one for each
+   * obj type, and sometimes, as the case is with alter, have multiples. Also, any event-specific
+   * item belongs in that event message / event itself, as opposed to in the factory. It's okay to
+   * have utility accessor methods here that are used by each of the messages to provide accessors.
+   * I'm adding a couple of those here.
+   *
+   */
+
+  public static TBase getTObj(String tSerialized, Class<? extends TBase> objClass)
+      throws Exception {
+    TDeserializer thriftDeSerializer = new TDeserializer(new TJSONProtocol.Factory());
+    TBase obj = objClass.newInstance();
+    thriftDeSerializer.deserialize(obj, tSerialized, "UTF-8");
+    return obj;
+  }
+
+  public static Iterable<? extends TBase> getTObjs(
+      Iterable<String> objRefStrs, final Class<? extends TBase> objClass) throws Exception {
+
+    try {
+      return Iterables.transform(objRefStrs, new com.google.common.base.Function<String, TBase>() {
+
+        public TBase apply(@Nullable String objStr) {
+          try {
+            return getTObj(objStr, objClass);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    } catch (RuntimeException re) {
+      // We have to add this bit of exception handling here, because Function.apply does not allow us to throw
+      // the actual exception that might be a checked exception, so we wind up needing to throw a RuntimeException
+      // with the previously thrown exception as its cause. However, since RuntimeException.getCause() returns
+      // a throwable instead of an Exception, we have to account for the possibility that the underlying code
+      // might have thrown a Throwable that we wrapped instead, in which case, continuing to throw the
+      // RuntimeException is the best thing we can do.
+      Throwable t = re.getCause();
+      if (t instanceof Exception) {
+        throw (Exception) t;
+      } else {
+        throw re;
+      }
+    }
+  }
+
+  // If we do not need this format of accessor using ObjectNode, this is a candidate for removal as well
+  public static Iterable<? extends TBase> getTObjs(
+      ObjectNode jsonTree, String objRefListName, final Class<? extends TBase> objClass)
+      throws Exception {
+    Iterable<JsonNode> jsonArrayIterator = jsonTree.get(objRefListName);
+    return getTObjs(Iterables.transform(jsonArrayIterator, JsonNode::asText), objClass);
+  }
+
+  public static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
+    Map<String, String> partitionKeys = new LinkedHashMap<>();
+    for (int i = 0; i < table.getPartitionKeysSize(); ++i) {
+      partitionKeys.put(table.getPartitionKeys().get(i).getName(),
+          partition.getValues().get(i));
+    }
+    return partitionKeys;
+  }
+
+  public static List<Map<String, String>> getPartitionKeyValues(final Table table,
+      Iterator<Partition> iterator) {
+    return Lists.newArrayList(Iterators
+        .transform(iterator, partition -> getPartitionKeyValues(table, partition)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java
new file mode 100644
index 0000000..832a80c
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging;
+
+public interface MessageEncoder {
+  MessageDeserializer getDeserializer();
+
+  MessageSerializer getSerializer();
+
+  String getMessageFormat();
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index 58c6891..16e74bb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -20,329 +20,86 @@
 package org.apache.hadoop.hive.metastore.messaging;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.Catalog;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
-import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
-import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
-import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
-import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.List;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Abstract Factory for the construction of HCatalog message instances.
  */
 public abstract class MessageFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(MessageFactory.class.getName());
 
-  // Common name constants for event messages
-  public static final String ADD_PARTITION_EVENT = "ADD_PARTITION";
-  public static final String ALTER_PARTITION_EVENT = "ALTER_PARTITION";
-  public static final String DROP_PARTITION_EVENT = "DROP_PARTITION";
-  public static final String CREATE_TABLE_EVENT = "CREATE_TABLE";
-  public static final String ALTER_TABLE_EVENT = "ALTER_TABLE";
-  public static final String DROP_TABLE_EVENT = "DROP_TABLE";
-  public static final String CREATE_DATABASE_EVENT = "CREATE_DATABASE";
-  public static final String ALTER_DATABASE_EVENT = "ALTER_DATABASE";
-  public static final String DROP_DATABASE_EVENT = "DROP_DATABASE";
-  public static final String INSERT_EVENT = "INSERT";
-  public static final String CREATE_FUNCTION_EVENT = "CREATE_FUNCTION";
-  public static final String DROP_FUNCTION_EVENT = "DROP_FUNCTION";
-  public static final String ADD_PRIMARYKEY_EVENT = "ADD_PRIMARYKEY";
-  public static final String ADD_FOREIGNKEY_EVENT = "ADD_FOREIGNKEY";
-  public static final String ADD_UNIQUECONSTRAINT_EVENT = "ADD_UNIQUECONSTRAINT";
-  public static final String ADD_NOTNULLCONSTRAINT_EVENT = "ADD_NOTNULLCONSTRAINT";
-  public static final String DROP_CONSTRAINT_EVENT = "DROP_CONSTRAINT";
-  public static final String CREATE_ISCHEMA_EVENT = "CREATE_ISCHEMA";
-  public static final String ALTER_ISCHEMA_EVENT = "ALTER_ISCHEMA";
-  public static final String DROP_ISCHEMA_EVENT = "DROP_ISCHEMA";
-  public static final String ADD_SCHEMA_VERSION_EVENT = "ADD_SCHEMA_VERSION";
-  public static final String ALTER_SCHEMA_VERSION_EVENT = "ALTER_SCHEMA_VERSION";
-  public static final String DROP_SCHEMA_VERSION_EVENT = "DROP_SCHEMA_VERSION";
-  public static final String CREATE_CATALOG_EVENT = "CREATE_CATALOG";
-  public static final String DROP_CATALOG_EVENT = "DROP_CATALOG";
-  public static final String OPEN_TXN_EVENT = "OPEN_TXN";
-  public static final String COMMIT_TXN_EVENT = "COMMIT_TXN";
-  public static final String ABORT_TXN_EVENT = "ABORT_TXN";
-  public static final String ALLOC_WRITE_ID_EVENT = "ALLOC_WRITE_ID_EVENT";
-  public static final String ALTER_CATALOG_EVENT = "ALTER_CATALOG";
-  public static final String ACID_WRITE_EVENT = "ACID_WRITE_EVENT";
+  protected static final Configuration conf = MetastoreConf.newMetastoreConf();
 
-  private static MessageFactory instance = null;
+  private static final Map<String, Method> registry = new HashMap<>();
 
-  protected static final Configuration conf = MetastoreConf.newMetastoreConf();
-  /*
-  // TODO MS-SPLIT I'm 99% certain we don't need this, as MetastoreConf.newMetastoreConf already
-  adds this resource.
-  static {
-    conf.addResource("hive-site.xml");
+  public static void register(String messageFormat, Class clazz) {
+    Method method = requiredMethod(clazz);
+    registry.put(messageFormat, method);
   }
-  */
-
-  protected static final String MS_SERVER_URL = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS, "");
-  protected static final String MS_SERVICE_PRINCIPAL =
-      MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL, "");
 
+  static {
+    register(GzipJSONMessageEncoder.FORMAT, GzipJSONMessageEncoder.class);
+    register(JSONMessageEncoder.FORMAT, JSONMessageEncoder.class);
+  }
 
-  /**
-   * Getter for MessageFactory instance.
-   */
-  public static MessageFactory getInstance() {
-    if (instance == null) {
-      instance = getInstance(MetastoreConf.getVar(conf, ConfVars.EVENT_MESSAGE_FACTORY));
+  private static Method requiredMethod(Class clazz) {
+    if (MessageEncoder.class.isAssignableFrom(clazz)) {
+      try {
+        Method methodInstance = clazz.getMethod("getInstance");
+        if (MessageEncoder.class.isAssignableFrom(methodInstance.getReturnType())) {
+          int modifiers = methodInstance.getModifiers();
+          if (Modifier.isStatic(modifiers) && Modifier.isPublic(modifiers)) {
+            return methodInstance;
+          }
+          throw new NoSuchMethodException(
+              "modifier for getInstance() method is not 'public static' in " + clazz
+                  .getCanonicalName());
+        }
+        throw new NoSuchMethodException(
+            "return type is not assignable to " + MessageEncoder.class.getCanonicalName());
+      } catch (NoSuchMethodException e) {
+        String message = clazz.getCanonicalName()
+            + " does not implement the required 'public static MessageEncoder getInstance()' method ";
+        LOG.error(message, e);
+        throw new IllegalArgumentException(message, e);
+      }
     }
-    return instance;
+    String message = clazz.getCanonicalName() + " is not assignable to " + MessageEncoder.class
+        .getCanonicalName();
+    LOG.error(message);
+    throw new IllegalArgumentException(message);
   }
 
-  private static MessageFactory getInstance(String className) {
-    try {
-      MessageFactory factory = JavaUtils.newInstance(JavaUtils.getClass(className, MessageFactory.class));
-      factory.init();
-      return factory;
-    } catch (MetaException e) {
-      throw new IllegalStateException("Could not construct MessageFactory implementation: ", e);
+  public static MessageEncoder getInstance(String messageFormat)
+      throws InvocationTargetException, IllegalAccessException {
+    Method methodInstance = registry.get(messageFormat);
+    if (methodInstance == null) {
+      LOG.error("received incorrect MessageFormat " + messageFormat);
+      throw new RuntimeException("messageFormat: " + messageFormat + " is not supported ");
     }
+    return (MessageEncoder) methodInstance.invoke(null);
   }
 
-  /**
-   * Getter for MessageDeserializer, corresponding to the specified format and version.
-   * @param format Serialization format for notifications.
-   * @param version Version of serialization format (currently ignored.)
-   * @return MessageDeserializer.
-   */
-  public static MessageDeserializer getDeserializer(String format,
-                            String version) {
-    return getInstance(MetastoreConf.getVar(conf, ConfVars.EVENT_MESSAGE_FACTORY)).getDeserializer();
-    // Note : The reason this method exists outside the no-arg getDeserializer method is in
-    // case there is a user-implemented MessageFactory that's used, and some the messages
-    // are in an older format and the rest in another. Then, what MessageFactory is default
-    // is irrelevant, we should always use the one that was used to create it to deserialize.
-    //
-    // There exist only 2 implementations of this - json and jms
-    //
-    // Additional note : rather than as a config parameter, does it make sense to have
-    // this use jdbc-like semantics that each MessageFactory made available register
-    // itself for discoverability? Might be worth pursuing.
+  public static MessageEncoder getDefaultInstance(Configuration conf) {
+    String clazz =
+        MetastoreConf.get(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getVarname());
+    try {
+      Class<?> clazzObject = MessageFactory.class.getClassLoader().loadClass(clazz);
+      return (MessageEncoder) requiredMethod(clazzObject).invoke(null);
+    } catch (Exception e) {
+      String message = "could not load the configured class " + clazz;
+      LOG.error(message, e);
+      throw new IllegalStateException(message, e);
+    }
   }
-
-  public void init() throws MetaException {}
-
-  public abstract MessageDeserializer getDeserializer();
-
-  /**
-   * Getter for message-format.
-   */
-  public abstract String getMessageFormat();
-
-  /**
-   * Factory method for CreateDatabaseMessage.
-   * @param db The Database being added.
-   * @return CreateDatabaseMessage instance.
-   */
-  public abstract CreateDatabaseMessage buildCreateDatabaseMessage(Database db);
-
-  /**
-   * Factory method for AlterDatabaseMessage.
-   * @param beforeDb The Database before alter.
-   * @param afterDb The Database after alter.
-   * @return AlterDatabaseMessage instance.
-   */
-  public abstract AlterDatabaseMessage buildAlterDatabaseMessage(Database beforeDb, Database afterDb);
-
-  /**
-   * Factory method for DropDatabaseMessage.
-   * @param db The Database being dropped.
-   * @return DropDatabaseMessage instance.
-   */
-  public abstract DropDatabaseMessage buildDropDatabaseMessage(Database db);
-
-  /**
-   * Factory method for CreateTableMessage.
-   * @param table The Table being created.
-   * @param files Iterator of files
-   * @return CreateTableMessage instance.
-   */
-  public abstract CreateTableMessage buildCreateTableMessage(Table table, Iterator<String> files);
-
-  /**
-   * Factory method for AlterTableMessage.  Unlike most of these calls, this one can return null,
-   * which means no message should be sent.  This is because there are many flavors of alter
-   * table (add column, add partition, etc.).  Some are covered elsewhere (like add partition)
-   * and some are not yet supported.
-   * @param before The table before the alter
-   * @param after The table after the alter
-   * @param isTruncateOp Flag to denote truncate table
-   * @param writeId writeId under which alter is done (for ACID tables)
-   * @return
-   */
-  public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp,
-                                                           Long writeId);
-
-  /**
-   * Factory method for DropTableMessage.
-   * @param table The Table being dropped.
-   * @return DropTableMessage instance.
-   */
-  public abstract DropTableMessage buildDropTableMessage(Table table);
-
-    /**
-     * Factory method for AddPartitionMessage.
-     * @param table The Table to which the partitions are added.
-     * @param partitions The iterator to set of Partitions being added.
-     * @param partitionFiles The iterator of partition files
-     * @return AddPartitionMessage instance.
-     */
-  public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator<Partition> partitions,
-      Iterator<PartitionFiles> partitionFiles);
-
-  /**
-   * Factory method for building AlterPartitionMessage
-   * @param table The table in which the partition is being altered
-   * @param before The partition before it was altered
-   * @param after The partition after it was altered
-   * @param isTruncateOp Flag to denote truncate partition
-   * @param writeId writeId under which alter is done (for ACID tables)
-   * @return a new AlterPartitionMessage
-   */
-  public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
-                                                                   Partition after, boolean isTruncateOp,
-                                                                   Long writeId);
-
-  /**
-   * Factory method for DropPartitionMessage.
-   * @param table The Table from which the partition is dropped.
-   * @param partitions The set of partitions being dropped.
-   * @return DropPartitionMessage instance.
-   */
-  public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Iterator<Partition> partitions);
-
-  /**
-   * Factory method for CreateFunctionMessage.
-   * @param fn The Function being added.
-   * @return CreateFunctionMessage instance.
-   */
-  public abstract CreateFunctionMessage buildCreateFunctionMessage(Function fn);
-
-  /**
-   * Factory method for DropFunctionMessage.
-   * @param fn The Function being dropped.
-   * @return DropFunctionMessage instance.
-   */
-  public abstract DropFunctionMessage buildDropFunctionMessage(Function fn);
-
-  /**
-   * Factory method for building insert message
-   *
-   * @param tableObj Table object where the insert occurred in
-   * @param ptnObj Partition object where the insert occurred in, may be null if
-   *          the insert was done into a non-partitioned table
-   * @param replace Flag to represent if INSERT OVERWRITE or INSERT INTO
-   * @param files Iterator of file created
-   * @return instance of InsertMessage
-   */
-  public abstract InsertMessage buildInsertMessage(Table tableObj, Partition ptnObj,
-                                                   boolean replace, Iterator<String> files);
-
-  /**
-   * Factory method for building open txn message using start and end transaction range
-   *
-   * @param fromTxnId start transaction id (inclusive)
-   * @param toTxnId end transaction id (inclusive)
-   * @return instance of OpenTxnMessage
-   */
-  public abstract OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId);
-
-  /**
-   * Factory method for building commit txn message
-   *
-   * @param txnId Id of the transaction to be committed
-   * @return instance of CommitTxnMessage
-   */
-  public abstract CommitTxnMessage buildCommitTxnMessage(Long txnId);
-
-  /**
-   * Factory method for building abort txn message
-   *
-   * @param txnId Id of the transaction to be aborted
-   * @return instance of AbortTxnMessage
-   */
-  public abstract AbortTxnMessage buildAbortTxnMessage(Long txnId);
-
-  /**
-   * Factory method for building alloc write id message
-   *
-   * @param txnToWriteIdList List of Txn Ids and write id map
-   * @param dbName db for which write ids to be allocated
-   * @param tableName table for which write ids to be allocated
-   * @return instance of AllocWriteIdMessage
-   */
-  public abstract AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList, String dbName,
-                                                               String tableName);
-
-  /***
-   * Factory method for building add primary key message
-   *
-   * @param pks list of primary keys
-   * @return instance of AddPrimaryKeyMessage
-   */
-  public abstract AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List<SQLPrimaryKey> pks);
-
-  /***
-   * Factory method for building add foreign key message
-   *
-   * @param fks list of foreign keys
-   * @return instance of AddForeignKeyMessage
-   */
-  public abstract AddForeignKeyMessage buildAddForeignKeyMessage(List<SQLForeignKey> fks);
-
-  /***
-   * Factory method for building add unique constraint message
-   *
-   * @param uks list of unique constraints
-   * @return instance of SQLUniqueConstraint
-   */
-  public abstract AddUniqueConstraintMessage buildAddUniqueConstraintMessage(List<SQLUniqueConstraint> uks);
-
-  /***
-   * Factory method for building add not null constraint message
-   *
-   * @param nns list of not null constraints
-   * @return instance of SQLNotNullConstraint
-   */
-  public abstract AddNotNullConstraintMessage buildAddNotNullConstraintMessage(List<SQLNotNullConstraint> nns);
-
-  /***
-   * Factory method for building drop constraint message
-   * @param dbName
-   * @param tableName
-   * @param constraintName
-   * @return
-   */
-  public abstract DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName,
-      String constraintName);
-
-  public abstract CreateCatalogMessage buildCreateCatalogMessage(Catalog catalog);
-
-  public abstract DropCatalogMessage buildDropCatalogMessage(Catalog catalog);
-
-  public abstract AlterCatalogMessage buildAlterCatalogMessage(Catalog oldCat, Catalog newCat);
-
-  /**
-   * Factory method for building acid write message
-   *
-   *
-   * @param acidWriteEvent information related to the acid write operation
-   * @param files files added by this write operation
-   * @return instance of AcidWriteMessage
-   */
-  public abstract AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
new file mode 100644
index 0000000..b249d76
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging;
+
+public interface MessageSerializer {
+  default String serialize(EventMessage message) {
+    return message.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
index fdb6942..712c12c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.metastore.messaging.event.filters;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 
 import java.util.regex.Pattern;
 
@@ -41,9 +41,9 @@ public class DatabaseAndTableFilter extends BasicFilter {
   }
 
   private boolean isTxnRelatedEvent(final NotificationEvent event) {
-    return ((event.getEventType().equals(MessageFactory.OPEN_TXN_EVENT)) ||
-            (event.getEventType().equals(MessageFactory.COMMIT_TXN_EVENT)) ||
-            (event.getEventType().equals(MessageFactory.ABORT_TXN_EVENT)));
+    return ((event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) ||
+            (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT)) ||
+            (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT)));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
index cc528ee..a5d8f78 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 import java.util.Iterator;
 import java.util.List;
@@ -60,9 +61,9 @@ public class JSONAcidWriteMessage extends AcidWriteMessage {
     this.writeId = acidWriteEvent.getWriteId();
     this.partition = acidWriteEvent.getPartition();
     try {
-      this.tableObjJson = JSONMessageFactory.createTableObjJson(acidWriteEvent.getTableObj());
+      this.tableObjJson = MessageBuilder.createTableObjJson(acidWriteEvent.getTableObj());
       if (acidWriteEvent.getPartitionObj() != null) {
-        this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(acidWriteEvent.getPartitionObj());
+        this.partitionObjJson = MessageBuilder.createPartitionObjJson(acidWriteEvent.getPartitionObj());
       } else {
         this.partitionObjJson = null;
       }
@@ -119,13 +120,13 @@ public class JSONAcidWriteMessage extends AcidWriteMessage {
 
   @Override
   public Table getTableObj() throws Exception {
-    return (tableObjJson == null) ? null : (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class);
+    return (tableObjJson == null) ? null : (Table) MessageBuilder.getTObj(tableObjJson, Table.class);
   }
 
   @Override
   public Partition getPartitionObj() throws Exception {
     return ((partitionObjJson == null) ? null :
-            (Partition) JSONMessageFactory.getTObj(partitionObjJson, Partition.class));
+            (Partition) MessageBuilder.getTObj(partitionObjJson, Partition.class));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java
index d4a0bc2..c3d6fb6 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -55,7 +56,7 @@ public class JSONAddForeignKeyMessage extends AddForeignKeyMessage {
     this.foreignKeyListJson = new ArrayList<>();
     try {
       for (SQLForeignKey pk : fks) {
-        foreignKeyListJson.add(JSONMessageFactory.createForeignKeyObjJson(pk));
+        foreignKeyListJson.add(MessageBuilder.createForeignKeyObjJson(pk));
       }
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
@@ -86,7 +87,7 @@ public class JSONAddForeignKeyMessage extends AddForeignKeyMessage {
   public List<SQLForeignKey> getForeignKeys() throws Exception {
     List<SQLForeignKey> fks = new ArrayList<>();
     for (String pkJson : foreignKeyListJson) {
-      fks.add((SQLForeignKey)JSONMessageFactory.getTObj(pkJson, SQLForeignKey.class));
+      fks.add((SQLForeignKey) MessageBuilder.getTObj(pkJson, SQLForeignKey.class));
     }
     return fks;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java
index 1c3e8a8..f9f351f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -50,7 +51,7 @@ public class JSONAddNotNullConstraintMessage extends AddNotNullConstraintMessage
     this.notNullConstraintListJson = new ArrayList<>();
     try {
       for (SQLNotNullConstraint nn : nns) {
-        notNullConstraintListJson.add(JSONMessageFactory.createNotNullConstraintObjJson(nn));
+        notNullConstraintListJson.add(MessageBuilder.createNotNullConstraintObjJson(nn));
       }
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
@@ -81,7 +82,7 @@ public class JSONAddNotNullConstraintMessage extends AddNotNullConstraintMessage
   public List<SQLNotNullConstraint> getNotNullConstraints() throws Exception {
     List<SQLNotNullConstraint> nns = new ArrayList<>();
     for (String nnJson : notNullConstraintListJson) {
-      nns.add((SQLNotNullConstraint)JSONMessageFactory.getTObj(nnJson, SQLNotNullConstraint.class));
+      nns.add((SQLNotNullConstraint) MessageBuilder.getTObj(nnJson, SQLNotNullConstraint.class));
     }
     return nns;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java
index bb2093b..6494cb8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
 import org.apache.thrift.TException;
 
@@ -79,11 +80,11 @@ public class JSONAddPartitionMessage extends AddPartitionMessage {
     partitionListJson = new ArrayList<>();
     Partition partitionObj;
     try {
-      this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+      this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
       while (partitionsIterator.hasNext()) {
         partitionObj = partitionsIterator.next();
-        partitions.add(JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObj));
-        partitionListJson.add(JSONMessageFactory.createPartitionObjJson(partitionObj));
+        partitions.add(MessageBuilder.getPartitionKeyValues(tableObj, partitionObj));
+        partitionListJson.add(MessageBuilder.createPartitionObjJson(partitionObj));
       }
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
@@ -124,7 +125,7 @@ public class JSONAddPartitionMessage extends AddPartitionMessage {
 
   @Override
   public Table getTableObj() throws Exception {
-    return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+    return (Table) MessageBuilder.getTObj(tableObjJson,Table.class);
   }
 
   @Override
@@ -141,7 +142,7 @@ public class JSONAddPartitionMessage extends AddPartitionMessage {
   public Iterable<Partition> getPartitionObjs() throws Exception {
     // glorified cast from Iterable<TBase> to Iterable<Partition>
     return Iterables.transform(
-        JSONMessageFactory.getTObjs(partitionListJson,Partition.class),
+        MessageBuilder.getTObjs(partitionListJson,Partition.class),
         new Function<Object, Partition>() {
       @Nullable
       @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java
index 3a18be8..606a051 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -55,7 +56,7 @@ public class JSONAddPrimaryKeyMessage extends AddPrimaryKeyMessage {
     this.primaryKeyListJson = new ArrayList<>();
     try {
       for (SQLPrimaryKey pk : pks) {
-        primaryKeyListJson.add(JSONMessageFactory.createPrimaryKeyObjJson(pk));
+        primaryKeyListJson.add(MessageBuilder.createPrimaryKeyObjJson(pk));
       }
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
@@ -86,7 +87,7 @@ public class JSONAddPrimaryKeyMessage extends AddPrimaryKeyMessage {
   public List<SQLPrimaryKey> getPrimaryKeys() throws Exception {
     List<SQLPrimaryKey> pks = new ArrayList<>();
     for (String pkJson : primaryKeyListJson) {
-      pks.add((SQLPrimaryKey)JSONMessageFactory.getTObj(pkJson, SQLPrimaryKey.class));
+      pks.add((SQLPrimaryKey) MessageBuilder.getTObj(pkJson, SQLPrimaryKey.class));
     }
     return pks;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java
index 3c4d5e0..ebdcd94 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -52,7 +53,7 @@ public class JSONAddUniqueConstraintMessage extends AddUniqueConstraintMessage {
     this.uniqueConstraintListJson = new ArrayList<>();
     try {
       for (SQLUniqueConstraint uk : uks) {
-        uniqueConstraintListJson.add(JSONMessageFactory.createUniqueConstraintObjJson(uk));
+        uniqueConstraintListJson.add(MessageBuilder.createUniqueConstraintObjJson(uk));
       }
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
@@ -83,7 +84,7 @@ public class JSONAddUniqueConstraintMessage extends AddUniqueConstraintMessage {
   public List<SQLUniqueConstraint> getUniqueConstraints() throws Exception {
     List<SQLUniqueConstraint> uks = new ArrayList<>();
     for (String pkJson : uniqueConstraintListJson) {
-      uks.add((SQLUniqueConstraint)JSONMessageFactory.getTObj(pkJson, SQLUniqueConstraint.class));
+      uks.add((SQLUniqueConstraint) MessageBuilder.getTObj(pkJson, SQLUniqueConstraint.class));
     }
     return uks;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java
index 779c0b0..7b7c12e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.messaging.json;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.messaging.AlterCatalogMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 public class JSONAlterCatalogMessage extends AlterCatalogMessage {
@@ -41,8 +42,8 @@ public class JSONAlterCatalogMessage extends AlterCatalogMessage {
     this.servicePrincipal = servicePrincipal;
     this.timestamp = timestamp;
     try {
-      this.catObjBeforeJson = JSONMessageFactory.createCatalogObjJson(catObjBefore);
-      this.catObjAfterJson = JSONMessageFactory.createCatalogObjJson(catObjAfter);
+      this.catObjBeforeJson = MessageBuilder.createCatalogObjJson(catObjBefore);
+      this.catObjAfterJson = MessageBuilder.createCatalogObjJson(catObjAfter);
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
     }
@@ -71,12 +72,12 @@ public class JSONAlterCatalogMessage extends AlterCatalogMessage {
 
   @Override
   public Catalog getCatObjBefore() throws Exception {
-    return (Catalog) JSONMessageFactory.getTObj(catObjBeforeJson, Catalog.class);
+    return (Catalog) MessageBuilder.getTObj(catObjBeforeJson, Catalog.class);
   }
 
   @Override
   public Catalog getCatObjAfter() throws Exception {
-    return (Catalog) JSONMessageFactory.getTObj(catObjAfterJson, Catalog.class);
+    return (Catalog) MessageBuilder.getTObj(catObjAfterJson, Catalog.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java
index 7b316d5..5f9dae4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.messaging.json;
 
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -48,8 +49,8 @@ public class JSONAlterDatabaseMessage extends AlterDatabaseMessage {
     this.db = dbObjBefore.getName();
     this.timestamp = timestamp;
     try {
-      this.dbObjBeforeJson = JSONMessageFactory.createDatabaseObjJson(dbObjBefore);
-      this.dbObjAfterJson = JSONMessageFactory.createDatabaseObjJson(dbObjAfter);
+      this.dbObjBeforeJson = MessageBuilder.createDatabaseObjJson(dbObjBefore);
+      this.dbObjAfterJson = MessageBuilder.createDatabaseObjJson(dbObjAfter);
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
     }
@@ -78,12 +79,12 @@ public class JSONAlterDatabaseMessage extends AlterDatabaseMessage {
 
   @Override
   public Database getDbObjBefore() throws Exception {
-    return (Database) JSONMessageFactory.getTObj(dbObjBeforeJson, Database.class);
+    return (Database) MessageBuilder.getTObj(dbObjBeforeJson, Database.class);
   }
 
   @Override
   public Database getDbObjAfter() throws Exception {
-    return (Database) JSONMessageFactory.getTObj(dbObjAfterJson, Database.class);
+    return (Database) MessageBuilder.getTObj(dbObjAfterJson, Database.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
index 9b85f4c..a38c1aa 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -62,12 +63,12 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
     this.tableType = tableObj.getTableType();
     this.isTruncateOp = Boolean.toString(isTruncateOp);
     this.timestamp = timestamp;
-    this.keyValues = JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore);
+    this.keyValues = MessageBuilder.getPartitionKeyValues(tableObj, partitionObjBefore);
     this.writeId = writeId;
     try {
-      this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
-      this.partitionObjBeforeJson = JSONMessageFactory.createPartitionObjJson(partitionObjBefore);
-      this.partitionObjAfterJson = JSONMessageFactory.createPartitionObjJson(partitionObjAfter);
+      this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
+      this.partitionObjBeforeJson = MessageBuilder.createPartitionObjJson(partitionObjBefore);
+      this.partitionObjAfterJson = MessageBuilder.createPartitionObjJson(partitionObjAfter);
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
     }
@@ -118,17 +119,17 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
 
   @Override
   public Table getTableObj() throws Exception {
-    return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+    return (Table) MessageBuilder.getTObj(tableObjJson,Table.class);
   }
 
   @Override
   public Partition getPtnObjBefore() throws Exception {
-    return (Partition) JSONMessageFactory.getTObj(partitionObjBeforeJson, Partition.class);
+    return (Partition) MessageBuilder.getTObj(partitionObjBeforeJson, Partition.class);
   }
 
   @Override
   public Partition getPtnObjAfter() throws Exception {
-    return (Partition) JSONMessageFactory.getTObj(partitionObjAfterJson, Partition.class);
+    return (Partition) MessageBuilder.getTObj(partitionObjAfterJson, Partition.class);
   }
 
   public String getTableObjJson() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
index eddff98..d6ec826 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.messaging.json;
 
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -55,8 +56,8 @@ public class JSONAlterTableMessage extends AlterTableMessage {
     this.timestamp = timestamp;
     this.writeId = writeId;
     try {
-      this.tableObjBeforeJson = JSONMessageFactory.createTableObjJson(tableObjBefore);
-      this.tableObjAfterJson = JSONMessageFactory.createTableObjJson(tableObjAfter);
+      this.tableObjBeforeJson = MessageBuilder.createTableObjJson(tableObjBefore);
+      this.tableObjAfterJson = MessageBuilder.createTableObjJson(tableObjAfter);
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
     }
@@ -102,12 +103,12 @@ public class JSONAlterTableMessage extends AlterTableMessage {
 
   @Override
   public Table getTableObjBefore() throws Exception {
-    return (Table) JSONMessageFactory.getTObj(tableObjBeforeJson,Table.class);
+    return (Table) MessageBuilder.getTObj(tableObjBeforeJson,Table.class);
   }
 
   @Override
   public Table getTableObjAfter() throws Exception {
-    return (Table) JSONMessageFactory.getTObj(tableObjAfterJson,Table.class);
+    return (Table) MessageBuilder.getTObj(tableObjAfterJson,Table.class);
   }
 
   public String getTableObjBeforeJson() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
index 2c4940b..482fc8e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 
 import java.util.List;
 
@@ -118,13 +119,13 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
 
   @Override
   public Table getTableObj(int idx) throws Exception {
-    return tableObjs == null ? null :  (Table) JSONMessageFactory.getTObj(tableObjs.get(idx), Table.class);
+    return tableObjs == null ? null :  (Table) MessageBuilder.getTObj(tableObjs.get(idx), Table.class);
   }
 
   @Override
   public Partition getPartitionObj(int idx) throws Exception {
     return (partitionObjs == null ? null : (partitionObjs.get(idx) == null ? null :
-            (Partition)JSONMessageFactory.getTObj(partitionObjs.get(idx), Partition.class)));
+            (Partition) MessageBuilder.getTObj(partitionObjs.get(idx), Partition.class)));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
index 761ff99..1f5c9e8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging.json;
 
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -48,7 +49,7 @@ public class JSONCreateDatabaseMessage extends CreateDatabaseMessage {
     this.db = db.getName();
     this.timestamp = timestamp;
     try {
-      this.dbJson = JSONMessageFactory.createDatabaseObjJson(db);
+      this.dbJson = MessageBuilder.createDatabaseObjJson(db);
     } catch (TException ex) {
       throw new IllegalArgumentException("Could not serialize Function object", ex);
     }
@@ -57,7 +58,7 @@ public class JSONCreateDatabaseMessage extends CreateDatabaseMessage {
 
   @Override
   public Database getDatabaseObject() throws Exception {
-    return (Database) JSONMessageFactory.getTObj(dbJson, Database.class);
+    return (Database) MessageBuilder.getTObj(dbJson, Database.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java
index f7287df..bb50052 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging.json;
 
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -47,7 +48,7 @@ public class JSONCreateFunctionMessage extends CreateFunctionMessage {
     this.db = fn.getDbName();
     this.timestamp = timestamp;
     try {
-      this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn);
+      this.functionObjJson = MessageBuilder.createFunctionObjJson(fn);
     } catch (TException ex) {
       throw new IllegalArgumentException("Could not serialize Function object", ex);
     }
@@ -72,7 +73,7 @@ public class JSONCreateFunctionMessage extends CreateFunctionMessage {
 
   @Override
   public Function getFunctionObj() throws Exception {
-    return (Function) JSONMessageFactory.getTObj(functionObjJson,Function.class);
+    return (Function) MessageBuilder.getTObj(functionObjJson,Function.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
index b80003b..145ee4b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.thrift.TException;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -68,7 +69,7 @@ public class JSONCreateTableMessage extends CreateTableMessage {
     this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(),
         tableObj.getTableType(), timestamp);
     try {
-      this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+      this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
     }
@@ -111,7 +112,7 @@ public class JSONCreateTableMessage extends CreateTableMessage {
 
   @Override
   public Table getTableObj() throws Exception {
-    return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+    return (Table) MessageBuilder.getTObj(tableObjJson,Table.class);
   }
 
   public String getTableObjJson() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
index 957d595..23e5496 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.thrift.TException;
 
@@ -70,7 +71,7 @@ public class JSONDropPartitionMessage extends DropPartitionMessage {
     this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(),
         tableObj.getTableType(), partitionKeyValues, timestamp);
     try {
-      this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+      this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
     }
@@ -117,7 +118,7 @@ public class JSONDropPartitionMessage extends DropPartitionMessage {
 
   @Override
   public Table getTableObj() throws Exception {
-    return (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class);
+    return (Table) MessageBuilder.getTObj(tableObjJson, Table.class);
   }
 
   public String getTableObjJson() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java
index 88374ec..1ef2ad0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hive.metastore.messaging.json;
 
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
 import org.apache.thrift.TException;
 
@@ -63,7 +64,7 @@ public class JSONDropTableMessage extends DropTableMessage {
     this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(),
         tableObj.getTableType(), timestamp);
     try {
-      this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+      this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
     }
@@ -86,7 +87,7 @@ public class JSONDropTableMessage extends DropTableMessage {
 
   @Override
   public Table getTableObj() throws Exception {
-    return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+    return (Table) MessageBuilder.getTObj(tableObjJson,Table.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index 2318a67..40d480b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.thrift.TException;
 
@@ -67,9 +68,9 @@ public class JSONInsertMessage extends InsertMessage {
     this.tableType = tableObj.getTableType();
 
     try {
-      this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+      this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
       if (null != ptnObj) {
-        this.ptnObjJson = JSONMessageFactory.createPartitionObjJson(ptnObj);
+        this.ptnObjJson = MessageBuilder.createPartitionObjJson(ptnObj);
       } else {
         this.ptnObjJson = null;
       }
@@ -128,12 +129,12 @@ public class JSONInsertMessage extends InsertMessage {
 
   @Override
   public Table getTableObj() throws Exception {
-    return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+    return (Table) MessageBuilder.getTObj(tableObjJson,Table.class);
   }
 
   @Override
   public Partition getPtnObj() throws Exception {
-    return ((null == ptnObjJson) ? null : (Partition) JSONMessageFactory.getTObj(ptnObjJson, Partition.class));
+    return ((null == ptnObjJson) ? null : (Partition) MessageBuilder.getTObj(ptnObjJson, Partition.class));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java
new file mode 100644
index 0000000..08c53e4
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging.json;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+
+/**
+ * The JSON implementation of the MessageFactory. Constructs JSON implementations of each
+ * message-type.
+ */
+public class JSONMessageEncoder implements MessageEncoder {
+  public static final String FORMAT = "json-0.2";
+
+  private static MessageDeserializer deserializer = new JSONMessageDeserializer();
+  private static MessageSerializer serializer = new MessageSerializer() {
+  };
+
+  private static volatile MessageEncoder instance;
+
+  public static MessageEncoder getInstance() {
+    if (instance == null) {
+      synchronized (GzipJSONMessageEncoder.class) {
+        if (instance == null) {
+          instance = new JSONMessageEncoder();
+        }
+      }
+    }
+    return instance;
+  }
+
+  @Override
+  public MessageDeserializer getDeserializer() {
+    return deserializer;
+  }
+
+  @Override
+  public MessageSerializer getSerializer() {
+    return serializer;
+  }
+
+  /**
+   * This is a format that's shipped, for any changes make sure that backward compatibiltiy
+   * with existing messages in this format are taken care of.
+   *
+   */
+  @Override
+  public String getMessageFormat() {
+    return FORMAT;
+  }
+}