Posted to commits@hive.apache.org by pr...@apache.org on 2021/11/09 08:13:05 UTC

[hive] branch master updated: HIVE-25596: Compress Hive Replication Metrics while storing (Haymant Mangla, reviewed by Aasha Medhi, Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

pravin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ec140e  HIVE-25596: Compress Hive Replication Metrics while storing (Haymant Mangla, reviewed by Aasha Medhi, Pravin Kumar Sinha)
6ec140e is described below

commit 6ec140ed2f735050dd742b0af76fb21ec6b970f5
Author: Haymant Mangla <79...@users.noreply.github.com>
AuthorDate: Tue Nov 9 13:42:46 2021 +0530

    HIVE-25596: Compress Hive Replication Metrics while storing (Haymant Mangla, reviewed by Aasha Medhi, Pravin Kumar Sinha)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   3 +-
 .../upgrade/hive/hive-schema-4.0.0.hive.sql        |  21 +++-
 .../upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql   |  21 +++-
 .../hadoop/hive/ql/exec/FunctionRegistry.java      |   1 +
 .../hadoop/hive/ql/exec/repl/ReplStatsTracker.java |  16 ++-
 .../hive/ql/parse/repl/metric/MetricSink.java      |  27 ++++-
 .../repl/metric/ReplicationMetricCollector.java    |  27 +++--
 .../parse/repl/metric/event/ReplicationMetric.java |   9 ++
 .../hive/ql/parse/repl/metric/event/Stage.java     |   9 +-
 .../hive/ql/udf/generic/GenericUDFDeserialize.java |  92 +++++++++++++++++
 .../metric/TestReplicationMetricCollector.java     |  43 ++++----
 .../repl/metric/TestReplicationMetricSink.java     |  43 +++++---
 .../TestReplicationMetricUpdateOnFailure.java      |   2 +-
 .../ql/udf/generic/TestGenericUDFDeserialize.java  |  98 ++++++++++++++++++
 .../clientpositive/replication_metrics_ingest.q    |   4 +-
 .../test/queries/clientpositive/udf_deserialize.q  |   8 ++
 .../llap/replication_metrics_ingest.q.out          |  17 +--
 .../results/clientpositive/llap/resourceplan.q.out |   4 +
 .../clientpositive/llap/show_functions.q.out       |   3 +
 .../llap/strict_managed_tables_sysdb.q.out         |   6 ++
 .../test/results/clientpositive/llap/sysdb.q.out   |  14 ++-
 .../clientpositive/llap/udf_deserialize.q.out      |  52 ++++++++++
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp    |  22 ++++
 .../src/gen/thrift/gen-cpp/hive_metastore_types.h  |  12 ++-
 .../hive/metastore/api/ReplicationMetrics.java     | 114 ++++++++++++++++++++-
 .../gen-php/metastore/ReplicationMetrics.php       |  24 +++++
 .../src/gen/thrift/gen-py/hive_metastore/ttypes.py |  14 ++-
 .../src/gen/thrift/gen-rb/hive_metastore_types.rb  |   4 +-
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |   4 +
 .../src/main/thrift/hive_metastore.thrift          |   3 +-
 .../apache/hadoop/hive/metastore/ObjectStore.java  |  16 ++-
 .../metastore/messaging/MessageDeserializer.java   |   7 ++
 .../hive/metastore/messaging/MessageFactory.java   |  26 ++++-
 .../metastore/messaging/MessageSerializer.java     |   3 +
 .../messaging/json/gzip/DeSerializer.java          |   4 +
 .../metastore/messaging/json/gzip/Serializer.java  |  10 +-
 .../hive/metastore/model/MReplicationMetrics.java  |  10 ++
 .../src/main/resources/package.jdo                 |   5 +-
 .../src/main/sql/derby/hive-schema-4.0.0.derby.sql |   3 +-
 .../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql     |  14 ++-
 .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql |   3 +-
 .../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql     |   2 +
 .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql |   3 +-
 .../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql     |   4 +
 .../main/sql/oracle/hive-schema-4.0.0.oracle.sql   |   3 +-
 .../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql   |   2 +
 .../sql/postgres/hive-schema-4.0.0.postgres.sql    |   3 +-
 .../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql   |   4 +
 .../upgrade-3.1.3000-to-4.0.0.postgres.sql         |   4 +
 49 files changed, 746 insertions(+), 97 deletions(-)
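
In brief: the Progress JSON persisted to RM_PROGRESS is now gzip-compressed and base64-encoded before storage, the encoding is recorded in a new MESSAGE_FORMAT column, and a new deserialize() UDF plus the SYS.REPLICATION_METRICS_VIEW view decode it back on read. A minimal sketch of the gzip+base64 round trip using only java.util.zip and java.util.Base64 (an illustration of the scheme, not the shipped GzipJSONMessageEncoder code):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    public class GzipBase64RoundTrip {
      // Compress with gzip, then base64-encode: conceptually what serialize() does.
      static String serialize(String json) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(baos)) {
          gz.write(json.getBytes(StandardCharsets.UTF_8));
        }
        return Base64.getEncoder().encodeToString(baos.toByteArray());
      }

      // Base64-decode, then gunzip: conceptually what deSerializeGenericString() does.
      static String deserialize(String encoded) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(encoded);
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
          return new String(gz.readAllBytes(), StandardCharsets.UTF_8);
        }
      }

      public static void main(String[] args) throws IOException {
        String enc = serialize("test");
        System.out.println(deserialize(enc)); // prints: test
      }
    }

The UDF documentation below shows the same pairing from SQL: deserialize('H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA', 'gzip') returns "test".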

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 65fa491..35f4b93 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -696,7 +696,8 @@ public class HiveConf extends Configuration {
         + "attempted using the snapshot based approach. If disabled, the replication will fail in case the target is "
         + "modified."),
     REPL_STATS_TOP_EVENTS_COUNTS("hive.repl.stats.events.count", 5,
-        "Number of top costliest events that needs to maintained per event type for the replication statistics."),
+        "Number of topmost expensive events that needs to be maintained per event type for the replication statistics." +
+                " Maximum permissible limit is 10."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
index c6fe1172..9de3488 100644
--- a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
+++ b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
@@ -1466,7 +1466,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `REPLICATION_METRICS` (
     `POLICY_NAME` string,
     `DUMP_EXECUTION_ID` bigint,
     `METADATA` string,
-    `PROGRESS` string
+    `PROGRESS` string,
+    `MESSAGE_FORMAT` varchar(16)
 )
 STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
 TBLPROPERTIES (
@@ -1477,10 +1478,26 @@ TBLPROPERTIES (
     \"RM_POLICY\",
     \"RM_DUMP_EXECUTION_ID\",
     \"RM_METADATA\",
-    \"RM_PROGRESS\"
+    \"RM_PROGRESS\",
+    \"MESSAGE_FORMAT\"
 FROM \"REPLICATION_METRICS\""
 );
 
+CREATE OR REPLACE VIEW `REPLICATION_METRICS_VIEW` (
+    `SCHEDULED_EXECUTION_ID`,
+    `POLICY_NAME`,
+    `DUMP_EXECUTION_ID`,
+    `METADATA`,
+    `PROGRESS`
+) AS
+SELECT DISTINCT
+    RM.`SCHEDULED_EXECUTION_ID`,
+    RM.`POLICY_NAME`,
+    RM.`DUMP_EXECUTION_ID`,
+    RM.`METADATA`,
+    deserialize(RM.`PROGRESS`, RM.`MESSAGE_FORMAT`)
+FROM SYS.`REPLICATION_METRICS` AS RM;
+
 CREATE EXTERNAL TABLE IF NOT EXISTS `NOTIFICATION_LOG` (
   `NL_ID` bigint,
   `EVENT_ID` bigint,
diff --git a/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql b/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
index 6ee90d8..6e3a9e1 100644
--- a/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
+++ b/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
@@ -527,7 +527,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `REPLICATION_METRICS` (
     `POLICY_NAME` string,
     `DUMP_EXECUTION_ID` bigint,
     `METADATA` string,
-    `PROGRESS` string
+    `PROGRESS` string,
+    `MESSAGE_FORMAT` varchar(16)
 )
 STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
 TBLPROPERTIES (
@@ -538,10 +539,26 @@ TBLPROPERTIES (
     \"RM_POLICY\",
     \"RM_DUMP_EXECUTION_ID\",
     \"RM_METADATA\",
-    \"RM_PROGRESS\"
+    \"RM_PROGRESS\",
+    \"MESSAGE_FORMAT\"
 FROM \"REPLICATION_METRICS\""
 );
 
+CREATE OR REPLACE VIEW `REPLICATION_METRICS_VIEW` (
+    `SCHEDULED_EXECUTION_ID`,
+    `POLICY_NAME`,
+    `DUMP_EXECUTION_ID`,
+    `METADATA`,
+    `PROGRESS`
+) AS
+SELECT DISTINCT
+    RM.`SCHEDULED_EXECUTION_ID`,
+    RM.`POLICY_NAME`,
+    RM.`DUMP_EXECUTION_ID`,
+    RM.`METADATA`,
+    deserialize(RM.`PROGRESS`, RM.`MESSAGE_FORMAT`)
+FROM SYS.`REPLICATION_METRICS` AS RM;
+
 CREATE EXTERNAL TABLE IF NOT EXISTS `NOTIFICATION_LOG` (
   `NL_ID` bigint,
   `EVENT_ID` bigint,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 75fc858..1c77496 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -510,6 +510,7 @@ public final class FunctionRegistry {
     system.registerGenericUDF("sort_array", GenericUDFSortArray.class);
     system.registerGenericUDF("sort_array_by", GenericUDFSortArrayByField.class);
     system.registerGenericUDF("array_contains", GenericUDFArrayContains.class);
+    system.registerGenericUDF("deserialize", GenericUDFDeserialize.class);
     system.registerGenericUDF("sentences", GenericUDFSentences.class);
     system.registerGenericUDF("map_keys", GenericUDFMapKeys.class);
     system.registerGenericUDF("map_values", GenericUDFMapValues.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStatsTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStatsTracker.java
index 0d9683b..06c147c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStatsTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStatsTracker.java
@@ -22,7 +22,6 @@ import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.ObjectName;
 import java.math.RoundingMode;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
@@ -30,13 +29,17 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_STATS_TOP_EVENTS_COUNTS;
+
 /**
  * Tracks the replication statistics per event type.
  */
 public class ReplStatsTracker {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ReplStatsTracker.class);
   // Maintains the length of the RM_Progress column in the RDBMS, which stores the ReplStats
-  public static int RM_PROGRESS_LENGTH = 24000;
+  public static int RM_PROGRESS_LENGTH = 10000;
+  public static int TOP_K_MAX = 10;
 
   // Maintains the descriptive statistics per event type.
   private ConcurrentHashMap<String, DescriptiveStatistics> descMap;
@@ -49,6 +52,11 @@ public class ReplStatsTracker {
   private String lastEventId;
 
   public ReplStatsTracker(int k) {
+    if (k > TOP_K_MAX) {
+      LOG.warn("Value for {} exceeded maximum permissible limit. Using Maximum of {}", REPL_STATS_TOP_EVENTS_COUNTS,
+              TOP_K_MAX);
+      k = TOP_K_MAX;
+    }
     this.k = k;
     descMap = new ConcurrentHashMap<>();
     topKEvents = new ConcurrentHashMap<>();
@@ -122,6 +130,10 @@ public class ReplStatsTracker {
     return lastEventId;
   }
 
+  public int getK() {
+    return k;
+  }
+
   private String formatDouble(DecimalFormat dFormat, Double d) {
     if (!d.isNaN()) {
       return dFormat.format(d);
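
With the new cap, a ReplStatsTracker constructed with k above TOP_K_MAX logs a warning and clamps to the maximum, which getK() now exposes. A short illustration (mirroring the assertion added to TestReplicationMetricSink below):

    // Requesting more than TOP_K_MAX top events falls back to the cap of 10.
    ReplStatsTracker tracker = new ReplStatsTracker(15);
    assert tracker.getK() == ReplStatsTracker.TOP_K_MAX;
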
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
index d0f8b5f..3bbabb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
@@ -22,14 +22,17 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.ReplicationMetricList;
 import org.apache.hadoop.hive.metastore.api.ReplicationMetrics;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.utils.Retry;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStatsTracker;
 import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -48,6 +51,7 @@ public final class MetricSink {
   private static volatile MetricSink instance;
   private boolean isInitialised = false;
   private HiveConf conf;
+  private static final String RM_PROGRESS_COLUMN_WIDTH_EXCEEDS_MSG = "ERROR: RM_PROGRESS LIMIT EXCEEDED.";
 
   private MetricSink() {
     this.executorService = Executors.newSingleThreadScheduledExecutor();
@@ -103,6 +107,21 @@ public final class MetricSink {
       this.conf = conf;
     }
 
+    private String updateRMProgressIfLimitExceeds(Progress progress, MessageEncoder encoder) throws SemanticException {
+      try {
+        String progressJson = new ObjectMapper().writeValueAsString(progress);
+        String serializedProgress = encoder.getSerializer().serialize(progressJson);
+        if (serializedProgress.length() > ReplStatsTracker.RM_PROGRESS_LENGTH) {
+          LOG.warn("Error: RM_PROGRESS limit exceeded.\n" +
+                  "RM_PROGRESS: " + progressJson + " overwritten by " + RM_PROGRESS_COLUMN_WIDTH_EXCEEDS_MSG);
+          serializedProgress = encoder.getSerializer().serialize(RM_PROGRESS_COLUMN_WIDTH_EXCEEDS_MSG);
+        }
+        return serializedProgress;
+      } catch (Exception e) {
+        throw new SemanticException(e);
+      }
+    }
+
     @Override
     public void run() {
       ReplicationMetricList metricList = new ReplicationMetricList();
@@ -116,14 +135,16 @@ public final class MetricSink {
           int totalMetricsSize = metrics.size();
           List<ReplicationMetrics> replicationMetricsList = new ArrayList<>(totalMetricsSize);
           ObjectMapper mapper = new ObjectMapper();
+          MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(conf);
           for (int index = 0; index < totalMetricsSize; index++) {
             ReplicationMetric metric = metrics.removeFirst();
             ReplicationMetrics persistentMetric = new ReplicationMetrics();
             persistentMetric.setDumpExecutionId(metric.getDumpExecutionId());
             persistentMetric.setScheduledExecutionId(metric.getScheduledExecutionId());
             persistentMetric.setPolicy(metric.getPolicy());
-            persistentMetric.setProgress(mapper.writeValueAsString(metric.getProgress()));
+            persistentMetric.setProgress(updateRMProgressIfLimitExceeds(metric.getProgress(), encoder));
             persistentMetric.setMetadata(mapper.writeValueAsString(metric.getMetadata()));
+            persistentMetric.setMessageFormat(encoder.getMessageFormat());
             LOG.debug("Metric to be persisted {} ", persistentMetric);
             replicationMetricsList.add(persistentMetric);
           }
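
Note that updateRMProgressIfLimitExceeds checks the serialized (compressed and base64-encoded) string against RM_PROGRESS_LENGTH, so the 10000-character budget applies after compression rather than to the raw JSON. Condensed, the rule is:

    // Sketch of the overflow rule, given a Progress object and a MessageEncoder.
    String json = new ObjectMapper().writeValueAsString(progress);
    String stored = encoder.getSerializer().serialize(json);       // gzip + base64
    if (stored.length() > ReplStatsTracker.RM_PROGRESS_LENGTH) {   // budget checked post-compression
      stored = encoder.getSerializer().serialize("ERROR: RM_PROGRESS LIMIT EXCEEDED.");
    }
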
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
index 88f8e74..27d9f7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.metric;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,6 +50,7 @@ public abstract class ReplicationMetricCollector {
   private MetricCollector metricCollector;
   private boolean isEnabled;
   private static boolean enableForTests;
+  private HiveConf conf;
 
   public void setMetricsMBean(ObjectName metricsMBean) {
     this.metricsMBean = metricsMBean;
@@ -58,6 +60,7 @@ public abstract class ReplicationMetricCollector {
 
   public ReplicationMetricCollector(String dbName, Metadata.ReplicationType replicationType,
                              String stagingDir, long dumpExecutionId, HiveConf conf) {
+    this.conf = conf;
     checkEnabledForTests(conf);
     String policy = conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME);
     long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
@@ -65,7 +68,7 @@ public abstract class ReplicationMetricCollector {
       isEnabled = true;
       metricCollector = MetricCollector.getInstance().init(conf);
       MetricSink.getInstance().init(conf);
-      Metadata metadata = new Metadata(dbName, replicationType, stagingDir);
+      Metadata metadata = new Metadata(dbName, replicationType, getStagingDir(stagingDir));
       replicationMetric = new ReplicationMetric(executionId, policy, dumpExecutionId, metadata);
     }
   }
@@ -75,7 +78,7 @@ public abstract class ReplicationMetricCollector {
       LOG.debug("Stage Started {}, {}, {}", stageName, metricMap.size(), metricMap );
       Progress progress = replicationMetric.getProgress();
       progress.setStatus(Status.IN_PROGRESS);
-      Stage stage = new Stage(stageName, Status.IN_PROGRESS, System.currentTimeMillis());
+      Stage stage = new Stage(stageName, Status.IN_PROGRESS, getCurrentTimeInMillis());
       for (Map.Entry<String, Long> metric : metricMap.entrySet()) {
         stage.addMetric(new Metric(metric.getKey(), metric.getValue()));
       }
@@ -91,7 +94,7 @@ public abstract class ReplicationMetricCollector {
       LOG.info("Failover Stage Started {}, {}, {}", stageName, metricMap.size(), metricMap);
       Progress progress = replicationMetric.getProgress();
       progress.setStatus(Status.FAILOVER_IN_PROGRESS);
-      Stage stage = new Stage(stageName, Status.IN_PROGRESS, System.currentTimeMillis());
+      Stage stage = new Stage(stageName, Status.IN_PROGRESS, getCurrentTimeInMillis());
       for (Map.Entry<String, Long> metric : metricMap.entrySet()) {
         stage.addMetric(new Metric(metric.getKey(), metric.getValue()));
       }
@@ -116,7 +119,7 @@ public abstract class ReplicationMetricCollector {
         stage = new Stage(stageName, status, -1L);
       }
       stage.setStatus(status);
-      stage.setEndTime(System.currentTimeMillis());
+      stage.setEndTime(getCurrentTimeInMillis());
       stage.setReplSnapshotsCount(replSnapshotCount);
       if (replStatsTracker != null && !(replStatsTracker instanceof NoOpReplStatsTracker)) {
         String replStatString = replStatsTracker.toString();
@@ -145,7 +148,7 @@ public abstract class ReplicationMetricCollector {
         stage = new Stage(stageName, status, -1L);
       }
       stage.setStatus(status);
-      stage.setEndTime(System.currentTimeMillis());
+      stage.setEndTime(getCurrentTimeInMillis());
       stage.setErrorLogPath(errorLogPath);
       progress.addStage(stage);
       replicationMetric.setProgress(progress);
@@ -166,7 +169,7 @@ public abstract class ReplicationMetricCollector {
         stage = new Stage(stageName, status, -1L);
       }
       stage.setStatus(status);
-      stage.setEndTime(System.currentTimeMillis());
+      stage.setEndTime(getCurrentTimeInMillis());
       progress.addStage(stage);
       replicationMetric.setProgress(progress);
       metricCollector.addMetric(replicationMetric);
@@ -224,4 +227,16 @@ public abstract class ReplicationMetricCollector {
       }
     }
   }
+
+  private boolean testingModeEnabled() {
+    return conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL);
+  }
+
+  private long getCurrentTimeInMillis() {
+    return testingModeEnabled() ? 0L : System.currentTimeMillis();
+  }
+
+  private String getStagingDir(String stagingDir) {
+    return testingModeEnabled() ? "dummyDir" : stagingDir;
+  }
 }
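
The private hooks above make metrics deterministic under test: whenever HIVE_IN_TEST or HIVE_IN_TEST_REPL is set, stage timestamps collapse to 0 and the staging directory to "dummyDir", which is what lets the golden .q.out files below assert exact values. Roughly:

    // Hypothetical illustration of the determinism hook.
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL, true);
    // Any collector built with this conf records startTime/endTime == 0
    // and a staging dir of "dummyDir", keeping test output stable.
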
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java
index 230b516..ca6bddf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java
@@ -26,6 +26,7 @@ public class ReplicationMetric {
   private long dumpExecutionId;
   private Metadata metadata;
   private Progress progress;
+  private String messageFormat;
 
   public ReplicationMetric(long scheduledExecutionId, String policy, long dumpExecutionId, Metadata metadata){
     this.scheduledExecutionId = scheduledExecutionId;
@@ -71,4 +72,12 @@ public class ReplicationMetric {
   public void setProgress(Progress progress) {
     this.progress = progress;
   }
+
+  public String getMessageFormat() {
+    return messageFormat;
+  }
+
+  public void setMessageFormat(String messageFormat) {
+    this.messageFormat = messageFormat;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
index 4a54a8b..83df9f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
@@ -24,8 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.hadoop.hive.ql.exec.repl.ReplStatsTracker.RM_PROGRESS_LENGTH;
-
 /**
  * Class for defining the different stages of replication.
  */
@@ -129,12 +127,7 @@ public class Stage {
   }
 
   public void setReplStats(String replStats) {
-    // Check the stat string doesn't surpass the RM_PROGRESS column length.
-    if (replStats.length() >= RM_PROGRESS_LENGTH - 2000) {
-      this.replStats = "RM_PROGRESS LIMIT EXCEEDED TO " + replStats.length();
-    } else {
-      this.replStats = replStats;
-    }
+    this.replStats = replStats;
   }
 
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDeserialize.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDeserialize.java
new file mode 100644
index 0000000..9fa375a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDeserialize.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+/**
+ * GenericUDFDeserialize.
+ *
+ */
+@Description(name = "deserialize",
+        value="_FUNC_(base64 encoded message, compressionFormat) - Returns plain text string of given message which " +
+                "was compressed in compressionFormat and base64 encoded.",
+        extended="Currently, Supports only 'gzip' for Gzip compressed and base 64 encoded strings.\n" +
+                "Example:\n"
+                + "  > SELECT _FUNC_('H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA', 'gzip') FROM src LIMIT 1;\n"
+                + "  test")
+public class GenericUDFDeserialize extends GenericUDF {
+
+    private static final int ARG_COUNT = 2; // Number of arguments to this UDF
+    private static final String FUNC_NAME = "deserialize"; // External Name
+
+    private transient PrimitiveObjectInspector stringOI = null;
+    private transient PrimitiveObjectInspector compressionFormat = null;
+
+    @Override
+    public ObjectInspector initialize(ObjectInspector[] arguments)
+            throws UDFArgumentException {
+        if (arguments.length != ARG_COUNT) {
+            throw new UDFArgumentException("The function " + FUNC_NAME + " accepts " + ARG_COUNT + " arguments.");
+        }
+        for (ObjectInspector arg: arguments) {
+            if (arg.getCategory() != ObjectInspector.Category.PRIMITIVE ||
+                    PrimitiveObjectInspectorUtils.PrimitiveGrouping.STRING_GROUP != PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
+                            ((PrimitiveObjectInspector)arg).getPrimitiveCategory())){
+                throw new UDFArgumentTypeException(0, "The arguments to " + FUNC_NAME + " must be of string/varchar type");
+            }
+        }
+        stringOI = (PrimitiveObjectInspector) arguments[0];
+        compressionFormat = (PrimitiveObjectInspector) arguments[1];
+        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+    }
+
+    @Override
+    public Object evaluate(DeferredObject[] arguments) throws HiveException {
+        String value = PrimitiveObjectInspectorUtils.getString(arguments[0].get(), stringOI);
+        String compressionFormat = PrimitiveObjectInspectorUtils.getString(arguments[1].get(), this.compressionFormat);
+        if (value == null || StringUtils.isEmpty(compressionFormat)) {
+            return value;
+        }
+        MessageEncoder encoder;
+        try {
+            encoder = MessageFactory.getInstance(compressionFormat);
+        } catch (Exception e) {
+            throw new HiveException(e);
+        }
+        return encoder.getDeserializer().deSerializeGenericString(value);
+    }
+
+    @Override
+    public String getDisplayString(String[] children) {
+        assert (children.length == ARG_COUNT);
+        return getStandardDisplayString(FUNC_NAME, children, ",");
+    }
+}
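
For reference, a direct invocation of the new UDF, along the lines of the unit test added below:

    GenericUDFDeserialize udf = new GenericUDFDeserialize();
    udf.initialize(new ObjectInspector[]{
        PrimitiveObjectInspectorFactory.writableStringObjectInspector,
        PrimitiveObjectInspectorFactory.writableStringObjectInspector});
    Object out = udf.evaluate(new GenericUDF.DeferredObject[]{
        new GenericUDF.DeferredJavaObject(new Text("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA")),
        new GenericUDF.DeferredJavaObject(new Text("gzip"))});
    // out.toString() -> "test"
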
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
index a9784a0..45569bc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.parse.repl.metric;
 
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStatsTracker;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
@@ -72,7 +74,7 @@ public class TestReplicationMetricCollector {
     conf.set(Constants.SCHEDULED_QUERY_EXECUTIONID, "1");
     MetricCollector.getInstance().init(conf);
     Mockito.when(fmd.getFailoverEventId()).thenReturn(10L);
-    Mockito.when(fmd.getFilePath()).thenReturn("staging");
+    Mockito.when(fmd.getFilePath()).thenReturn("dummyDir");
   }
 
   @After
@@ -105,7 +107,7 @@ public class TestReplicationMetricCollector {
     conf = new HiveConf();
     MetricCollector.getInstance().init(conf);
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
-        "staging", conf);
+        "dummyDir", conf);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -120,7 +122,7 @@ public class TestReplicationMetricCollector {
     conf = new HiveConf();
     MetricCollector.getInstance().init(conf);
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
-        "staging", conf);
+        "dummyDir", conf);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -132,7 +134,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testSuccessBootstrapDumpMetrics() throws Exception {
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
-        "staging", conf);
+        "dummyDir", conf);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -152,7 +154,7 @@ public class TestReplicationMetricCollector {
     actualMetrics = MetricCollector.getInstance().getMetrics();
     Assert.assertEquals(1, actualMetrics.size());
 
-    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "staging");
+    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "dummyDir");
     expectedMetadata.setLastReplId(10);
     Progress expectedProgress = new Progress();
     expectedProgress.setStatus(Status.SUCCESS);
@@ -174,7 +176,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testSuccessIncrDumpMetrics() throws Exception {
     ReplicationMetricCollector incrDumpMetricCollector = new IncrementalDumpMetricCollector("db",
-        "staging", conf);
+        "dummyDir", conf);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -194,7 +196,7 @@ public class TestReplicationMetricCollector {
     actualMetrics = MetricCollector.getInstance().getMetrics();
     Assert.assertEquals(1, actualMetrics.size());
 
-    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "staging");
+    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "dummyDir");
     expectedMetadata.setLastReplId(10);
     Progress expectedProgress = new Progress();
     expectedProgress.setStatus(Status.SUCCESS);
@@ -217,7 +219,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testFailoverReadyDumpMetrics() throws Exception {
     ReplicationMetricCollector incrDumpMetricCollector = new IncrementalDumpMetricCollector("db",
-            "staging", conf);
+            "dummyDir", conf);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
     incrDumpMetricCollector.reportFailoverStart("dump", metricMap, fmd);
@@ -231,10 +233,10 @@ public class TestReplicationMetricCollector {
     actualMetrics = MetricCollector.getInstance().getMetrics();
     Assert.assertEquals(1, actualMetrics.size());
 
-    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "staging");
+    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "dummyDir");
     expectedMetadata.setLastReplId(10);
     expectedMetadata.setFailoverEventId(10);
-    expectedMetadata.setFailoverMetadataLoc("staging");
+    expectedMetadata.setFailoverMetadataLoc("dummyDir");
     Progress expectedProgress = new Progress();
     expectedProgress.setStatus(Status.FAILOVER_READY);
     Stage dumpStage = new Stage("dump", Status.SUCCESS, 0);
@@ -253,7 +255,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testSuccessBootstrapLoadMetrics() throws Exception {
     ReplicationMetricCollector bootstrapLoadMetricCollector = new BootstrapLoadMetricCollector("db",
-        "staging", 1, conf);
+        "dummyDir", 1, conf);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -274,7 +276,7 @@ public class TestReplicationMetricCollector {
     actualMetrics = MetricCollector.getInstance().getMetrics();
     Assert.assertEquals(1, actualMetrics.size());
 
-    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "staging");
+    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "dummyDir");
     expectedMetadata.setLastReplId(10);
     Progress expectedProgress = new Progress();
     expectedProgress.setStatus(Status.SUCCESS);
@@ -297,7 +299,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testSuccessIncrLoadMetrics() throws Exception {
     ReplicationMetricCollector incrLoadMetricCollector = new IncrementalLoadMetricCollector("db",
-        "staging", 1, conf);
+        "dummyDir", 1, conf);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -317,7 +319,7 @@ public class TestReplicationMetricCollector {
     actualMetrics = MetricCollector.getInstance().getMetrics();
     Assert.assertEquals(1, actualMetrics.size());
 
-    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "staging");
+    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "dummyDir");
     expectedMetadata.setLastReplId(10);
     Progress expectedProgress = new Progress();
     expectedProgress.setStatus(Status.SUCCESS);
@@ -361,7 +363,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testSuccessStageFailure() throws Exception {
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
-      "staging", conf);
+      "dummyDir", conf);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -376,7 +378,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testSuccessStageFailedAdmin() throws Exception {
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
-      "staging", conf);
+      "dummyDir", conf);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -462,14 +464,19 @@ public class TestReplicationMetricCollector {
 
   @Test
   public void testReplStatsTrackerLimit() {
+    MessageSerializer serializer = MessageFactory.getDefaultInstanceForReplMetrics(conf).getSerializer();
     ReplStatsTracker repl = new ReplStatsTracker(10);
     // Check for k=10
     generateStatsString(10, repl);
-    assertTrue("ReplStat string is " + repl.toString().length(), repl.toString().length() < 24000);
+    String replStatsTracker = repl.toString();
+    String gzipSerialized = serializer.serialize(replStatsTracker);
+    assertTrue("ReplStat string is " + gzipSerialized.length(), gzipSerialized.length() < ReplStatsTracker.RM_PROGRESS_LENGTH);
     // Check for k=5
     repl = new ReplStatsTracker(5);
     generateStatsString(5, repl);
-    assertTrue("ReplStat string is " + repl.toString().length(), repl.toString().length() < 24000);
+    replStatsTracker = repl.toString();
+    gzipSerialized = serializer.serialize(replStatsTracker);
+    assertTrue("ReplStat string is " + gzipSerialized.length(), gzipSerialized.length() < ReplStatsTracker.RM_PROGRESS_LENGTH);
     // Check for k=2 & check NaN values doesn't get messed up due to formatter
     repl = new ReplStatsTracker(2);
     generateStatsString(2, repl);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
index dc7459d..8c359b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.GetReplicationMetricsRequest;
 import org.apache.hadoop.hive.metastore.api.ReplicationMetricList;
 import org.apache.hadoop.hive.metastore.api.ReplicationMetrics;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStatsTracker;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
@@ -56,7 +58,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.hadoop.hive.ql.exec.repl.ReplStatsTracker.RM_PROGRESS_LENGTH;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -65,6 +66,8 @@ import static org.junit.Assert.assertTrue;
 @RunWith(MockitoJUnitRunner.class)
 public class TestReplicationMetricSink {
 
+  // Deserializer to decode and decompress the input string.
+  MessageDeserializer deserializer;
   HiveConf conf;
 
   @Mock
@@ -78,6 +81,11 @@ public class TestReplicationMetricSink {
     MetricSink metricSinkSpy = Mockito.spy(MetricSink.getInstance());
     Mockito.doReturn(1L).when(metricSinkSpy).getFrequencyInSecs();
     metricSinkSpy.init(conf);
+    deserializer = MessageFactory.getDefaultInstanceForReplMetrics(conf).getDeserializer();
+  }
+
+  private String deSerialize(String msg) {
+    return deserializer.deSerializeGenericString(msg);
   }
 
   @Test
@@ -99,9 +107,7 @@ public class TestReplicationMetricSink {
     bootstrapDumpMetricCollector.reportEnd(Status.SUCCESS);
 
     Metadata expectedMetadata = new Metadata("testAcidTablesReplLoadBootstrapIncr_1592205875387",
-      Metadata.ReplicationType.BOOTSTRAP, "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_"
-      + "parse_TestReplicationScenarios_245261428230295/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGlu"
-      + "Y3JfMTU5MjIwNTg3NTM4Nw==/0/hive");
+      Metadata.ReplicationType.BOOTSTRAP, "dummyDir");
     expectedMetadata.setLastReplId(10);
     Progress expectedProgress = new Progress();
     expectedProgress.setStatus(Status.SUCCESS);
@@ -126,7 +132,8 @@ public class TestReplicationMetricSink {
     ReplicationMetric actualMetric = new ReplicationMetric(actualThriftMetric.getScheduledExecutionId(),
         actualThriftMetric.getPolicy(), actualThriftMetric.getDumpExecutionId(),
         mapper.readValue(actualThriftMetric.getMetadata(), Metadata.class));
-    ProgressMapper progressMapper = mapper.readValue(actualThriftMetric.getProgress(), ProgressMapper.class);
+    actualMetric.setMessageFormat(actualThriftMetric.getMessageFormat());
+    ProgressMapper progressMapper = mapper.readValue(deSerialize(actualThriftMetric.getProgress()), ProgressMapper.class);
     Progress progress = new Progress();
     progress.setStatus(progressMapper.getStatus());
     for (StageMapper stageMapper : progressMapper.getStages()) {
@@ -159,9 +166,7 @@ public class TestReplicationMetricSink {
     incrementDumpMetricCollector.reportEnd(Status.SUCCESS);
 
     expectedMetadata = new Metadata("testAcidTablesReplLoadBootstrapIncr_1592205875387",
-      Metadata.ReplicationType.INCREMENTAL, "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_"
-      + "parse_TestReplicationScenarios_245261428230295/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGlu"
-      + "Y3JfMTU5MjIwNTg3NTM4Nw==/0/hive");
+      Metadata.ReplicationType.INCREMENTAL, "dummyDir");
     expectedMetadata.setLastReplId(10);
     expectedProgress = new Progress();
     expectedProgress.setStatus(Status.SUCCESS);
@@ -184,7 +189,8 @@ public class TestReplicationMetricSink {
     actualMetric = new ReplicationMetric(actualThriftMetric.getScheduledExecutionId(),
       actualThriftMetric.getPolicy(), actualThriftMetric.getDumpExecutionId(),
       mapper.readValue(actualThriftMetric.getMetadata(), Metadata.class));
-    progressMapper = mapper.readValue(actualThriftMetric.getProgress(), ProgressMapper.class);
+    actualMetric.setMessageFormat(actualThriftMetric.getMessageFormat());
+    progressMapper = mapper.readValue(deSerialize(actualThriftMetric.getProgress()), ProgressMapper.class);
     progress = new Progress();
     progress.setStatus(progressMapper.getStatus());
     for (StageMapper stageMapper : progressMapper.getStages()) {
@@ -220,7 +226,7 @@ public class TestReplicationMetricSink {
     failoverDumpMetricCollector.reportEnd(Status.FAILOVER_READY);
 
     expectedMetadata = new Metadata("testAcidTablesReplLoadBootstrapIncr_1592205875387",
-            Metadata.ReplicationType.INCREMENTAL, stagingDir);
+            Metadata.ReplicationType.INCREMENTAL, "dummyDir");
     expectedMetadata.setLastReplId(10);
     expectedMetadata.setFailoverEventId(100);
     expectedMetadata.setFailoverMetadataLoc(stagingDir + FailoverMetaData.FAILOVER_METADATA);
@@ -245,7 +251,8 @@ public class TestReplicationMetricSink {
     actualMetric = new ReplicationMetric(actualThriftMetric.getScheduledExecutionId(),
             actualThriftMetric.getPolicy(), actualThriftMetric.getDumpExecutionId(),
             mapper.readValue(actualThriftMetric.getMetadata(), Metadata.class));
-    progressMapper = mapper.readValue(actualThriftMetric.getProgress(), ProgressMapper.class);
+    actualMetric.setMessageFormat(actualThriftMetric.getMessageFormat());
+    progressMapper = mapper.readValue(deSerialize(actualThriftMetric.getProgress()), ProgressMapper.class);
     progress = new Progress();
     progress.setStatus(progressMapper.getStatus());
     for (StageMapper stageMapper : progressMapper.getStages()) {
@@ -308,6 +315,8 @@ public class TestReplicationMetricSink {
 
   @Test
   public void testReplStatsInMetrics() throws HiveException, InterruptedException, TException {
+    int origRMProgress = ReplStatsTracker.RM_PROGRESS_LENGTH;
+    ReplStatsTracker.RM_PROGRESS_LENGTH = 10;
     ReplicationMetricCollector incrementDumpMetricCollector =
         new IncrementalDumpMetricCollector("testAcidTablesReplLoadBootstrapIncr_1592205875387",
             "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
@@ -315,7 +324,7 @@ public class TestReplicationMetricSink {
     Map<String, Long> metricMap = new HashMap<>();
     ReplStatsTracker repl = Mockito.mock(ReplStatsTracker.class);
 
-    Mockito.when(repl.toString()).thenReturn(RandomStringUtils.randomAlphabetic(RM_PROGRESS_LENGTH));
+    Mockito.when(repl.toString()).thenReturn(RandomStringUtils.randomAlphabetic(1000));
     metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
     incrementDumpMetricCollector.reportStageStart("dump", metricMap);
     incrementDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.EVENTS.name(), 10);
@@ -325,8 +334,12 @@ public class TestReplicationMetricSink {
     GetReplicationMetricsRequest metricsRequest = new GetReplicationMetricsRequest();
     metricsRequest.setPolicy("repl");
     ReplicationMetricList actualReplicationMetrics = Hive.get(conf).getMSC().getReplicationMetrics(metricsRequest);
-    assertTrue(actualReplicationMetrics.getReplicationMetricList().get(0).getProgress(),
-        actualReplicationMetrics.getReplicationMetricList().get(0).getProgress()
-            .contains("RM_PROGRESS LIMIT EXCEEDED"));
+    String progress = deSerialize(actualReplicationMetrics.getReplicationMetricList().get(0).getProgress());
+    assertTrue(progress, progress.contains("ERROR: RM_PROGRESS LIMIT EXCEEDED."));
+    ReplStatsTracker.RM_PROGRESS_LENGTH = origRMProgress;
+
+    // Verify that k is clamped to TOP_K_MAX.
+    repl = new ReplStatsTracker(15);
+    Assert.assertEquals(ReplStatsTracker.TOP_K_MAX, repl.getK());
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
index c2e544f..72d98c2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
@@ -70,7 +70,7 @@ public class TestReplicationMetricUpdateOnFailure {
   public void setup() throws Exception {
     
     conf = new HiveConf();
-    conf.set(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
+    conf.set(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
     conf.set(Constants.SCHEDULED_QUERY_SCHEDULENAME, "repl");
     conf.set(Constants.SCHEDULED_QUERY_EXECUTIONID, "1");
     
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFDeserialize.java b/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFDeserialize.java
new file mode 100644
index 0000000..a656db4
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFDeserialize.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+/**
+ * TestGenericUDFDeserialize.
+ * Tests the behaviour of GenericUDFDeserialize.
+ */
+public class TestGenericUDFDeserialize {
+
+    @Test
+    public void testOneArg() throws HiveException {
+        GenericUDFDeserialize udf = new GenericUDFDeserialize();
+        ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+        ObjectInspector valueOI2 = PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+        UDFArgumentException ex = null;
+        try {
+            udf.initialize(new ObjectInspector[]{valueOI1});
+        } catch (UDFArgumentException e) {
+            ex = e;
+        }
+        assertNotNull("The function deserialize() accepts 2 arguments.", ex);
+        assertTrue(ex.getMessage().contains("The function deserialize accepts 2 arguments."));
+        ex = null;
+        try {
+            udf.initialize(new ObjectInspector[]{valueOI2, valueOI1});
+        } catch (UDFArgumentException e) {
+            ex = e;
+        }
+        assertNull("The function deserialize() accepts 2 argument.", ex);
+    }
+
+    @Test
+    public void testGZIPBase64Compression() throws HiveException {
+        GenericUDFDeserialize udf = new GenericUDFDeserialize();
+        udf.initialize(new ObjectInspector[]{PrimitiveObjectInspectorFactory.writableStringObjectInspector,
+                PrimitiveObjectInspectorFactory.writableStringObjectInspector});
+        GenericUDF.DeferredObject[] args = new GenericUDF.DeferredObject[2];
+        String expectedOutput = "test";
+        MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(new HiveConf());
+        String serializedMsg = encoder.getSerializer().serialize(expectedOutput);
+        args[0] = new GenericUDF.DeferredJavaObject(new Text(serializedMsg));
+        args[1] = new GenericUDF.DeferredJavaObject(new Text(encoder.getMessageFormat()));
+        Object actualOutput = udf.evaluate(args).toString();
+        assertEquals("deserialize() test", expectedOutput, actualOutput != null ? actualOutput : null);
+    }
+
+    @Test
+    public void testInvalidCompressionFormat() throws HiveException {
+        GenericUDFDeserialize udf = new GenericUDFDeserialize();
+        udf.initialize(new ObjectInspector[]{PrimitiveObjectInspectorFactory.writableStringObjectInspector,
+                PrimitiveObjectInspectorFactory.writableStringObjectInspector});
+        GenericUDF.DeferredObject[] args = new GenericUDF.DeferredObject[2];
+        String expectedOutput = "test";
+        MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(new HiveConf());
+        String serializedMsg = encoder.getSerializer().serialize(expectedOutput);
+        String compressionFormat = "randomSerialization";
+        args[0] = new GenericUDF.DeferredJavaObject(new Text(serializedMsg));
+        args[1] = new GenericUDF.DeferredJavaObject(new Text(compressionFormat));
+        HiveException ex = null;
+        try {
+            udf.evaluate(args).toString();
+        } catch (HiveException e) {
+            ex = e;
+        }
+        assertNotNull("Invalid message format provided.", ex);
+        assertTrue(ex.getMessage().contains("compressionFormat: " + compressionFormat + " is not supported."));
+    }
+}
diff --git a/ql/src/test/queries/clientpositive/replication_metrics_ingest.q b/ql/src/test/queries/clientpositive/replication_metrics_ingest.q
index 6a8f787..a710b00 100644
--- a/ql/src/test/queries/clientpositive/replication_metrics_ingest.q
+++ b/ql/src/test/queries/clientpositive/replication_metrics_ingest.q
@@ -41,6 +41,6 @@ alter scheduled query repl2 disabled;
 
 show databases;
 
-select policy_name, dump_execution_id from sys.replication_metrics;
+select * from sys.replication_metrics;
 
-select count(*) from sys.replication_metrics where scheduled_execution_id > 0;
+select * from sys.replication_metrics_view order by dump_execution_id;
diff --git a/ql/src/test/queries/clientpositive/udf_deserialize.q b/ql/src/test/queries/clientpositive/udf_deserialize.q
new file mode 100644
index 0000000..b1ae43b
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/udf_deserialize.q
@@ -0,0 +1,8 @@
+
+DESCRIBE FUNCTION deserialize;
+DESCRIBE FUNCTION EXTENDED deserialize;
+
+SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip");
+SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip(json-2.0)");
+SELECT deserialize("test", "");
+SELECT deserialize("{unitTest:'udf-deserialize'}", "json-0.2");
diff --git a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
index a3e8e0c..6d62725 100644
--- a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
+++ b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
@@ -72,22 +72,25 @@ destination
 information_schema
 src
 sys
-PREHOOK: query: select policy_name, dump_execution_id from sys.replication_metrics
+PREHOOK: query: select * from sys.replication_metrics
 PREHOOK: type: QUERY
 PREHOOK: Input: sys@replication_metrics
 #### A masked pattern was here ####
-POSTHOOK: query: select policy_name, dump_execution_id from sys.replication_metrics
+POSTHOOK: query: select * from sys.replication_metrics
 POSTHOOK: type: QUERY
 POSTHOOK: Input: sys@replication_metrics
 #### A masked pattern was here ####
-repl1	0
-repl2	1
-PREHOOK: query: select count(*) from sys.replication_metrics where scheduled_execution_id > 0
+1	repl1	0	{"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	H4sIAAAAAAAAAG2PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA	gzip(json-2.0)
+2	repl2	1	{"dbName":"destination","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	H4sIAAAAAAAAAG2PwQqDMBBE/yVnD/XqzUYLBbFS9VSkBF1UiImsm5Pk3xu1CtLedmb3zbAzm0iQmVjA8pLzOM+Zt1gtOOs1MyUGcLtnnCXv5BFG2/YPgFT0y+nFY6CaYx6AsK9PWbcy5cX9kS5gbRBBEddG0XpPmoTcpfUOqAivSfxL+GfCt5WrR9SY6DYT1LFAGSk9hjDKXIlx6vSOumgzcARB0KzVTkYgYZP2y7hfpy3EVvYDvpfiNy0BAAA=	gzip(json-2.0)
+PREHOOK: query: select * from sys.replication_metrics_view order by dump_execution_id
 PREHOOK: type: QUERY
 PREHOOK: Input: sys@replication_metrics
+PREHOOK: Input: sys@replication_metrics_view
 #### A masked pattern was here ####
-POSTHOOK: query: select count(*) from sys.replication_metrics where scheduled_execution_id > 0
+POSTHOOK: query: select * from sys.replication_metrics_view order by dump_execution_id
 POSTHOOK: type: QUERY
 POSTHOOK: Input: sys@replication_metrics
+POSTHOOK: Input: sys@replication_metrics_view
 #### A masked pattern was here ####
-2
+1	repl1	0	{"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	{"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":null,"replStats":null}]}
+2	repl2	1	{"dbName":"destination","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	{"status":"SUCCESS","stages":[{"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":{"numCreated":0,"numDeleted":0},"replStats":null}]}
diff --git a/ql/src/test/results/clientpositive/llap/resourceplan.q.out b/ql/src/test/results/clientpositive/llap/resourceplan.q.out
index 1a83c09..2b190da 100644
--- a/ql/src/test/results/clientpositive/llap/resourceplan.q.out
+++ b/ql/src/test/results/clientpositive/llap/resourceplan.q.out
@@ -210,6 +210,10 @@ sys	replication_metrics			hive_test_user	USER	DELETE	true	-1	hive_test_user
 sys	replication_metrics			hive_test_user	USER	INSERT	true	-1	hive_test_user
 sys	replication_metrics			hive_test_user	USER	SELECT	true	-1	hive_test_user
 sys	replication_metrics			hive_test_user	USER	UPDATE	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	DELETE	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	INSERT	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	SELECT	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	UPDATE	true	-1	hive_test_user
 sys	role_map			hive_test_user	USER	DELETE	true	-1	hive_test_user
 sys	role_map			hive_test_user	USER	INSERT	true	-1	hive_test_user
 sys	role_map			hive_test_user	USER	SELECT	true	-1	hive_test_user
diff --git a/ql/src/test/results/clientpositive/llap/show_functions.q.out b/ql/src/test/results/clientpositive/llap/show_functions.q.out
index c05d208..a953471 100644
--- a/ql/src/test/results/clientpositive/llap/show_functions.q.out
+++ b/ql/src/test/results/clientpositive/llap/show_functions.q.out
@@ -106,6 +106,7 @@ dayofweek
 decode
 degrees
 dense_rank
+deserialize
 div
 ds_cpc_estimate
 ds_cpc_estimate_bounds
@@ -464,6 +465,7 @@ coalesce
 current_database
 current_date
 decode
+deserialize
 ds_cpc_estimate
 ds_hll_estimate
 ds_kll_quantile
@@ -623,6 +625,7 @@ dayofweek
 decode
 degrees
 dense_rank
+deserialize
 div
 ds_cpc_estimate
 ds_cpc_estimate_bounds
diff --git a/ql/src/test/results/clientpositive/llap/strict_managed_tables_sysdb.q.out b/ql/src/test/results/clientpositive/llap/strict_managed_tables_sysdb.q.out
index 32eb6e2..5eb6e90 100644
--- a/ql/src/test/results/clientpositive/llap/strict_managed_tables_sysdb.q.out
+++ b/ql/src/test/results/clientpositive/llap/strict_managed_tables_sysdb.q.out
@@ -300,6 +300,10 @@ sys	replication_metrics			hive_test_user	USER	DELETE	true	-1	hive_test_user
 sys	replication_metrics			hive_test_user	USER	INSERT	true	-1	hive_test_user
 sys	replication_metrics			hive_test_user	USER	SELECT	true	-1	hive_test_user
 sys	replication_metrics			hive_test_user	USER	UPDATE	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	DELETE	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	INSERT	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	SELECT	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	UPDATE	true	-1	hive_test_user
 sys	role_map			hive_test_user	USER	DELETE	true	-1	hive_test_user
 sys	role_map			hive_test_user	USER	INSERT	true	-1	hive_test_user
 sys	role_map			hive_test_user	USER	SELECT	true	-1	hive_test_user
@@ -490,6 +494,7 @@ PREHOOK: Output: sys@partition_params
 PREHOOK: Output: sys@partition_stats_view
 PREHOOK: Output: sys@partitions
 PREHOOK: Output: sys@replication_metrics
+PREHOOK: Output: sys@replication_metrics_view
 PREHOOK: Output: sys@role_map
 PREHOOK: Output: sys@roles
 PREHOOK: Output: sys@scheduled_executions
@@ -551,6 +556,7 @@ POSTHOOK: Output: sys@partition_params
 POSTHOOK: Output: sys@partition_stats_view
 POSTHOOK: Output: sys@partitions
 POSTHOOK: Output: sys@replication_metrics
+POSTHOOK: Output: sys@replication_metrics_view
 POSTHOOK: Output: sys@role_map
 POSTHOOK: Output: sys@roles
 POSTHOOK: Output: sys@scheduled_executions
diff --git a/ql/src/test/results/clientpositive/llap/sysdb.q.out b/ql/src/test/results/clientpositive/llap/sysdb.q.out
index 1b0e267..da1cf94 100644
--- a/ql/src/test/results/clientpositive/llap/sysdb.q.out
+++ b/ql/src/test/results/clientpositive/llap/sysdb.q.out
@@ -248,6 +248,10 @@ sys	replication_metrics			hive_test_user	USER	DELETE	true	-1	hive_test_user
 sys	replication_metrics			hive_test_user	USER	INSERT	true	-1	hive_test_user
 sys	replication_metrics			hive_test_user	USER	SELECT	true	-1	hive_test_user
 sys	replication_metrics			hive_test_user	USER	UPDATE	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	DELETE	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	INSERT	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	SELECT	true	-1	hive_test_user
+sys	replication_metrics_view			hive_test_user	USER	UPDATE	true	-1	hive_test_user
 sys	role_map			hive_test_user	USER	DELETE	true	-1	hive_test_user
 sys	role_map			hive_test_user	USER	INSERT	true	-1	hive_test_user
 sys	role_map			hive_test_user	USER	SELECT	true	-1	hive_test_user
@@ -712,10 +716,16 @@ partitions	part_name
 partitions	sd_id
 partitions	tbl_id
 replication_metrics	dump_execution_id
+replication_metrics	message_format
 replication_metrics	metadata
 replication_metrics	policy_name
 replication_metrics	progress
 replication_metrics	scheduled_execution_id
+replication_metrics_view	dump_execution_id
+replication_metrics_view	metadata
+replication_metrics_view	policy_name
+replication_metrics_view	progress
+replication_metrics_view	scheduled_execution_id
 role_map	add_time
 role_map	grant_option
 role_map	grantor
@@ -1215,7 +1225,7 @@ POSTHOOK: query: select count(*) from sds
 POSTHOOK: type: QUERY
 POSTHOOK: Input: sys@sds
 #### A masked pattern was here ####
-74
+75
 PREHOOK: query: select param_key, param_value from sd_params order by param_key, param_value limit 5
 PREHOOK: type: QUERY
 PREHOOK: Input: sys@sd_params
@@ -1623,6 +1633,7 @@ default	sys	partition_params	BASE_TABLE	NULL	NULL	NULL	NULL	NULL	YES	NO	NULL
 default	sys	partition_stats_view	VIEW	NULL	NULL	NULL	NULL	NULL	NO	NO	NULL
 default	sys	partitions	BASE_TABLE	NULL	NULL	NULL	NULL	NULL	YES	NO	NULL
 default	sys	replication_metrics	BASE_TABLE	NULL	NULL	NULL	NULL	NULL	YES	NO	NULL
+default	sys	replication_metrics_view	VIEW	NULL	NULL	NULL	NULL	NULL	NO	NO	NULL
 default	sys	role_map	BASE_TABLE	NULL	NULL	NULL	NULL	NULL	YES	NO	NULL
 default	sys	roles	BASE_TABLE	NULL	NULL	NULL	NULL	NULL	YES	NO	NULL
 default	sys	scheduled_executions	BASE_TABLE	NULL	NULL	NULL	NULL	NULL	YES	NO	NULL
@@ -1746,6 +1757,7 @@ information_schema	views
 sys	compactions
 sys	locks
 sys	partition_stats_view
+sys	replication_metrics_view
 sys	table_stats_view
 sys	transactions
 sys	version
diff --git a/ql/src/test/results/clientpositive/llap/udf_deserialize.q.out b/ql/src/test/results/clientpositive/llap/udf_deserialize.q.out
new file mode 100644
index 0000000..b8bc16a
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/udf_deserialize.q.out
@@ -0,0 +1,52 @@
+PREHOOK: query: DESCRIBE FUNCTION deserialize
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION deserialize
+POSTHOOK: type: DESCFUNCTION
+deserialize(base64 encoded message, compressionFormat) - Returns the plain text of the given message, which was compressed in compressionFormat and then base64 encoded.
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED deserialize
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED deserialize
+POSTHOOK: type: DESCFUNCTION
+deserialize(base64 encoded message, compressionFormat) - Returns the plain text of the given message, which was compressed in compressionFormat and then base64 encoded.
+Currently supports only 'gzip', for gzip-compressed and base64-encoded strings.
+Example:
+  > SELECT deserialize('H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA', 'gzip') FROM src LIMIT 1;
+  test
+Function class:org.apache.hadoop.hive.ql.udf.generic.GenericUDFDeserialize
+Function type:BUILTIN
+PREHOOK: query: SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+test
+PREHOOK: query: SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip(json-2.0)")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip(json-2.0)")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+test
+PREHOOK: query: SELECT deserialize("test", "")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT deserialize("test", "")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+test
+PREHOOK: query: SELECT deserialize("{unitTest:'udf-deserialize'}", "json-0.2")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT deserialize("{unitTest:'udf-deserialize'}", "json-0.2")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+{unitTest:'udf-deserialize'}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 2ff5912..a2b2652 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -46769,6 +46769,11 @@ void ReplicationMetrics::__set_progress(const std::string& val) {
   this->progress = val;
 __isset.progress = true;
 }
+
+void ReplicationMetrics::__set_messageFormat(const std::string& val) {
+  this->messageFormat = val;
+__isset.messageFormat = true;
+}
 std::ostream& operator<<(std::ostream& out, const ReplicationMetrics& obj)
 {
   obj.printTo(out);
@@ -46840,6 +46845,14 @@ uint32_t ReplicationMetrics::read(::apache::thrift::protocol::TProtocol* iprot)
           xfer += iprot->skip(ftype);
         }
         break;
+      case 6:
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readString(this->messageFormat);
+          this->__isset.messageFormat = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
       default:
         xfer += iprot->skip(ftype);
         break;
@@ -46885,6 +46898,11 @@ uint32_t ReplicationMetrics::write(::apache::thrift::protocol::TProtocol* oprot)
     xfer += oprot->writeString(this->progress);
     xfer += oprot->writeFieldEnd();
   }
+  if (this->__isset.messageFormat) {
+    xfer += oprot->writeFieldBegin("messageFormat", ::apache::thrift::protocol::T_STRING, 6);
+    xfer += oprot->writeString(this->messageFormat);
+    xfer += oprot->writeFieldEnd();
+  }
   xfer += oprot->writeFieldStop();
   xfer += oprot->writeStructEnd();
   return xfer;
@@ -46897,6 +46915,7 @@ void swap(ReplicationMetrics &a, ReplicationMetrics &b) {
   swap(a.dumpExecutionId, b.dumpExecutionId);
   swap(a.metadata, b.metadata);
   swap(a.progress, b.progress);
+  swap(a.messageFormat, b.messageFormat);
   swap(a.__isset, b.__isset);
 }
 
@@ -46906,6 +46925,7 @@ ReplicationMetrics::ReplicationMetrics(const ReplicationMetrics& other1665) {
   dumpExecutionId = other1665.dumpExecutionId;
   metadata = other1665.metadata;
   progress = other1665.progress;
+  messageFormat = other1665.messageFormat;
   __isset = other1665.__isset;
 }
 ReplicationMetrics& ReplicationMetrics::operator=(const ReplicationMetrics& other1666) {
@@ -46914,6 +46934,7 @@ ReplicationMetrics& ReplicationMetrics::operator=(const ReplicationMetrics& othe
   dumpExecutionId = other1666.dumpExecutionId;
   metadata = other1666.metadata;
   progress = other1666.progress;
+  messageFormat = other1666.messageFormat;
   __isset = other1666.__isset;
   return *this;
 }
@@ -46925,6 +46946,7 @@ void ReplicationMetrics::printTo(std::ostream& out) const {
   out << ", " << "dumpExecutionId=" << to_string(dumpExecutionId);
   out << ", " << "metadata="; (__isset.metadata ? (out << to_string(metadata)) : (out << "<null>"));
   out << ", " << "progress="; (__isset.progress ? (out << to_string(progress)) : (out << "<null>"));
+  out << ", " << "messageFormat="; (__isset.messageFormat ? (out << to_string(messageFormat)) : (out << "<null>"));
   out << ")";
 }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index 025e733..19a318e 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -17640,9 +17640,10 @@ void swap(GetPartitionsPsWithAuthResponse &a, GetPartitionsPsWithAuthResponse &b
 std::ostream& operator<<(std::ostream& out, const GetPartitionsPsWithAuthResponse& obj);
 
 typedef struct _ReplicationMetrics__isset {
-  _ReplicationMetrics__isset() : metadata(false), progress(false) {}
+  _ReplicationMetrics__isset() : metadata(false), progress(false), messageFormat(false) {}
   bool metadata :1;
   bool progress :1;
+  bool messageFormat :1;
 } _ReplicationMetrics__isset;
 
 class ReplicationMetrics : public virtual ::apache::thrift::TBase {
@@ -17650,7 +17651,7 @@ class ReplicationMetrics : public virtual ::apache::thrift::TBase {
 
   ReplicationMetrics(const ReplicationMetrics&);
   ReplicationMetrics& operator=(const ReplicationMetrics&);
-  ReplicationMetrics() : scheduledExecutionId(0), policy(), dumpExecutionId(0), metadata(), progress() {
+  ReplicationMetrics() : scheduledExecutionId(0), policy(), dumpExecutionId(0), metadata(), progress(), messageFormat() {
   }
 
   virtual ~ReplicationMetrics() noexcept;
@@ -17659,6 +17660,7 @@ class ReplicationMetrics : public virtual ::apache::thrift::TBase {
   int64_t dumpExecutionId;
   std::string metadata;
   std::string progress;
+  std::string messageFormat;
 
   _ReplicationMetrics__isset __isset;
 
@@ -17672,6 +17674,8 @@ class ReplicationMetrics : public virtual ::apache::thrift::TBase {
 
   void __set_progress(const std::string& val);
 
+  void __set_messageFormat(const std::string& val);
+
   bool operator == (const ReplicationMetrics & rhs) const
   {
     if (!(scheduledExecutionId == rhs.scheduledExecutionId))
@@ -17688,6 +17692,10 @@ class ReplicationMetrics : public virtual ::apache::thrift::TBase {
       return false;
     else if (__isset.progress && !(progress == rhs.progress))
       return false;
+    if (__isset.messageFormat != rhs.__isset.messageFormat)
+      return false;
+    else if (__isset.messageFormat && !(messageFormat == rhs.messageFormat))
+      return false;
     return true;
   }
   bool operator != (const ReplicationMetrics &rhs) const {
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ReplicationMetrics.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ReplicationMetrics.java
index 50eda8d..8962938 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ReplicationMetrics.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ReplicationMetrics.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.metastore.api;
   private static final org.apache.thrift.protocol.TField DUMP_EXECUTION_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("dumpExecutionId", org.apache.thrift.protocol.TType.I64, (short)3);
   private static final org.apache.thrift.protocol.TField METADATA_FIELD_DESC = new org.apache.thrift.protocol.TField("metadata", org.apache.thrift.protocol.TType.STRING, (short)4);
   private static final org.apache.thrift.protocol.TField PROGRESS_FIELD_DESC = new org.apache.thrift.protocol.TField("progress", org.apache.thrift.protocol.TType.STRING, (short)5);
+  private static final org.apache.thrift.protocol.TField MESSAGE_FORMAT_FIELD_DESC = new org.apache.thrift.protocol.TField("messageFormat", org.apache.thrift.protocol.TType.STRING, (short)6);
 
   private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ReplicationMetricsStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ReplicationMetricsTupleSchemeFactory();
@@ -25,6 +26,7 @@ package org.apache.hadoop.hive.metastore.api;
   private long dumpExecutionId; // required
   private @org.apache.thrift.annotation.Nullable java.lang.String metadata; // optional
   private @org.apache.thrift.annotation.Nullable java.lang.String progress; // optional
+  private @org.apache.thrift.annotation.Nullable java.lang.String messageFormat; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -32,7 +34,8 @@ package org.apache.hadoop.hive.metastore.api;
     POLICY((short)2, "policy"),
     DUMP_EXECUTION_ID((short)3, "dumpExecutionId"),
     METADATA((short)4, "metadata"),
-    PROGRESS((short)5, "progress");
+    PROGRESS((short)5, "progress"),
+    MESSAGE_FORMAT((short)6, "messageFormat");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
 
@@ -58,6 +61,8 @@ package org.apache.hadoop.hive.metastore.api;
           return METADATA;
         case 5: // PROGRESS
           return PROGRESS;
+        case 6: // MESSAGE_FORMAT
+          return MESSAGE_FORMAT;
         default:
           return null;
       }
@@ -102,7 +107,7 @@ package org.apache.hadoop.hive.metastore.api;
   private static final int __SCHEDULEDEXECUTIONID_ISSET_ID = 0;
   private static final int __DUMPEXECUTIONID_ISSET_ID = 1;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.METADATA,_Fields.PROGRESS};
+  private static final _Fields optionals[] = {_Fields.METADATA,_Fields.PROGRESS,_Fields.MESSAGE_FORMAT};
   public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -116,6 +121,8 @@ package org.apache.hadoop.hive.metastore.api;
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.PROGRESS, new org.apache.thrift.meta_data.FieldMetaData("progress", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.MESSAGE_FORMAT, new org.apache.thrift.meta_data.FieldMetaData("messageFormat", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ReplicationMetrics.class, metaDataMap);
   }
@@ -152,6 +159,9 @@ package org.apache.hadoop.hive.metastore.api;
     if (other.isSetProgress()) {
       this.progress = other.progress;
     }
+    if (other.isSetMessageFormat()) {
+      this.messageFormat = other.messageFormat;
+    }
   }
 
   public ReplicationMetrics deepCopy() {
@@ -167,6 +177,7 @@ package org.apache.hadoop.hive.metastore.api;
     this.dumpExecutionId = 0;
     this.metadata = null;
     this.progress = null;
+    this.messageFormat = null;
   }
 
   public long getScheduledExecutionId() {
@@ -285,6 +296,30 @@ package org.apache.hadoop.hive.metastore.api;
     }
   }
 
+  @org.apache.thrift.annotation.Nullable
+  public java.lang.String getMessageFormat() {
+    return this.messageFormat;
+  }
+
+  public void setMessageFormat(@org.apache.thrift.annotation.Nullable java.lang.String messageFormat) {
+    this.messageFormat = messageFormat;
+  }
+
+  public void unsetMessageFormat() {
+    this.messageFormat = null;
+  }
+
+  /** Returns true if field messageFormat is set (has been assigned a value) and false otherwise */
+  public boolean isSetMessageFormat() {
+    return this.messageFormat != null;
+  }
+
+  public void setMessageFormatIsSet(boolean value) {
+    if (!value) {
+      this.messageFormat = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
     case SCHEDULED_EXECUTION_ID:
@@ -327,6 +362,14 @@ package org.apache.hadoop.hive.metastore.api;
       }
       break;
 
+    case MESSAGE_FORMAT:
+      if (value == null) {
+        unsetMessageFormat();
+      } else {
+        setMessageFormat((java.lang.String)value);
+      }
+      break;
+
     }
   }
 
@@ -348,6 +391,9 @@ package org.apache.hadoop.hive.metastore.api;
     case PROGRESS:
       return getProgress();
 
+    case MESSAGE_FORMAT:
+      return getMessageFormat();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -369,6 +415,8 @@ package org.apache.hadoop.hive.metastore.api;
       return isSetMetadata();
     case PROGRESS:
       return isSetProgress();
+    case MESSAGE_FORMAT:
+      return isSetMessageFormat();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -431,6 +479,15 @@ package org.apache.hadoop.hive.metastore.api;
         return false;
     }
 
+    boolean this_present_messageFormat = true && this.isSetMessageFormat();
+    boolean that_present_messageFormat = true && that.isSetMessageFormat();
+    if (this_present_messageFormat || that_present_messageFormat) {
+      if (!(this_present_messageFormat && that_present_messageFormat))
+        return false;
+      if (!this.messageFormat.equals(that.messageFormat))
+        return false;
+    }
+
     return true;
   }
 
@@ -454,6 +511,10 @@ package org.apache.hadoop.hive.metastore.api;
     if (isSetProgress())
       hashCode = hashCode * 8191 + progress.hashCode();
 
+    hashCode = hashCode * 8191 + ((isSetMessageFormat()) ? 131071 : 524287);
+    if (isSetMessageFormat())
+      hashCode = hashCode * 8191 + messageFormat.hashCode();
+
     return hashCode;
   }
 
@@ -515,6 +576,16 @@ package org.apache.hadoop.hive.metastore.api;
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.compare(isSetMessageFormat(), other.isSetMessageFormat());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetMessageFormat()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.messageFormat, other.messageFormat);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -571,6 +642,16 @@ package org.apache.hadoop.hive.metastore.api;
       }
       first = false;
     }
+    if (isSetMessageFormat()) {
+      if (!first) sb.append(", ");
+      sb.append("messageFormat:");
+      if (this.messageFormat == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.messageFormat);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -668,6 +749,14 @@ package org.apache.hadoop.hive.metastore.api;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 6: // MESSAGE_FORMAT
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.messageFormat = iprot.readString();
+              struct.setMessageFormatIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -706,6 +795,13 @@ package org.apache.hadoop.hive.metastore.api;
           oprot.writeFieldEnd();
         }
       }
+      if (struct.messageFormat != null) {
+        if (struct.isSetMessageFormat()) {
+          oprot.writeFieldBegin(MESSAGE_FORMAT_FIELD_DESC);
+          oprot.writeString(struct.messageFormat);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -733,13 +829,19 @@ package org.apache.hadoop.hive.metastore.api;
       if (struct.isSetProgress()) {
         optionals.set(1);
       }
-      oprot.writeBitSet(optionals, 2);
+      if (struct.isSetMessageFormat()) {
+        optionals.set(2);
+      }
+      oprot.writeBitSet(optionals, 3);
       if (struct.isSetMetadata()) {
         oprot.writeString(struct.metadata);
       }
       if (struct.isSetProgress()) {
         oprot.writeString(struct.progress);
       }
+      if (struct.isSetMessageFormat()) {
+        oprot.writeString(struct.messageFormat);
+      }
     }
 
     @Override
@@ -751,7 +853,7 @@ package org.apache.hadoop.hive.metastore.api;
       struct.setPolicyIsSet(true);
       struct.dumpExecutionId = iprot.readI64();
       struct.setDumpExecutionIdIsSet(true);
-      java.util.BitSet incoming = iprot.readBitSet(2);
+      java.util.BitSet incoming = iprot.readBitSet(3);
       if (incoming.get(0)) {
         struct.metadata = iprot.readString();
         struct.setMetadataIsSet(true);
@@ -760,6 +862,10 @@ package org.apache.hadoop.hive.metastore.api;
         struct.progress = iprot.readString();
         struct.setProgressIsSet(true);
       }
+      if (incoming.get(2)) {
+        struct.messageFormat = iprot.readString();
+        struct.setMessageFormatIsSet(true);
+      }
     }
   }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ReplicationMetrics.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ReplicationMetrics.php
index c1b19de..0093684 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ReplicationMetrics.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ReplicationMetrics.php
@@ -46,6 +46,11 @@ class ReplicationMetrics
             'isRequired' => false,
             'type' => TType::STRING,
         ),
+        6 => array(
+            'var' => 'messageFormat',
+            'isRequired' => false,
+            'type' => TType::STRING,
+        ),
     );
 
     /**
@@ -68,6 +73,10 @@ class ReplicationMetrics
      * @var string
      */
     public $progress = null;
+    /**
+     * @var string
+     */
+    public $messageFormat = null;
 
     public function __construct($vals = null)
     {
@@ -87,6 +96,9 @@ class ReplicationMetrics
             if (isset($vals['progress'])) {
                 $this->progress = $vals['progress'];
             }
+            if (isset($vals['messageFormat'])) {
+                $this->messageFormat = $vals['messageFormat'];
+            }
         }
     }
 
@@ -144,6 +156,13 @@ class ReplicationMetrics
                         $xfer += $input->skip($ftype);
                     }
                     break;
+                case 6:
+                    if ($ftype == TType::STRING) {
+                        $xfer += $input->readString($this->messageFormat);
+                    } else {
+                        $xfer += $input->skip($ftype);
+                    }
+                    break;
                 default:
                     $xfer += $input->skip($ftype);
                     break;
@@ -183,6 +202,11 @@ class ReplicationMetrics
             $xfer += $output->writeString($this->progress);
             $xfer += $output->writeFieldEnd();
         }
+        if ($this->messageFormat !== null) {
+            $xfer += $output->writeFieldBegin('messageFormat', TType::STRING, 6);
+            $xfer += $output->writeString($this->messageFormat);
+            $xfer += $output->writeFieldEnd();
+        }
         $xfer += $output->writeFieldStop();
         $xfer += $output->writeStructEnd();
         return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 756e31e..f3e7080 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -26760,16 +26760,18 @@ class ReplicationMetrics(object):
      - dumpExecutionId
      - metadata
      - progress
+     - messageFormat
 
     """
 
 
-    def __init__(self, scheduledExecutionId=None, policy=None, dumpExecutionId=None, metadata=None, progress=None,):
+    def __init__(self, scheduledExecutionId=None, policy=None, dumpExecutionId=None, metadata=None, progress=None, messageFormat=None,):
         self.scheduledExecutionId = scheduledExecutionId
         self.policy = policy
         self.dumpExecutionId = dumpExecutionId
         self.metadata = metadata
         self.progress = progress
+        self.messageFormat = messageFormat
 
     def read(self, iprot):
         if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -26805,6 +26807,11 @@ class ReplicationMetrics(object):
                     self.progress = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString()
                 else:
                     iprot.skip(ftype)
+            elif fid == 6:
+                if ftype == TType.STRING:
+                    self.messageFormat = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString()
+                else:
+                    iprot.skip(ftype)
             else:
                 iprot.skip(ftype)
             iprot.readFieldEnd()
@@ -26835,6 +26842,10 @@ class ReplicationMetrics(object):
             oprot.writeFieldBegin('progress', TType.STRING, 5)
             oprot.writeString(self.progress.encode('utf-8') if sys.version_info[0] == 2 else self.progress)
             oprot.writeFieldEnd()
+        if self.messageFormat is not None:
+            oprot.writeFieldBegin('messageFormat', TType.STRING, 6)
+            oprot.writeString(self.messageFormat.encode('utf-8') if sys.version_info[0] == 2 else self.messageFormat)
+            oprot.writeFieldEnd()
         oprot.writeFieldStop()
         oprot.writeStructEnd()
 
@@ -31106,6 +31117,7 @@ ReplicationMetrics.thrift_spec = (
     (3, TType.I64, 'dumpExecutionId', None, None, ),  # 3
     (4, TType.STRING, 'metadata', 'UTF8', None, ),  # 4
     (5, TType.STRING, 'progress', 'UTF8', None, ),  # 5
+    (6, TType.STRING, 'messageFormat', 'UTF8', None, ),  # 6
 )
 all_structs.append(ReplicationMetricList)
 ReplicationMetricList.thrift_spec = (
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 57749a9..4da24a8 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -7352,13 +7352,15 @@ class ReplicationMetrics
   DUMPEXECUTIONID = 3
   METADATA = 4
   PROGRESS = 5
+  MESSAGEFORMAT = 6
 
   FIELDS = {
     SCHEDULEDEXECUTIONID => {:type => ::Thrift::Types::I64, :name => 'scheduledExecutionId'},
     POLICY => {:type => ::Thrift::Types::STRING, :name => 'policy'},
     DUMPEXECUTIONID => {:type => ::Thrift::Types::I64, :name => 'dumpExecutionId'},
     METADATA => {:type => ::Thrift::Types::STRING, :name => 'metadata', :optional => true},
-    PROGRESS => {:type => ::Thrift::Types::STRING, :name => 'progress', :optional => true}
+    PROGRESS => {:type => ::Thrift::Types::STRING, :name => 'progress', :optional => true},
+    MESSAGEFORMAT => {:type => ::Thrift::Types::STRING, :name => 'messageFormat', :optional => true}
   }
 
   def struct_fields; FIELDS; end
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 21ea1f8..e42493b 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -730,6 +730,10 @@ public class MetastoreConf {
         "hive.metastore.event.message.factory",
         "org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder",
         "Factory class for making encoding and decoding messages in the events generated."),
+    REPL_MESSAGE_FACTORY("metastore.repl.message.factory",
+        "hive.metastore.repl.message.factory",
+        "org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder",
+        "Factory class to serialize and deserialize information in the replication metrics table."),
     EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS("metastore.notification.parameters.exclude.patterns",
         "hive.metastore.notification.parameters.exclude.patterns", "",
         "List of comma-separated regexes that are used to reduced the size of HMS Notification messages."
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index e1d7006..882e901 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -2200,7 +2200,8 @@ struct ReplicationMetrics{
   2: required string policy,
   3: required i64 dumpExecutionId,
   4: optional string metadata,
-  5: optional string progress
+  5: optional string progress,
+  6: optional string messageFormat
 }
 
 struct ReplicationMetricList{
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 590884c..ea70e94 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -287,6 +287,9 @@ public class ObjectStore implements RawStore, Configurable {
   */
   private final static AtomicBoolean isSchemaVerified = new AtomicBoolean(false);
   private static final Logger LOG = LoggerFactory.getLogger(ObjectStore.class);
+  private static final int RM_PROGRESS_COL_WIDTH = 10000;
+  private static final int RM_METADATA_COL_WIDTH = 4000;
+  private static final int ORACLE_DB_MAX_COL_WIDTH = 4000;
 
   private enum TXN_STATUS {
     NO_STATE, OPEN, COMMITED, ROLLBACK
@@ -14364,17 +14367,22 @@ public class ObjectStore implements RawStore, Configurable {
           mReplicationMetrics.setStartTime((int) (System.currentTimeMillis()/1000));
         }
         if (!StringUtils.isEmpty(replicationMetric.getMetadata())) {
-          mReplicationMetrics.setMetadata(replicationMetric.getMetadata());
+          if (replicationMetric.getMetadata().length() > RM_METADATA_COL_WIDTH) {
+            mReplicationMetrics.setMetadata("RM_METADATA limit exceeded: " + replicationMetric.getMetadata().length());
+          } else {
+            mReplicationMetrics.setMetadata(replicationMetric.getMetadata());
+          }
         }
         if (!StringUtils.isEmpty(replicationMetric.getProgress())) {
           // Check for the limit of RM_PROGRESS Column.
-          if ((dbType.isORACLE() && replicationMetric.getProgress().length() > 4000)
-              || replicationMetric.getProgress().length() > 24000) {
-            mReplicationMetrics.setProgress("RM_PROGRESS LIMIT EXCEEDED");
+          if ((dbType.isORACLE() && replicationMetric.getProgress().length() > ORACLE_DB_MAX_COL_WIDTH)
+              || replicationMetric.getProgress().length() > RM_PROGRESS_COL_WIDTH) {
+            mReplicationMetrics.setProgress("RM_Progress limit exceeded to " + replicationMetric.getProgress().length());
           } else {
             mReplicationMetrics.setProgress(replicationMetric.getProgress());
           }
         }
+        mReplicationMetrics.setMessageFormat(replicationMetric.getMessageFormat());
         mReplicationMetricsList.add(mReplicationMetrics);
       }
       pm.makePersistentAll(mReplicationMetricsList);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
index ffbedce..9455e1d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
@@ -237,6 +237,13 @@ public abstract class MessageDeserializer {
    */
   public abstract DeletePartitionColumnStatMessage getDeletePartitionColumnStatMessage(String messageBody);
 
+  /**
+   * Method to de-serialize any passed string. Needs to be overridden by format-specific serialization subclasses.
+   */
+  public String deSerializeGenericString(String messageBody) {
+    return messageBody;
+  }
+
   // Protection against construction.
   protected MessageDeserializer() {}
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index 16e74bb..62cdaf0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -50,6 +50,7 @@ public abstract class MessageFactory {
   static {
     register(GzipJSONMessageEncoder.FORMAT, GzipJSONMessageEncoder.class);
     register(JSONMessageEncoder.FORMAT, JSONMessageEncoder.class);
+    register(SupportedCompressionFormats.GZIP.toString().toLowerCase(), GzipJSONMessageEncoder.class);
   }
 
   private static Method requiredMethod(Class clazz) {
@@ -80,19 +81,27 @@ public abstract class MessageFactory {
     throw new IllegalArgumentException(message);
   }
 
-  public static MessageEncoder getInstance(String messageFormat)
+  public static MessageEncoder getInstance(String compressionFormat)
       throws InvocationTargetException, IllegalAccessException {
-    Method methodInstance = registry.get(messageFormat);
+    Method methodInstance = registry.get(compressionFormat.toLowerCase());
     if (methodInstance == null) {
-      LOG.error("received incorrect MessageFormat " + messageFormat);
-      throw new RuntimeException("messageFormat: " + messageFormat + " is not supported ");
+      LOG.error("received incorrect CompressionFormat " + compressionFormat);
+      throw new RuntimeException("compressionFormat: " + compressionFormat + " is not supported.");
     }
     return (MessageEncoder) methodInstance.invoke(null);
   }
 
+  public static MessageEncoder getDefaultInstanceForReplMetrics(Configuration conf) {
+    return getInstance(conf, MetastoreConf.ConfVars.REPL_MESSAGE_FACTORY.getVarname());
+  }
+
   public static MessageEncoder getDefaultInstance(Configuration conf) {
+    return getInstance(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getVarname());
+  }
+
+  public static MessageEncoder getInstance(Configuration conf, String config) {
     String clazz =
-        MetastoreConf.get(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getVarname());
+            MetastoreConf.get(conf, config);
     try {
       Class<?> clazzObject = MessageFactory.class.getClassLoader().loadClass(clazz);
       return (MessageEncoder) requiredMethod(clazzObject).invoke(null);
@@ -102,4 +111,11 @@ public abstract class MessageFactory {
       throw new IllegalStateException(message, e);
     }
   }
+
+  /**
+   * Compression formats currently supported for encoding and decoding messages in the backend RDBMS.
+   */
+  public enum SupportedCompressionFormats {
+    GZIP
+  }
 }
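
To see how the new factory hook and the string-level serialize/deserialize methods fit together: a hedged round-trip sketch (getSerializer()/getDeserializer() come from the existing MessageEncoder interface; the wiring below is illustrative, not code from this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
    import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
    import org.apache.hadoop.hive.metastore.messaging.MessageFactory;

    public class ReplMetricsEncodingDemo {
      public static void main(String[] args) {
        Configuration conf = MetastoreConf.newMetastoreConf();
        // Resolves metastore.repl.message.factory (default: GzipJSONMessageEncoder).
        MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(conf);
        String progress = "{\"status\":\"SUCCESS\",\"stages\":[]}";
        String stored = encoder.getSerializer().serialize(progress);   // gzip + base64
        String restored = encoder.getDeserializer().deSerializeGenericString(stored);
        System.out.println(progress.equals(restored));                 // true
      }
    }
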
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
index b249d76..5b452cf 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
@@ -21,4 +21,7 @@ public interface MessageSerializer {
   default String serialize(EventMessage message) {
     return message.toString();
   }
+  default String serialize(String msg) {
+    return msg;
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
index bfea32f..6a67308 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
@@ -232,4 +232,8 @@ public class DeSerializer extends JSONMessageDeserializer {
   public DeletePartitionColumnStatMessage getDeletePartitionColumnStatMessage(String messageBody) {
     return super.getDeletePartitionColumnStatMessage(deCompress(messageBody));
   }
+
+  public String deSerializeGenericString(String messageBody) {
+    return deCompress(messageBody);
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
index 7786d96..ef43590 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
@@ -34,16 +34,20 @@ class Serializer implements MessageSerializer {
 
   @Override
   public String serialize(EventMessage message) {
-    String messageAsString = MessageSerializer.super.serialize(message);
+    return serialize(MessageSerializer.super.serialize(message));
+  }
+
+  @Override
+  public String serialize(String msg) {
     try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
       GZIPOutputStream gout = new GZIPOutputStream(baos);
-      gout.write(messageAsString.getBytes(StandardCharsets.UTF_8));
+      gout.write(msg.getBytes(StandardCharsets.UTF_8));
       gout.close();
       byte[] compressed = baos.toByteArray();
       return new String(Base64.getEncoder().encode(compressed), StandardCharsets.UTF_8);
     } catch (IOException e) {
       LOG.error("could not use gzip output stream", e);
-      LOG.debug("message " + messageAsString);
+      LOG.debug("message " + msg);
       throw new RuntimeException("could not use the gzip output Stream", e);
     }
   }
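
And the compression path in isolation: gzip, then base64, matching serialize(String) above. A self-contained sketch (illustrative class name) that uses try-with-resources, so the GZIPOutputStream is closed, and its gzip trailer flushed, before the buffer is read:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.zip.GZIPOutputStream;

    public class ReplMetricsEncode {
      // Forward direction of the storage encoding: UTF-8 text -> gzip -> base64.
      static String compress(String msg) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gout = new GZIPOutputStream(baos)) {
          gout.write(msg.getBytes(StandardCharsets.UTF_8));
        }
        return Base64.getEncoder().encodeToString(baos.toByteArray());
      }
    }
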
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MReplicationMetrics.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MReplicationMetrics.java
index 5fe3129..e051621 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MReplicationMetrics.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MReplicationMetrics.java
@@ -30,6 +30,7 @@ public class MReplicationMetrics {
   private String metadata;
   private String progress;
   private int startTime;
+  private String messageFormat;
 
   public MReplicationMetrics() {
   }
@@ -41,6 +42,7 @@ public class MReplicationMetrics {
     ret.setMetadata(mReplicationMetric.metadata);
     ret.setProgress(mReplicationMetric.progress);
     ret.setDumpExecutionId(mReplicationMetric.dumpExecutionId);
+    ret.setMessageFormat(mReplicationMetric.messageFormat);
     return ret;
   }
 
@@ -91,4 +93,12 @@ public class MReplicationMetrics {
   public void setStartTime(int startTime) {
     this.startTime = startTime;
   }
+
+  public String getMessageFormat() {
+    return messageFormat;
+  }
+
+  public void setMessageFormat(String messageFormat) {
+    this.messageFormat = messageFormat;
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/resources/package.jdo b/standalone-metastore/metastore-server/src/main/resources/package.jdo
index c5a7db1..6b6ee51 100644
--- a/standalone-metastore/metastore-server/src/main/resources/package.jdo
+++ b/standalone-metastore/metastore-server/src/main/resources/package.jdo
@@ -1556,11 +1556,14 @@
         <column name="RM_METADATA" jdbc-type="varchar" length="4000" allows-null="true"/>
       </field>
       <field name="progress">
-        <column name="RM_PROGRESS" jdbc-type="varchar" length="24000" allows-null="true"/>
+        <column name="RM_PROGRESS" jdbc-type="varchar" length="10000" allows-null="true"/>
       </field>
       <field name="startTime">
          <column name="RM_START_TIME" jdbc-type="integer" allows-null="false"/>
       </field>
+      <field name="messageFormat">
+              <column name="MESSAGE_FORMAT" length="16" jdbc-type="VARCHAR" allows-null="true"/>
+      </field>
       <index name="PolicyIndex">
         <column name="RM_POLICY"/>
       </index>
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 525a909..8fa3470 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -803,8 +803,9 @@ CREATE TABLE "APP"."REPLICATION_METRICS" (
   "RM_POLICY" varchar(256) NOT NULL,
   "RM_DUMP_EXECUTION_ID" bigint NOT NULL,
   "RM_METADATA" varchar(4000),
-  "RM_PROGRESS" varchar(24000),
+  "RM_PROGRESS" varchar(10000),
   "RM_START_TIME" integer not null,
+  "MESSAGE_FORMAT" VARCHAR(16),
   PRIMARY KEY("RM_SCHEDULED_EXECUTION_ID")
 );
 
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index c9a5393..ead33d7 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -97,8 +97,18 @@ CREATE TABLE "APP"."REPLICATION_METRICS" (
   PRIMARY KEY("RM_SCHEDULED_EXECUTION_ID")
 );
 
---Increase the size of RM_PROGRESS to accomodate the replication statistics
-ALTER TABLE "APP"."REPLICATION_METRICS" ALTER "RM_PROGRESS" SET DATA TYPE VARCHAR(24000);
+DROP TABLE "APP"."REPLICATION_METRICS";
+
+CREATE TABLE "APP"."REPLICATION_METRICS" (
+  "RM_SCHEDULED_EXECUTION_ID" bigint NOT NULL,
+  "RM_POLICY" varchar(256) NOT NULL,
+  "RM_DUMP_EXECUTION_ID" bigint NOT NULL,
+  "RM_METADATA" varchar(4000),
+  "RM_PROGRESS" varchar(10000),
+  "RM_START_TIME" integer not null,
+  "MESSAGE_FORMAT" VARCHAR(16),
+  PRIMARY KEY("RM_SCHEDULED_EXECUTION_ID")
+);
 
 CREATE INDEX "POLICY_IDX" ON "APP"."REPLICATION_METRICS" ("RM_POLICY");
 CREATE INDEX "DUMP_IDX" ON "APP"."REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 6f704a0..af5e763 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1367,7 +1367,8 @@ CREATE TABLE "REPLICATION_METRICS" (
   "RM_DUMP_EXECUTION_ID" bigint NOT NULL,
   "RM_METADATA" varchar(max),
   "RM_PROGRESS" varchar(max),
-  "RM_START_TIME" integer NOT NULL
+  "RM_START_TIME" integer NOT NULL,
+  "MESSAGE_FORMAT" nvarchar(16),
 );
 
 -- Create indexes for the replication metrics table
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 91bca6c..166bf8e 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -132,6 +132,8 @@ CREATE TABLE "REPLICATION_METRICS" (
   "RM_START_TIME" integer NOT NULL
 );
 
+ALTER TABLE "REPLICATION_METRICS" ADD "MESSAGE_FORMAT" VARCHAR(16);
+
 -- Create indexes for the replication metrics table
 CREATE INDEX "POLICY_IDX" ON "REPLICATION_METRICS" ("RM_POLICY");
 CREATE INDEX "DUMP_IDX" ON "REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 4163199..66995cf 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1269,8 +1269,9 @@ CREATE TABLE IF NOT EXISTS REPLICATION_METRICS (
   RM_POLICY varchar(256) NOT NULL,
   RM_DUMP_EXECUTION_ID bigint NOT NULL,
   RM_METADATA varchar(4000),
-  RM_PROGRESS varchar(24000),
+  RM_PROGRESS varchar(10000),
   RM_START_TIME integer NOT NULL,
+  MESSAGE_FORMAT varchar(16),
   PRIMARY KEY(RM_SCHEDULED_EXECUTION_ID)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 49451c4..8786d4a 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -107,6 +107,10 @@ CREATE TABLE IF NOT EXISTS REPLICATION_METRICS (
 --Increase the size of RM_PROGRESS to accommodate the replication statistics
 ALTER TABLE REPLICATION_METRICS MODIFY RM_PROGRESS varchar(24000);
 
+ALTER TABLE REPLICATION_METRICS MODIFY RM_PROGRESS varchar(10000);
+
+ALTER TABLE REPLICATION_METRICS ADD COLUMN MESSAGE_FORMAT varchar(16);
+
 -- Create indexes for the replication metrics table
 CREATE INDEX POLICY_IDX ON REPLICATION_METRICS (RM_POLICY);
 CREATE INDEX DUMP_IDX ON REPLICATION_METRICS (RM_DUMP_EXECUTION_ID);
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index e5282a5..303a2f3 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1264,7 +1264,8 @@ CREATE TABLE "REPLICATION_METRICS" (
   "RM_DUMP_EXECUTION_ID" number NOT NULL,
   "RM_METADATA" varchar2(4000),
   "RM_PROGRESS" varchar2(4000),
-  "RM_START_TIME" integer NOT NULL
+  "RM_START_TIME" integer NOT NULL,
+  "MESSAGE_FORMAT" VARCHAR(16)
 );
 
 --Create indexes for the replication metrics table
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index 610f840..d5715ba 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -101,6 +101,8 @@ CREATE TABLE "REPLICATION_METRICS" (
   "RM_START_TIME" integer NOT NULL
 );
 
+ALTER TABLE "REPLICATION_METRICS" ADD "MESSAGE_FORMAT" VARCHAR(16);
+
 --Create indexes for the replication metrics table
 CREATE INDEX POLICY_IDX ON "REPLICATION_METRICS" ("RM_POLICY");
 CREATE INDEX DUMP_IDX ON "REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index dc311bb..c80a374 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1976,8 +1976,9 @@ CREATE TABLE "REPLICATION_METRICS" (
   "RM_POLICY" varchar(256) NOT NULL,
   "RM_DUMP_EXECUTION_ID" bigint NOT NULL,
   "RM_METADATA" varchar(4000),
-  "RM_PROGRESS" varchar(24000),
+  "RM_PROGRESS" varchar(10000),
   "RM_START_TIME" integer NOT NULL,
+  "MESSAGE_FORMAT" VARCHAR(16),
   PRIMARY KEY("RM_SCHEDULED_EXECUTION_ID")
 );
 
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index fbbba67..41b16b1 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -231,6 +231,10 @@ CREATE TABLE "REPLICATION_METRICS" (
 --Increase the size of RM_PROGRESS to accommodate the replication statistics
 ALTER TABLE "REPLICATION_METRICS" ALTER "RM_PROGRESS" TYPE varchar(24000);
 
+ALTER TABLE "REPLICATION_METRICS" ALTER "RM_PROGRESS" TYPE varchar(10000);
+
+ALTER TABLE "REPLICATION_METRICS" ADD "MESSAGE_FORMAT" varchar(16);
+
 --Create indexes for the replication metrics table
 CREATE INDEX "POLICY_IDX" ON "REPLICATION_METRICS" ("RM_POLICY");
 CREATE INDEX "DUMP_IDX" ON "REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");
diff --git a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
index c9ec652..21f8061 100644
--- a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
@@ -51,6 +51,10 @@ CREATE TABLE "REPLICATION_METRICS" (
 --Increase the size of RM_PROGRESS to accommodate the replication statistics
 ALTER TABLE "REPLICATION_METRICS" ALTER "RM_PROGRESS" TYPE varchar(24000);
 
+ALTER TABLE "REPLICATION_METRICS" ALTER "RM_PROGRESS" TYPE varchar(10000);
+
+ALTER TABLE "REPLICATION_METRICS" ADD "MESSAGE_FORMAT" varchar(16);
+
 --Create indexes for the replication metrics table
 CREATE INDEX "POLICY_IDX" ON "REPLICATION_METRICS" ("RM_POLICY");
 CREATE INDEX "DUMP_IDX" ON "REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");