You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:02 UTC

[52/75] [abbrv] hive git commit: HIVE-20679: DDL operations on hive might create large messages for DBNotification (Anishek Agarwal, reviewed by Sankar Hariappan)

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
deleted file mode 100644
index 6aa079d..0000000
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore.messaging.json;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Predicate;
-import java.util.regex.PatternSyntaxException;
-
-import javax.annotation.Nullable;
-
-import org.apache.hadoop.hive.metastore.api.Catalog;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
-import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
-import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
-import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
-import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
-import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterCatalogMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateCatalogMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropCatalogMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
-import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
-import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.filterMapkeys;
-
-/**
- * The JSON implementation of the MessageFactory. Constructs JSON implementations of each
- * message-type.
- */
-public class JSONMessageFactory extends MessageFactory {
-
-  private static final Logger LOG = LoggerFactory.getLogger(JSONMessageFactory.class.getName());
-
-  private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer();
-
-  private static List<Predicate<String>> paramsFilter;
-
-  @Override
-  public void init() throws MetaException {
-    super.init();
-
-    List<String> excludePatterns = Arrays.asList(MetastoreConf
-        .getTrimmedStringsVar(conf, MetastoreConf.ConfVars.EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS));
-    try {
-      paramsFilter = MetaStoreUtils.compilePatternsToPredicates(excludePatterns);
-    } catch (PatternSyntaxException e) {
-      LOG.error("Regex pattern compilation failed. Verify that "
-          + "metastore.notification.parameters.exclude.patterns has valid patterns.");
-      throw new MetaException("Regex pattern compilation failed. " + e.getMessage());
-    }
-  }
-
-  @Override
-  public MessageDeserializer getDeserializer() {
-    return deserializer;
-  }
-
-  @Override
-  public String getMessageFormat() {
-    return "json-0.2";
-  }
-
-  @Override
-  public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
-    return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, now());
-  }
-
-  @Override
-  public AlterDatabaseMessage buildAlterDatabaseMessage(Database beforeDb, Database afterDb) {
-    return new JSONAlterDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
-                                        beforeDb, afterDb, now());
-  }
-
-  @Override
-  public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
-    return new JSONDropDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now());
-  }
-
-  @Override
-  public CreateTableMessage buildCreateTableMessage(Table table, Iterator<String> fileIter) {
-    return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, fileIter, now());
-  }
-
-  @Override
-  public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp, Long writeId) {
-    return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, isTruncateOp, writeId, now());
-  }
-
-  @Override
-  public DropTableMessage buildDropTableMessage(Table table) {
-    return new JSONDropTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, now());
-  }
-
-  @Override
-  public AddPartitionMessage buildAddPartitionMessage(Table table,
-      Iterator<Partition> partitionsIterator, Iterator<PartitionFiles> partitionFileIter) {
-    return new JSONAddPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
-        partitionsIterator, partitionFileIter, now());
-  }
-
-  @Override
-  public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
-      Partition after, boolean isTruncateOp, Long writeId) {
-    return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
-                                        table, before, after, isTruncateOp, writeId, now());
-  }
-
-  @Override
-  public DropPartitionMessage buildDropPartitionMessage(Table table,
-      Iterator<Partition> partitionsIterator) {
-    return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
-        getPartitionKeyValues(table, partitionsIterator), now());
-  }
-
-  @Override
-  public CreateFunctionMessage buildCreateFunctionMessage(Function fn) {
-    return new JSONCreateFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now());
-  }
-
-  @Override
-  public DropFunctionMessage buildDropFunctionMessage(Function fn) {
-    return new JSONDropFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now());
-  }
-
-  @Override
-  public InsertMessage buildInsertMessage(Table tableObj, Partition partObj,
-                                          boolean replace, Iterator<String> fileIter) {
-    return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
-                                tableObj, partObj, replace, fileIter, now());
-  }
-
-  @Override
-  public AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List<SQLPrimaryKey> pks) {
-    return new JSONAddPrimaryKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, pks, now());
-  }
-
-  @Override
-  public AddForeignKeyMessage buildAddForeignKeyMessage(List<SQLForeignKey> fks) {
-    return new JSONAddForeignKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fks, now());
-  }
-
-  @Override
-  public AddUniqueConstraintMessage buildAddUniqueConstraintMessage(List<SQLUniqueConstraint> uks) {
-    return new JSONAddUniqueConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, uks, now());
-  }
-
-  @Override
-  public AddNotNullConstraintMessage buildAddNotNullConstraintMessage(List<SQLNotNullConstraint> nns) {
-    return new JSONAddNotNullConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, nns, now());
-  }
-
-  @Override
-  public DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName,
-      String constraintName) {
-    return new JSONDropConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, dbName, tableName,
-        constraintName, now());
-  }
-
-  @Override
-  public CreateCatalogMessage buildCreateCatalogMessage(Catalog catalog) {
-    return new JSONCreateCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(), now());
-  }
-
-  @Override
-  public AlterCatalogMessage buildAlterCatalogMessage(Catalog beforeCat, Catalog afterCat) {
-    return new JSONAlterCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
-        beforeCat, afterCat, now());
-  }
-
-  @Override
-  public DropCatalogMessage buildDropCatalogMessage(Catalog catalog) {
-    return new JSONDropCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(), now());
-  }
-
-  @Override
-  public OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId) {
-    return new JSONOpenTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fromTxnId, toTxnId, now());
-  }
-
-  @Override
-  public CommitTxnMessage buildCommitTxnMessage(Long txnId) {
-    return new JSONCommitTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
-  }
-
-  @Override
-  public AbortTxnMessage buildAbortTxnMessage(Long txnId) {
-    return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
-  }
-
-  @Override
-  public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList,
-                                                      String dbName, String tableName) {
-    return new JSONAllocWriteIdMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnToWriteIdList, dbName, tableName, now());
-  }
-
-  @Override
-  public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files) {
-    return new JSONAcidWriteMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), acidWriteEvent, files);
-  }
-
-  private long now() {
-    return System.currentTimeMillis() / 1000;
-  }
-
-  static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
-    Map<String, String> partitionKeys = new LinkedHashMap<>();
-    for (int i = 0; i < table.getPartitionKeysSize(); ++i) {
-      partitionKeys.put(table.getPartitionKeys().get(i).getName(), partition.getValues().get(i));
-    }
-    return partitionKeys;
-  }
-
-  static List<Map<String, String>> getPartitionKeyValues(final Table table,
-      Iterator<Partition> iterator) {
-    return Lists.newArrayList(Iterators.transform(iterator,
-        new com.google.common.base.Function<Partition, Map<String, String>>() {
-          @Override
-          public Map<String, String> apply(@Nullable Partition partition) {
-            return getPartitionKeyValues(table, partition);
-          }
-        }));
-  }
-
-  static String createPrimaryKeyObjJson(SQLPrimaryKey primaryKeyObj) throws TException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(primaryKeyObj, "UTF-8");
-  }
-
-  static String createForeignKeyObjJson(SQLForeignKey foreignKeyObj) throws TException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(foreignKeyObj, "UTF-8");
-  }
-
-  static String createUniqueConstraintObjJson(SQLUniqueConstraint uniqueConstraintObj) throws TException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(uniqueConstraintObj, "UTF-8");
-  }
-
-  static String createNotNullConstraintObjJson(SQLNotNullConstraint notNullConstaintObj) throws TException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(notNullConstaintObj, "UTF-8");
-  }
-
-  static String createDatabaseObjJson(Database dbObj) throws TException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(dbObj, "UTF-8");
-  }
-
-  static String createCatalogObjJson(Catalog catObj) throws TException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(catObj, "UTF-8");
-  }
-
-  static String createTableObjJson(Table tableObj) throws TException {
-    //Note: The parameters of the Table object will be removed in the filter if it matches
-    // any pattern provided through EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS
-    filterMapkeys(tableObj.getParameters(), paramsFilter);
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(tableObj, "UTF-8");
-  }
-
-  static String createPartitionObjJson(Partition partitionObj) throws TException {
-    //Note: The parameters of the Partition object will be removed in the filter if it matches
-    // any pattern provided through EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS
-    filterMapkeys(partitionObj.getParameters(), paramsFilter);
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(partitionObj, "UTF-8");
-  }
-
-  static String createFunctionObjJson(Function functionObj) throws TException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(functionObj, "UTF-8");
-  }
-
-  public static ObjectNode getJsonTree(NotificationEvent event) throws Exception {
-    return getJsonTree(event.getMessage());
-  }
-
-  public static ObjectNode getJsonTree(String eventMessage) throws Exception {
-    JsonParser jsonParser = (new JsonFactory()).createJsonParser(eventMessage);
-    ObjectMapper mapper = new ObjectMapper();
-    return mapper.readValue(jsonParser, ObjectNode.class);
-  }
-
-  public static Table getTableObj(ObjectNode jsonTree) throws Exception {
-    TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
-    Table tableObj = new Table();
-    String tableJson = jsonTree.get("tableObjJson").asText();
-    deSerializer.deserialize(tableObj, tableJson, "UTF-8");
-    return tableObj;
-  }
-
-  /*
-   * TODO: Some thoughts here : We have a current todo to move some of these methods over to
-   * MessageFactory instead of being here, so we can override them, but before we move them over,
-   * we should keep the following in mind:
-   *
-   * a) We should return Iterables, not Lists. That makes sure that we can be memory-safe when
-   * implementing it rather than forcing ourselves down a path wherein returning List is part of
-   * our interface, and then people use .size() or somesuch which makes us need to materialize
-   * the entire list and not change. Also, returning Iterables allows us to do things like
-   * Iterables.transform for some of these.
-   * b) We should not have "magic" names like "tableObjJson", because that breaks expectation of a
-   * couple of things - firstly, that of serialization format, although that is fine for this
-   * JSONMessageFactory, and secondly, that makes us just have a number of mappings, one for each
-   * obj type, and sometimes, as the case is with alter, have multiples. Also, any event-specific
-   * item belongs in that event message / event itself, as opposed to in the factory. It's okay to
-   * have utility accessor methods here that are used by each of the messages to provide accessors.
-   * I'm adding a couple of those here.
-   *
-   */
-
-  public static TBase getTObj(String tSerialized, Class<? extends TBase> objClass) throws Exception{
-    TDeserializer thriftDeSerializer = new TDeserializer(new TJSONProtocol.Factory());
-    TBase obj = objClass.newInstance();
-    thriftDeSerializer.deserialize(obj, tSerialized, "UTF-8");
-    return obj;
-  }
-
-  public static Iterable<? extends TBase> getTObjs(
-      Iterable<String> objRefStrs, final Class<? extends TBase> objClass) throws Exception {
-
-    try {
-      return Iterables.transform(objRefStrs, new com.google.common.base.Function<String,TBase>(){
-        @Override
-        public TBase apply(@Nullable String objStr){
-          try {
-            return getTObj(objStr, objClass);
-          } catch (Exception e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
-    } catch (RuntimeException re){
-      // We have to add this bit of exception handling here, because Function.apply does not allow us to throw
-      // the actual exception that might be a checked exception, so we wind up needing to throw a RuntimeException
-      // with the previously thrown exception as its cause. However, since RuntimeException.getCause() returns
-      // a throwable instead of an Exception, we have to account for the possibility that the underlying code
-      // might have thrown a Throwable that we wrapped instead, in which case, continuing to throw the
-      // RuntimeException is the best thing we can do.
-      Throwable t = re.getCause();
-      if (t instanceof Exception){
-        throw (Exception) t;
-      } else {
-        throw re;
-      }
-    }
-  }
-
-  // If we do not need this format of accessor using ObjectNode, this is a candidate for removal as well
-  public static Iterable<? extends TBase> getTObjs(
-      ObjectNode jsonTree, String objRefListName, final Class<? extends TBase> objClass) throws Exception {
-    Iterable<JsonNode> jsonArrayIterator = jsonTree.get(objRefListName);
-    com.google.common.base.Function<JsonNode,String> textExtractor =
-        new com.google.common.base.Function<JsonNode, String>() {
-      @Nullable
-      @Override
-      public String apply(@Nullable JsonNode input) {
-        return input.asText();
-      }
-    };
-    return getTObjs(Iterables.transform(jsonArrayIterator, textExtractor), objClass);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
new file mode 100644
index 0000000..9c64e33
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
@@ -0,0 +1,181 @@
+package org.apache.hadoop.hive.metastore.messaging.json.gzip;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.zip.GZIPInputStream;
+
+
+public class DeSerializer extends JSONMessageDeserializer {
+  private static final Logger LOG = LoggerFactory.getLogger(Serializer.class.getName());
+
+  private static String deCompress(String messageBody) {
+    byte[] decodedBytes = Base64.getDecoder().decode(messageBody.getBytes(StandardCharsets.UTF_8));
+    try (
+        ByteArrayInputStream in = new ByteArrayInputStream(decodedBytes);
+        GZIPInputStream is = new GZIPInputStream(in)
+    ) {
+      byte[] bytes = IOUtils.toByteArray(is);
+      return new String(bytes, StandardCharsets.UTF_8);
+    } catch (IOException e) {
+      LOG.error("cannot decode the stream", e);
+      LOG.debug("base64 encoded String", messageBody);
+      throw new RuntimeException("cannot decode the stream ", e);
+    }
+  }
+
+  /**
+   * this is mainly as a utility to allow debugging of messages for developers by providing the
+   * message in a file and getting an actual message out.
+   * This class on a deployed hive instance will also be bundled in hive-exec jar.
+   *
+   */
+  public static void main(String[] args) throws IOException {
+    if(args.length != 1) {
+      System.out.println("Usage:");
+      System.out.println("java -cp [classpath] "+DeSerializer.class.getCanonicalName() +" [file_location]");
+    }
+    System.out.print(
+        deCompress(FileUtils.readFileToString(new File(args[0]), StandardCharsets.UTF_8)));
+  }
+
+  @Override
+  public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) {
+    return super.getCreateDatabaseMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AlterDatabaseMessage getAlterDatabaseMessage(String messageBody) {
+    return super.getAlterDatabaseMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public DropDatabaseMessage getDropDatabaseMessage(String messageBody) {
+    return super.getDropDatabaseMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public CreateTableMessage getCreateTableMessage(String messageBody) {
+    return super.getCreateTableMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AlterTableMessage getAlterTableMessage(String messageBody) {
+    return super.getAlterTableMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public DropTableMessage getDropTableMessage(String messageBody) {
+    return super.getDropTableMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AddPartitionMessage getAddPartitionMessage(String messageBody) {
+    return super.getAddPartitionMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AlterPartitionMessage getAlterPartitionMessage(String messageBody) {
+    return super.getAlterPartitionMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public DropPartitionMessage getDropPartitionMessage(String messageBody) {
+    return super.getDropPartitionMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public CreateFunctionMessage getCreateFunctionMessage(String messageBody) {
+    return super.getCreateFunctionMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public DropFunctionMessage getDropFunctionMessage(String messageBody) {
+    return super.getDropFunctionMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public InsertMessage getInsertMessage(String messageBody) {
+    return super.getInsertMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AddPrimaryKeyMessage getAddPrimaryKeyMessage(String messageBody) {
+    return super.getAddPrimaryKeyMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AddForeignKeyMessage getAddForeignKeyMessage(String messageBody) {
+    return super.getAddForeignKeyMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AddUniqueConstraintMessage getAddUniqueConstraintMessage(String messageBody) {
+    return super.getAddUniqueConstraintMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AddNotNullConstraintMessage getAddNotNullConstraintMessage(String messageBody) {
+    return super.getAddNotNullConstraintMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public DropConstraintMessage getDropConstraintMessage(String messageBody) {
+    return super.getDropConstraintMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public OpenTxnMessage getOpenTxnMessage(String messageBody) {
+    return super.getOpenTxnMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public CommitTxnMessage getCommitTxnMessage(String messageBody) {
+    return super.getCommitTxnMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AbortTxnMessage getAbortTxnMessage(String messageBody) {
+    return super.getAbortTxnMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AllocWriteIdMessage getAllocWriteIdMessage(String messageBody) {
+    return super.getAllocWriteIdMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AcidWriteMessage getAcidWriteMessage(String messageBody) {
+    return super.getAcidWriteMessage(deCompress(messageBody));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java
new file mode 100644
index 0000000..07b01a9
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging.json.gzip;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
+
+/**
+ * This implementation gzips and then Base64 encodes the message before writing it out.
+ * This MessageEncoder will break the backward compatibility for hive replication v1 which uses webhcat endpoints.
+ */
+public class GzipJSONMessageEncoder implements MessageEncoder {
+  public static final String FORMAT = "gzip(json-2.0)";
+
+  static {
+    MessageFactory.register(FORMAT, GzipJSONMessageEncoder.class);
+  }
+
+  private static DeSerializer deSerializer = new DeSerializer();
+  private static Serializer serializer = new Serializer();
+
+  private static volatile MessageEncoder instance;
+
+  public static MessageEncoder getInstance() {
+    if (instance == null) {
+      synchronized (GzipJSONMessageEncoder.class) {
+        if (instance == null) {
+          instance = new GzipJSONMessageEncoder();
+        }
+      }
+    }
+    return instance;
+  }
+
+  @Override
+  public MessageDeserializer getDeserializer() {
+    return deSerializer;
+  }
+
+  @Override
+  public MessageSerializer getSerializer() {
+    return serializer;
+  }
+
+  @Override
+  public String getMessageFormat() {
+    return FORMAT;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
new file mode 100644
index 0000000..c40600d
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.metastore.messaging.json.gzip;
+
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.zip.GZIPOutputStream;
+
+class Serializer implements MessageSerializer {
+  private static final Logger LOG = LoggerFactory.getLogger(Serializer.class.getName());
+
+  @Override
+  public String serialize(EventMessage message) {
+    String messageAsString = MessageSerializer.super.serialize(message);
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      GZIPOutputStream gout = new GZIPOutputStream(baos);
+      gout.write(messageAsString.getBytes(StandardCharsets.UTF_8));
+      gout.close();
+      byte[] compressed = baos.toByteArray();
+      return new String(Base64.getEncoder().encode(compressed), StandardCharsets.UTF_8);
+    } catch (IOException e) {
+      LOG.error("could not use gzip output stream", e);
+      LOG.debug("message " + messageAsString);
+      throw new RuntimeException("could not use the gzip output Stream", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
index 3d36b60..b01a632 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
@@ -23,6 +23,7 @@ import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -42,6 +43,8 @@ import org.slf4j.LoggerFactory;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
 
 public class MetaStoreTestUtils {
+  private static Map<Integer, Thread> map = new HashMap<>();
+
   private static final Logger LOG = LoggerFactory.getLogger(MetaStoreTestUtils.class);
   private static final String TMP_DIR = System.getProperty("test.tmp.dir");
   public static final int RETRY_COUNT = 10;
@@ -75,9 +78,17 @@ public class MetaStoreTestUtils {
     }, "MetaStoreThread-" + port);
     thread.setDaemon(true);
     thread.start();
+    map.put(port,thread);
     MetaStoreTestUtils.loopUntilHMSReady(port);
   }
 
+  public static void close(final int port){
+    Thread thread = map.get(port);
+    if(thread != null){
+      thread.stop();
+    }
+  }
+
   public static int startMetaStoreWithRetry(final HadoopThriftAuthBridge bridge) throws Exception {
     return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, MetastoreConf.newMetastoreConf());
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/testutils/ptest2/conf/deployed/master-mr2.properties
----------------------------------------------------------------------
diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties
index 23ad0f6..ad5405f 100644
--- a/testutils/ptest2/conf/deployed/master-mr2.properties
+++ b/testutils/ptest2/conf/deployed/master-mr2.properties
@@ -68,7 +68,7 @@ ut.service.batchSize=8
 
 unitTests.module.itests.hive-unit=itests.hive-unit
 ut.itests.hive-unit.batchSize=9
-ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr
+ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat
 
 unitTests.module.itests.qtest=itests.qtest
 ut.itests.qtest.batchSize=9