You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2021/11/09 08:13:05 UTC
[hive] branch master updated: HIVE-25596: Compress Hive Replication
Metrics while storing (Haymant Mangla, reviewed by Aasha Medhi,
Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
pravin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6ec140e HIVE-25596: Compress Hive Replication Metrics while storing (Haymant Mangla, reviewed by Aasha Medhi, Pravin Kumar Sinha)
6ec140e is described below
commit 6ec140ed2f735050dd742b0af76fb21ec6b970f5
Author: Haymant Mangla <79...@users.noreply.github.com>
AuthorDate: Tue Nov 9 13:42:46 2021 +0530
HIVE-25596: Compress Hive Replication Metrics while storing (Haymant Mangla, reviewed by Aasha Medhi, Pravin Kumar Sinha)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 +-
.../upgrade/hive/hive-schema-4.0.0.hive.sql | 21 +++-
.../upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql | 21 +++-
.../hadoop/hive/ql/exec/FunctionRegistry.java | 1 +
.../hadoop/hive/ql/exec/repl/ReplStatsTracker.java | 16 ++-
.../hive/ql/parse/repl/metric/MetricSink.java | 27 ++++-
.../repl/metric/ReplicationMetricCollector.java | 27 +++--
.../parse/repl/metric/event/ReplicationMetric.java | 9 ++
.../hive/ql/parse/repl/metric/event/Stage.java | 9 +-
.../hive/ql/udf/generic/GenericUDFDeserialize.java | 92 +++++++++++++++++
.../metric/TestReplicationMetricCollector.java | 43 ++++----
.../repl/metric/TestReplicationMetricSink.java | 43 +++++---
.../TestReplicationMetricUpdateOnFailure.java | 2 +-
.../ql/udf/generic/TestGenericUDFDeserialize.java | 98 ++++++++++++++++++
.../clientpositive/replication_metrics_ingest.q | 4 +-
.../test/queries/clientpositive/udf_deserialize.q | 8 ++
.../llap/replication_metrics_ingest.q.out | 17 +--
.../results/clientpositive/llap/resourceplan.q.out | 4 +
.../clientpositive/llap/show_functions.q.out | 3 +
.../llap/strict_managed_tables_sysdb.q.out | 6 ++
.../test/results/clientpositive/llap/sysdb.q.out | 14 ++-
.../clientpositive/llap/udf_deserialize.q.out | 52 ++++++++++
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 22 ++++
.../src/gen/thrift/gen-cpp/hive_metastore_types.h | 12 ++-
.../hive/metastore/api/ReplicationMetrics.java | 114 ++++++++++++++++++++-
.../gen-php/metastore/ReplicationMetrics.php | 24 +++++
.../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 14 ++-
.../src/gen/thrift/gen-rb/hive_metastore_types.rb | 4 +-
.../hadoop/hive/metastore/conf/MetastoreConf.java | 4 +
.../src/main/thrift/hive_metastore.thrift | 3 +-
.../apache/hadoop/hive/metastore/ObjectStore.java | 16 ++-
.../metastore/messaging/MessageDeserializer.java | 7 ++
.../hive/metastore/messaging/MessageFactory.java | 26 ++++-
.../metastore/messaging/MessageSerializer.java | 3 +
.../messaging/json/gzip/DeSerializer.java | 4 +
.../metastore/messaging/json/gzip/Serializer.java | 10 +-
.../hive/metastore/model/MReplicationMetrics.java | 10 ++
.../src/main/resources/package.jdo | 5 +-
.../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 3 +-
.../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql | 14 ++-
.../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 3 +-
.../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql | 2 +
.../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 3 +-
.../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql | 4 +
.../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 3 +-
.../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql | 2 +
.../sql/postgres/hive-schema-4.0.0.postgres.sql | 3 +-
.../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql | 4 +
.../upgrade-3.1.3000-to-4.0.0.postgres.sql | 4 +
49 files changed, 746 insertions(+), 97 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 65fa491..35f4b93 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -696,7 +696,8 @@ public class HiveConf extends Configuration {
+ "attempted using the snapshot based approach. If disabled, the replication will fail in case the target is "
+ "modified."),
REPL_STATS_TOP_EVENTS_COUNTS("hive.repl.stats.events.count", 5,
- "Number of top costliest events that needs to maintained per event type for the replication statistics."),
+ "Number of topmost expensive events that needs to be maintained per event type for the replication statistics." +
+ " Maximum permissible limit is 10."),
LOCALSCRATCHDIR("hive.exec.local.scratchdir",
"${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
"Local scratch space for Hive jobs"),
diff --git a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
index c6fe1172..9de3488 100644
--- a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
+++ b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
@@ -1466,7 +1466,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `REPLICATION_METRICS` (
`POLICY_NAME` string,
`DUMP_EXECUTION_ID` bigint,
`METADATA` string,
- `PROGRESS` string
+ `PROGRESS` string,
+ `MESSAGE_FORMAT` varchar(16)
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
@@ -1477,10 +1478,26 @@ TBLPROPERTIES (
\"RM_POLICY\",
\"RM_DUMP_EXECUTION_ID\",
\"RM_METADATA\",
- \"RM_PROGRESS\"
+ \"RM_PROGRESS\",
+ \"MESSAGE_FORMAT\"
FROM \"REPLICATION_METRICS\""
);
+CREATE OR REPLACE VIEW `REPLICATION_METRICS_VIEW` (
+ `SCHEDULED_EXECUTION_ID`,
+ `POLICY_NAME`,
+ `DUMP_EXECUTION_ID`,
+ `METADATA`,
+ `PROGRESS`
+) AS
+SELECT DISTINCT
+ RM.`SCHEDULED_EXECUTION_ID`,
+ RM.`POLICY_NAME`,
+ RM.`DUMP_EXECUTION_ID`,
+ RM.`METADATA`,
+ deserialize(RM.`PROGRESS`, RM.`MESSAGE_FORMAT`)
+FROM SYS.`REPLICATION_METRICS` AS RM;
+
CREATE EXTERNAL TABLE IF NOT EXISTS `NOTIFICATION_LOG` (
`NL_ID` bigint,
`EVENT_ID` bigint,
diff --git a/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql b/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
index 6ee90d8..6e3a9e1 100644
--- a/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
+++ b/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
@@ -527,7 +527,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `REPLICATION_METRICS` (
`POLICY_NAME` string,
`DUMP_EXECUTION_ID` bigint,
`METADATA` string,
- `PROGRESS` string
+ `PROGRESS` string,
+ `MESSAGE_FORMAT` varchar(16)
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
@@ -538,10 +539,26 @@ TBLPROPERTIES (
\"RM_POLICY\",
\"RM_DUMP_EXECUTION_ID\",
\"RM_METADATA\",
- \"RM_PROGRESS\"
+ \"RM_PROGRESS\",
+ \"MESSAGE_FORMAT\"
FROM \"REPLICATION_METRICS\""
);
+CREATE OR REPLACE VIEW `REPLICATION_METRICS_VIEW` (
+ `SCHEDULED_EXECUTION_ID`,
+ `POLICY_NAME`,
+ `DUMP_EXECUTION_ID`,
+ `METADATA`,
+ `PROGRESS`
+) AS
+SELECT DISTINCT
+ RM.`SCHEDULED_EXECUTION_ID`,
+ RM.`POLICY_NAME`,
+ RM.`DUMP_EXECUTION_ID`,
+ RM.`METADATA`,
+ deserialize(RM.`PROGRESS`, RM.`MESSAGE_FORMAT`)
+FROM SYS.`REPLICATION_METRICS` AS RM;
+
CREATE EXTERNAL TABLE IF NOT EXISTS `NOTIFICATION_LOG` (
`NL_ID` bigint,
`EVENT_ID` bigint,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 75fc858..1c77496 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -510,6 +510,7 @@ public final class FunctionRegistry {
system.registerGenericUDF("sort_array", GenericUDFSortArray.class);
system.registerGenericUDF("sort_array_by", GenericUDFSortArrayByField.class);
system.registerGenericUDF("array_contains", GenericUDFArrayContains.class);
+ system.registerGenericUDF("deserialize", GenericUDFDeserialize.class);
system.registerGenericUDF("sentences", GenericUDFSentences.class);
system.registerGenericUDF("map_keys", GenericUDFMapKeys.class);
system.registerGenericUDF("map_values", GenericUDFMapValues.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStatsTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStatsTracker.java
index 0d9683b..06c147c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStatsTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStatsTracker.java
@@ -22,7 +22,6 @@ import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.ObjectName;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.util.ArrayList;
@@ -30,13 +29,17 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_STATS_TOP_EVENTS_COUNTS;
+
/**
* Tracks the replication statistics per event type.
*/
public class ReplStatsTracker {
+ private static final Logger LOG = LoggerFactory.getLogger(ReplStatsTracker.class);
// Maintains the length of the RM_Progress column in the RDBMS, which stores the ReplStats
- public static int RM_PROGRESS_LENGTH = 24000;
+ public static int RM_PROGRESS_LENGTH = 10000;
+ public static int TOP_K_MAX = 10;
// Maintains the descriptive statistics per event type.
private ConcurrentHashMap<String, DescriptiveStatistics> descMap;
@@ -49,6 +52,11 @@ public class ReplStatsTracker {
private String lastEventId;
public ReplStatsTracker(int k) {
+ if (k > TOP_K_MAX) {
+ LOG.warn("Value for {} exceeded maximum permissible limit. Using Maximum of {}", REPL_STATS_TOP_EVENTS_COUNTS,
+ TOP_K_MAX);
+ k = TOP_K_MAX;
+ }
this.k = k;
descMap = new ConcurrentHashMap<>();
topKEvents = new ConcurrentHashMap<>();
@@ -122,6 +130,10 @@ public class ReplStatsTracker {
return lastEventId;
}
+ public int getK() {
+ return k;
+ }
+
private String formatDouble(DecimalFormat dFormat, Double d) {
if (!d.isNaN()) {
return dFormat.format(d);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
index d0f8b5f..3bbabb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
@@ -22,14 +22,17 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.ReplicationMetricList;
import org.apache.hadoop.hive.metastore.api.ReplicationMetrics;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.utils.Retry;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStatsTracker;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -48,6 +51,7 @@ public final class MetricSink {
private static volatile MetricSink instance;
private boolean isInitialised = false;
private HiveConf conf;
+ private static String RM_PROGRESS_COLUMN_WIDTH_EXCEEDS_MSG = "ERROR: RM_PROGRESS LIMIT EXCEEDED.";
private MetricSink() {
this.executorService = Executors.newSingleThreadScheduledExecutor();
@@ -103,6 +107,21 @@ public final class MetricSink {
this.conf = conf;
}
+ private String updateRMProgressIfLimitExceeds(Progress progress, MessageEncoder encoder) throws SemanticException {
+ try {
+ String progressJson = new ObjectMapper().writeValueAsString(progress);
+ String serializedProgress = encoder.getSerializer().serialize(progressJson);
+ if (serializedProgress.length() > ReplStatsTracker.RM_PROGRESS_LENGTH) {
+ LOG.warn("Error: RM_PROGRESS limit exceeded.\n" +
+ "RM_PROGRESS: " + progressJson + " overwritten by " + RM_PROGRESS_COLUMN_WIDTH_EXCEEDS_MSG);
+ serializedProgress = encoder.getSerializer().serialize(RM_PROGRESS_COLUMN_WIDTH_EXCEEDS_MSG);
+ }
+ return serializedProgress;
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
@Override
public void run() {
ReplicationMetricList metricList = new ReplicationMetricList();
@@ -116,14 +135,16 @@ public final class MetricSink {
int totalMetricsSize = metrics.size();
List<ReplicationMetrics> replicationMetricsList = new ArrayList<>(totalMetricsSize);
ObjectMapper mapper = new ObjectMapper();
+ MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(conf);
for (int index = 0; index < totalMetricsSize; index++) {
ReplicationMetric metric = metrics.removeFirst();
ReplicationMetrics persistentMetric = new ReplicationMetrics();
persistentMetric.setDumpExecutionId(metric.getDumpExecutionId());
persistentMetric.setScheduledExecutionId(metric.getScheduledExecutionId());
persistentMetric.setPolicy(metric.getPolicy());
- persistentMetric.setProgress(mapper.writeValueAsString(metric.getProgress()));
+ persistentMetric.setProgress(updateRMProgressIfLimitExceeds(metric.getProgress(), encoder));
persistentMetric.setMetadata(mapper.writeValueAsString(metric.getMetadata()));
+ persistentMetric.setMessageFormat(encoder.getMessageFormat());
LOG.debug("Metric to be persisted {} ", persistentMetric);
replicationMetricsList.add(persistentMetric);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
index 88f8e74..27d9f7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.parse.repl.metric;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,6 +50,7 @@ public abstract class ReplicationMetricCollector {
private MetricCollector metricCollector;
private boolean isEnabled;
private static boolean enableForTests;
+ private HiveConf conf;
public void setMetricsMBean(ObjectName metricsMBean) {
this.metricsMBean = metricsMBean;
@@ -58,6 +60,7 @@ public abstract class ReplicationMetricCollector {
public ReplicationMetricCollector(String dbName, Metadata.ReplicationType replicationType,
String stagingDir, long dumpExecutionId, HiveConf conf) {
+ this.conf = conf;
checkEnabledForTests(conf);
String policy = conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME);
long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
@@ -65,7 +68,7 @@ public abstract class ReplicationMetricCollector {
isEnabled = true;
metricCollector = MetricCollector.getInstance().init(conf);
MetricSink.getInstance().init(conf);
- Metadata metadata = new Metadata(dbName, replicationType, stagingDir);
+ Metadata metadata = new Metadata(dbName, replicationType, getStagingDir(stagingDir));
replicationMetric = new ReplicationMetric(executionId, policy, dumpExecutionId, metadata);
}
}
@@ -75,7 +78,7 @@ public abstract class ReplicationMetricCollector {
LOG.debug("Stage Started {}, {}, {}", stageName, metricMap.size(), metricMap );
Progress progress = replicationMetric.getProgress();
progress.setStatus(Status.IN_PROGRESS);
- Stage stage = new Stage(stageName, Status.IN_PROGRESS, System.currentTimeMillis());
+ Stage stage = new Stage(stageName, Status.IN_PROGRESS, getCurrentTimeInMillis());
for (Map.Entry<String, Long> metric : metricMap.entrySet()) {
stage.addMetric(new Metric(metric.getKey(), metric.getValue()));
}
@@ -91,7 +94,7 @@ public abstract class ReplicationMetricCollector {
LOG.info("Failover Stage Started {}, {}, {}", stageName, metricMap.size(), metricMap);
Progress progress = replicationMetric.getProgress();
progress.setStatus(Status.FAILOVER_IN_PROGRESS);
- Stage stage = new Stage(stageName, Status.IN_PROGRESS, System.currentTimeMillis());
+ Stage stage = new Stage(stageName, Status.IN_PROGRESS, getCurrentTimeInMillis());
for (Map.Entry<String, Long> metric : metricMap.entrySet()) {
stage.addMetric(new Metric(metric.getKey(), metric.getValue()));
}
@@ -116,7 +119,7 @@ public abstract class ReplicationMetricCollector {
stage = new Stage(stageName, status, -1L);
}
stage.setStatus(status);
- stage.setEndTime(System.currentTimeMillis());
+ stage.setEndTime(getCurrentTimeInMillis());
stage.setReplSnapshotsCount(replSnapshotCount);
if (replStatsTracker != null && !(replStatsTracker instanceof NoOpReplStatsTracker)) {
String replStatString = replStatsTracker.toString();
@@ -145,7 +148,7 @@ public abstract class ReplicationMetricCollector {
stage = new Stage(stageName, status, -1L);
}
stage.setStatus(status);
- stage.setEndTime(System.currentTimeMillis());
+ stage.setEndTime(getCurrentTimeInMillis());
stage.setErrorLogPath(errorLogPath);
progress.addStage(stage);
replicationMetric.setProgress(progress);
@@ -166,7 +169,7 @@ public abstract class ReplicationMetricCollector {
stage = new Stage(stageName, status, -1L);
}
stage.setStatus(status);
- stage.setEndTime(System.currentTimeMillis());
+ stage.setEndTime(getCurrentTimeInMillis());
progress.addStage(stage);
replicationMetric.setProgress(progress);
metricCollector.addMetric(replicationMetric);
@@ -224,4 +227,16 @@ public abstract class ReplicationMetricCollector {
}
}
}
+
+ private boolean testingModeEnabled() {
+ return conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL);
+ }
+
+ private long getCurrentTimeInMillis() {
+ return testingModeEnabled() ? 0L : System.currentTimeMillis();
+ }
+
+ private String getStagingDir(String stagingDir) {
+ return testingModeEnabled() ? "dummyDir" : stagingDir;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java
index 230b516..ca6bddf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java
@@ -26,6 +26,7 @@ public class ReplicationMetric {
private long dumpExecutionId;
private Metadata metadata;
private Progress progress;
+ private String messageFormat;
public ReplicationMetric(long scheduledExecutionId, String policy, long dumpExecutionId, Metadata metadata){
this.scheduledExecutionId = scheduledExecutionId;
@@ -71,4 +72,12 @@ public class ReplicationMetric {
public void setProgress(Progress progress) {
this.progress = progress;
}
+
+ public String getMessageFormat() {
+ return messageFormat;
+ }
+
+ public void setMessageFormat(String messageFormat) {
+ this.messageFormat = messageFormat;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
index 4a54a8b..83df9f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
@@ -24,8 +24,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.hive.ql.exec.repl.ReplStatsTracker.RM_PROGRESS_LENGTH;
-
/**
* Class for defining the different stages of replication.
*/
@@ -129,12 +127,7 @@ public class Stage {
}
public void setReplStats(String replStats) {
- // Check the stat string doesn't surpass the RM_PROGRESS column length.
- if (replStats.length() >= RM_PROGRESS_LENGTH - 2000) {
- this.replStats = "RM_PROGRESS LIMIT EXCEEDED TO " + replStats.length();
- } else {
- this.replStats = replStats;
- }
+ this.replStats = replStats;
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDeserialize.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDeserialize.java
new file mode 100644
index 0000000..9fa375a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDeserialize.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+/**
+ * GenericUDFDeserializeString.
+ *
+ */
+@Description(name = "deserialize",
+ value="_FUNC_(base64 encoded message, compressionFormat) - Returns plain text string of given message which " +
+ "was compressed in compressionFormat and base64 encoded.",
+ extended="Currently, Supports only 'gzip' for Gzip compressed and base 64 encoded strings.\n" +
+ "Example:\n"
+ + " > SELECT _FUNC_('H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA', 'gzip') FROM src LIMIT 1;\n"
+ + " test")
+public class GenericUDFDeserialize extends GenericUDF {
+
+ private static final int ARG_COUNT = 2; // Number of arguments to this UDF
+ private static final String FUNC_NAME = "deserialize"; // External Name
+
+ private transient PrimitiveObjectInspector stringOI = null;
+ private transient PrimitiveObjectInspector compressionFormat = null;
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector[] arguments)
+ throws UDFArgumentException {
+ if (arguments.length != ARG_COUNT) {
+ throw new UDFArgumentException("The function " + FUNC_NAME + " accepts " + ARG_COUNT + " arguments.");
+ }
+ for (ObjectInspector arg: arguments) {
+ if (arg.getCategory() != ObjectInspector.Category.PRIMITIVE ||
+ PrimitiveObjectInspectorUtils.PrimitiveGrouping.STRING_GROUP != PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
+ ((PrimitiveObjectInspector)arg).getPrimitiveCategory())){
+ throw new UDFArgumentTypeException(0, "The arguments to " + FUNC_NAME + " must be a string/varchar");
+ }
+ }
+ stringOI = (PrimitiveObjectInspector) arguments[0];
+ compressionFormat = (PrimitiveObjectInspector) arguments[1];
+ return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+ }
+
+ @Override
+ public Object evaluate(DeferredObject[] arguments) throws HiveException {
+ String value = PrimitiveObjectInspectorUtils.getString(arguments[0].get(), stringOI);
+ String compressionFormat = PrimitiveObjectInspectorUtils.getString(arguments[1].get(), this.compressionFormat);
+ if (value == null || StringUtils.isEmpty(compressionFormat)) {
+ return value;
+ }
+ MessageEncoder encoder;
+ try {
+ encoder = MessageFactory.getInstance(compressionFormat);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ return encoder.getDeserializer().deSerializeGenericString(value);
+ }
+
+ @Override
+ public String getDisplayString(String[] children) {
+ assert (children.length == ARG_COUNT);
+ return getStandardDisplayString(FUNC_NAME, children, ",");
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
index a9784a0..45569bc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.parse.repl.metric;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
import org.apache.hadoop.hive.ql.exec.repl.ReplStatsTracker;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
@@ -72,7 +74,7 @@ public class TestReplicationMetricCollector {
conf.set(Constants.SCHEDULED_QUERY_EXECUTIONID, "1");
MetricCollector.getInstance().init(conf);
Mockito.when(fmd.getFailoverEventId()).thenReturn(10L);
- Mockito.when(fmd.getFilePath()).thenReturn("staging");
+ Mockito.when(fmd.getFilePath()).thenReturn("dummyDir");
}
@After
@@ -105,7 +107,7 @@ public class TestReplicationMetricCollector {
conf = new HiveConf();
MetricCollector.getInstance().init(conf);
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
- "staging", conf);
+ "dummyDir", conf);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -120,7 +122,7 @@ public class TestReplicationMetricCollector {
conf = new HiveConf();
MetricCollector.getInstance().init(conf);
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
- "staging", conf);
+ "dummyDir", conf);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -132,7 +134,7 @@ public class TestReplicationMetricCollector {
@Test
public void testSuccessBootstrapDumpMetrics() throws Exception {
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
- "staging", conf);
+ "dummyDir", conf);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -152,7 +154,7 @@ public class TestReplicationMetricCollector {
actualMetrics = MetricCollector.getInstance().getMetrics();
Assert.assertEquals(1, actualMetrics.size());
- Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "staging");
+ Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "dummyDir");
expectedMetadata.setLastReplId(10);
Progress expectedProgress = new Progress();
expectedProgress.setStatus(Status.SUCCESS);
@@ -174,7 +176,7 @@ public class TestReplicationMetricCollector {
@Test
public void testSuccessIncrDumpMetrics() throws Exception {
ReplicationMetricCollector incrDumpMetricCollector = new IncrementalDumpMetricCollector("db",
- "staging", conf);
+ "dummyDir", conf);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -194,7 +196,7 @@ public class TestReplicationMetricCollector {
actualMetrics = MetricCollector.getInstance().getMetrics();
Assert.assertEquals(1, actualMetrics.size());
- Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "staging");
+ Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "dummyDir");
expectedMetadata.setLastReplId(10);
Progress expectedProgress = new Progress();
expectedProgress.setStatus(Status.SUCCESS);
@@ -217,7 +219,7 @@ public class TestReplicationMetricCollector {
@Test
public void testFailoverReadyDumpMetrics() throws Exception {
ReplicationMetricCollector incrDumpMetricCollector = new IncrementalDumpMetricCollector("db",
- "staging", conf);
+ "dummyDir", conf);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
incrDumpMetricCollector.reportFailoverStart("dump", metricMap, fmd);
@@ -231,10 +233,10 @@ public class TestReplicationMetricCollector {
actualMetrics = MetricCollector.getInstance().getMetrics();
Assert.assertEquals(1, actualMetrics.size());
- Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "staging");
+ Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "dummyDir");
expectedMetadata.setLastReplId(10);
expectedMetadata.setFailoverEventId(10);
- expectedMetadata.setFailoverMetadataLoc("staging");
+ expectedMetadata.setFailoverMetadataLoc("dummyDir");
Progress expectedProgress = new Progress();
expectedProgress.setStatus(Status.FAILOVER_READY);
Stage dumpStage = new Stage("dump", Status.SUCCESS, 0);
@@ -253,7 +255,7 @@ public class TestReplicationMetricCollector {
@Test
public void testSuccessBootstrapLoadMetrics() throws Exception {
ReplicationMetricCollector bootstrapLoadMetricCollector = new BootstrapLoadMetricCollector("db",
- "staging", 1, conf);
+ "dummyDir", 1, conf);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -274,7 +276,7 @@ public class TestReplicationMetricCollector {
actualMetrics = MetricCollector.getInstance().getMetrics();
Assert.assertEquals(1, actualMetrics.size());
- Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "staging");
+ Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "dummyDir");
expectedMetadata.setLastReplId(10);
Progress expectedProgress = new Progress();
expectedProgress.setStatus(Status.SUCCESS);
@@ -297,7 +299,7 @@ public class TestReplicationMetricCollector {
@Test
public void testSuccessIncrLoadMetrics() throws Exception {
ReplicationMetricCollector incrLoadMetricCollector = new IncrementalLoadMetricCollector("db",
- "staging", 1, conf);
+ "dummyDir", 1, conf);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -317,7 +319,7 @@ public class TestReplicationMetricCollector {
actualMetrics = MetricCollector.getInstance().getMetrics();
Assert.assertEquals(1, actualMetrics.size());
- Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "staging");
+ Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "dummyDir");
expectedMetadata.setLastReplId(10);
Progress expectedProgress = new Progress();
expectedProgress.setStatus(Status.SUCCESS);
@@ -361,7 +363,7 @@ public class TestReplicationMetricCollector {
@Test
public void testSuccessStageFailure() throws Exception {
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
- "staging", conf);
+ "dummyDir", conf);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -376,7 +378,7 @@ public class TestReplicationMetricCollector {
@Test
public void testSuccessStageFailedAdmin() throws Exception {
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
- "staging", conf);
+ "dummyDir", conf);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -462,14 +464,19 @@ public class TestReplicationMetricCollector {
@Test
public void testReplStatsTrackerLimit() {
+ MessageSerializer serializer = MessageFactory.getDefaultInstanceForReplMetrics(conf).getSerializer();
ReplStatsTracker repl = new ReplStatsTracker(10);
// Check for k=10
generateStatsString(10, repl);
- assertTrue("ReplStat string is " + repl.toString().length(), repl.toString().length() < 24000);
+ String replStatsTracker = repl.toString();
+ String gzipSerialized = serializer.serialize(replStatsTracker);
+ assertTrue("ReplStat string is " + gzipSerialized.length(), gzipSerialized.length() < ReplStatsTracker.RM_PROGRESS_LENGTH);
// Check for k=5
repl = new ReplStatsTracker(5);
generateStatsString(5, repl);
- assertTrue("ReplStat string is " + repl.toString().length(), repl.toString().length() < 24000);
+ replStatsTracker = repl.toString();
+ gzipSerialized = serializer.serialize(replStatsTracker);
+ assertTrue("ReplStat string is " + gzipSerialized.length(), gzipSerialized.length() < ReplStatsTracker.RM_PROGRESS_LENGTH);
// Check for k=2 & check NaN values doesn't get messed up due to formatter
repl = new ReplStatsTracker(2);
generateStatsString(2, repl);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
index dc7459d..8c359b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.GetReplicationMetricsRequest;
import org.apache.hadoop.hive.metastore.api.ReplicationMetricList;
import org.apache.hadoop.hive.metastore.api.ReplicationMetrics;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.ql.exec.repl.ReplStatsTracker;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
@@ -56,7 +58,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.hive.ql.exec.repl.ReplStatsTracker.RM_PROGRESS_LENGTH;
import static org.junit.Assert.assertTrue;
/**
@@ -65,6 +66,8 @@ import static org.junit.Assert.assertTrue;
@RunWith(MockitoJUnitRunner.class)
public class TestReplicationMetricSink {
+ //Deserializer to decode and decompress the input string.
+ MessageDeserializer deserializer;
HiveConf conf;
@Mock
@@ -78,6 +81,11 @@ public class TestReplicationMetricSink {
MetricSink metricSinkSpy = Mockito.spy(MetricSink.getInstance());
Mockito.doReturn(1L).when(metricSinkSpy).getFrequencyInSecs();
metricSinkSpy.init(conf);
+ deserializer = MessageFactory.getDefaultInstanceForReplMetrics(conf).getDeserializer();
+ }
+
+ private String deSerialize(String msg) {
+ return deserializer.deSerializeGenericString(msg);
}
@Test
@@ -99,9 +107,7 @@ public class TestReplicationMetricSink {
bootstrapDumpMetricCollector.reportEnd(Status.SUCCESS);
Metadata expectedMetadata = new Metadata("testAcidTablesReplLoadBootstrapIncr_1592205875387",
- Metadata.ReplicationType.BOOTSTRAP, "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_"
- + "parse_TestReplicationScenarios_245261428230295/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGlu"
- + "Y3JfMTU5MjIwNTg3NTM4Nw==/0/hive");
+ Metadata.ReplicationType.BOOTSTRAP, "dummyDir");
expectedMetadata.setLastReplId(10);
Progress expectedProgress = new Progress();
expectedProgress.setStatus(Status.SUCCESS);
@@ -126,7 +132,8 @@ public class TestReplicationMetricSink {
ReplicationMetric actualMetric = new ReplicationMetric(actualThriftMetric.getScheduledExecutionId(),
actualThriftMetric.getPolicy(), actualThriftMetric.getDumpExecutionId(),
mapper.readValue(actualThriftMetric.getMetadata(), Metadata.class));
- ProgressMapper progressMapper = mapper.readValue(actualThriftMetric.getProgress(), ProgressMapper.class);
+ actualMetric.setMessageFormat(actualThriftMetric.getMessageFormat());
+ ProgressMapper progressMapper = mapper.readValue(deSerialize(actualThriftMetric.getProgress()), ProgressMapper.class);
Progress progress = new Progress();
progress.setStatus(progressMapper.getStatus());
for (StageMapper stageMapper : progressMapper.getStages()) {
@@ -159,9 +166,7 @@ public class TestReplicationMetricSink {
incrementDumpMetricCollector.reportEnd(Status.SUCCESS);
expectedMetadata = new Metadata("testAcidTablesReplLoadBootstrapIncr_1592205875387",
- Metadata.ReplicationType.INCREMENTAL, "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_"
- + "parse_TestReplicationScenarios_245261428230295/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGlu"
- + "Y3JfMTU5MjIwNTg3NTM4Nw==/0/hive");
+ Metadata.ReplicationType.INCREMENTAL, "dummyDir");
expectedMetadata.setLastReplId(10);
expectedProgress = new Progress();
expectedProgress.setStatus(Status.SUCCESS);
@@ -184,7 +189,8 @@ public class TestReplicationMetricSink {
actualMetric = new ReplicationMetric(actualThriftMetric.getScheduledExecutionId(),
actualThriftMetric.getPolicy(), actualThriftMetric.getDumpExecutionId(),
mapper.readValue(actualThriftMetric.getMetadata(), Metadata.class));
- progressMapper = mapper.readValue(actualThriftMetric.getProgress(), ProgressMapper.class);
+ actualMetric.setMessageFormat(actualThriftMetric.getMessageFormat());
+ progressMapper = mapper.readValue(deSerialize(actualThriftMetric.getProgress()), ProgressMapper.class);
progress = new Progress();
progress.setStatus(progressMapper.getStatus());
for (StageMapper stageMapper : progressMapper.getStages()) {
@@ -220,7 +226,7 @@ public class TestReplicationMetricSink {
failoverDumpMetricCollector.reportEnd(Status.FAILOVER_READY);
expectedMetadata = new Metadata("testAcidTablesReplLoadBootstrapIncr_1592205875387",
- Metadata.ReplicationType.INCREMENTAL, stagingDir);
+ Metadata.ReplicationType.INCREMENTAL, "dummyDir");
expectedMetadata.setLastReplId(10);
expectedMetadata.setFailoverEventId(100);
expectedMetadata.setFailoverMetadataLoc(stagingDir + FailoverMetaData.FAILOVER_METADATA);
@@ -245,7 +251,8 @@ public class TestReplicationMetricSink {
actualMetric = new ReplicationMetric(actualThriftMetric.getScheduledExecutionId(),
actualThriftMetric.getPolicy(), actualThriftMetric.getDumpExecutionId(),
mapper.readValue(actualThriftMetric.getMetadata(), Metadata.class));
- progressMapper = mapper.readValue(actualThriftMetric.getProgress(), ProgressMapper.class);
+ actualMetric.setMessageFormat(actualThriftMetric.getMessageFormat());
+ progressMapper = mapper.readValue(deSerialize(actualThriftMetric.getProgress()), ProgressMapper.class);
progress = new Progress();
progress.setStatus(progressMapper.getStatus());
for (StageMapper stageMapper : progressMapper.getStages()) {
@@ -308,6 +315,8 @@ public class TestReplicationMetricSink {
@Test
public void testReplStatsInMetrics() throws HiveException, InterruptedException, TException {
+ int origRMProgress = ReplStatsTracker.RM_PROGRESS_LENGTH;
+ ReplStatsTracker.RM_PROGRESS_LENGTH = 10;
ReplicationMetricCollector incrementDumpMetricCollector =
new IncrementalDumpMetricCollector("testAcidTablesReplLoadBootstrapIncr_1592205875387",
"hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
@@ -315,7 +324,7 @@ public class TestReplicationMetricSink {
Map<String, Long> metricMap = new HashMap<>();
ReplStatsTracker repl = Mockito.mock(ReplStatsTracker.class);
- Mockito.when(repl.toString()).thenReturn(RandomStringUtils.randomAlphabetic(RM_PROGRESS_LENGTH));
+ Mockito.when(repl.toString()).thenReturn(RandomStringUtils.randomAlphabetic(1000));
metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
incrementDumpMetricCollector.reportStageStart("dump", metricMap);
incrementDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.EVENTS.name(), 10);
@@ -325,8 +334,12 @@ public class TestReplicationMetricSink {
GetReplicationMetricsRequest metricsRequest = new GetReplicationMetricsRequest();
metricsRequest.setPolicy("repl");
ReplicationMetricList actualReplicationMetrics = Hive.get(conf).getMSC().getReplicationMetrics(metricsRequest);
- assertTrue(actualReplicationMetrics.getReplicationMetricList().get(0).getProgress(),
- actualReplicationMetrics.getReplicationMetricList().get(0).getProgress()
- .contains("RM_PROGRESS LIMIT EXCEEDED"));
+ String progress = deSerialize(actualReplicationMetrics.getReplicationMetricList().get(0).getProgress());
+ assertTrue(progress, progress.contains("ERROR: RM_PROGRESS LIMIT EXCEEDED."));
+ ReplStatsTracker.RM_PROGRESS_LENGTH = origRMProgress;
+
+ //Testing K_MAX
+ repl = new ReplStatsTracker(15);
+ Assert.assertEquals(ReplStatsTracker.TOP_K_MAX, repl.getK());
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
index c2e544f..72d98c2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
@@ -70,7 +70,7 @@ public class TestReplicationMetricUpdateOnFailure {
public void setup() throws Exception {
conf = new HiveConf();
- conf.set(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
+ conf.set(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
conf.set(Constants.SCHEDULED_QUERY_SCHEDULENAME, "repl");
conf.set(Constants.SCHEDULED_QUERY_EXECUTIONID, "1");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFDeserialize.java b/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFDeserialize.java
new file mode 100644
index 0000000..a656db4
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFDeserialize.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+/**
+ * TestGenericUDFDeserialize:
+ * Utility to test the behaviour of GenericUDFDeserialize.
+ */
+public class TestGenericUDFDeserialize {
+
+ @Test
+ public void testOneArg() throws HiveException {
+ GenericUDFDeserialize udf = new GenericUDFDeserialize();
+ ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+ ObjectInspector valueOI2 = PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+ UDFArgumentException ex = null;
+ try {
+ udf.initialize(new ObjectInspector[]{valueOI1});
+ } catch (UDFArgumentException e) {
+ ex = e;
+ }
+ assertTrue(ex.getMessage().contains("The function deserialize accepts 2 arguments."));
+ assertNotNull("The function deserialize() accepts 2 argument.", ex);
+ ex = null;
+ try {
+ udf.initialize(new ObjectInspector[]{valueOI2, valueOI1});
+ } catch (UDFArgumentException e) {
+ ex = e;
+ }
+ assertNull("The function deserialize() accepts 2 argument.", ex);
+ }
+
+ @Test
+ public void testGZIPBase64Compression() throws HiveException {
+ GenericUDFDeserialize udf = new GenericUDFDeserialize();
+ udf.initialize(new ObjectInspector[]{PrimitiveObjectInspectorFactory.writableStringObjectInspector,
+ PrimitiveObjectInspectorFactory.writableStringObjectInspector});
+ GenericUDF.DeferredObject[] args = new GenericUDF.DeferredObject[2];
+ String expectedOutput = "test";
+ MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(new HiveConf());
+ String serializedMsg = encoder.getSerializer().serialize(expectedOutput);
+ args[0] = new GenericUDF.DeferredJavaObject(new Text(serializedMsg));
+ args[1] = new GenericUDF.DeferredJavaObject(new Text(encoder.getMessageFormat()));
+ Object actualOutput = udf.evaluate(args).toString();
+ assertEquals("deserialize() test", expectedOutput, actualOutput != null ? actualOutput : null);
+ }
+
+ @Test
+ public void testInvalidCompressionFormat() throws HiveException {
+ GenericUDFDeserialize udf = new GenericUDFDeserialize();
+ udf.initialize(new ObjectInspector[]{PrimitiveObjectInspectorFactory.writableStringObjectInspector,
+ PrimitiveObjectInspectorFactory.writableStringObjectInspector});
+ GenericUDF.DeferredObject[] args = new GenericUDF.DeferredObject[2];
+ String expectedOutput = "test";
+ MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(new HiveConf());
+ String serializedMsg = encoder.getSerializer().serialize(expectedOutput);
+ String compressionFormat = "randomSerialization";
+ args[0] = new GenericUDF.DeferredJavaObject(new Text(serializedMsg));
+ args[1] = new GenericUDF.DeferredJavaObject(new Text(compressionFormat));
+ HiveException ex = null;
+ try {
+ udf.evaluate(args).toString();
+ } catch (HiveException e) {
+ ex = e;
+ }
+ assertNotNull("Invalid message format provided.", ex);
+ assertTrue(ex.getMessage().contains("compressionFormat: " + compressionFormat + " is not supported."));
+ }
+}
diff --git a/ql/src/test/queries/clientpositive/replication_metrics_ingest.q b/ql/src/test/queries/clientpositive/replication_metrics_ingest.q
index 6a8f787..a710b00 100644
--- a/ql/src/test/queries/clientpositive/replication_metrics_ingest.q
+++ b/ql/src/test/queries/clientpositive/replication_metrics_ingest.q
@@ -41,6 +41,6 @@ alter scheduled query repl2 disabled;
show databases;
-select policy_name, dump_execution_id from sys.replication_metrics;
+select * from sys.replication_metrics;
-select count(*) from sys.replication_metrics where scheduled_execution_id > 0;
+select * from sys.replication_metrics_view order by dump_execution_id;
diff --git a/ql/src/test/queries/clientpositive/udf_deserialize.q b/ql/src/test/queries/clientpositive/udf_deserialize.q
new file mode 100644
index 0000000..b1ae43b
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/udf_deserialize.q
@@ -0,0 +1,8 @@
+
+DESCRIBE FUNCTION deserialize;
+DESCRIBE FUNCTION EXTENDED deserialize;
+
+SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip");
+SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip(json-2.0)");
+SELECT deserialize("test", "");
+SELECT deserialize("{unitTest:'udf-deserialize'}", "json-0.2");
diff --git a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
index a3e8e0c..6d62725 100644
--- a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
+++ b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
@@ -72,22 +72,25 @@ destination
information_schema
src
sys
-PREHOOK: query: select policy_name, dump_execution_id from sys.replication_metrics
+PREHOOK: query: select * from sys.replication_metrics
PREHOOK: type: QUERY
PREHOOK: Input: sys@replication_metrics
#### A masked pattern was here ####
-POSTHOOK: query: select policy_name, dump_execution_id from sys.replication_metrics
+POSTHOOK: query: select * from sys.replication_metrics
POSTHOOK: type: QUERY
POSTHOOK: Input: sys@replication_metrics
#### A masked pattern was here ####
-repl1 0
-repl2 1
-PREHOOK: query: select count(*) from sys.replication_metrics where scheduled_execution_id > 0
+1 repl1 0 {"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0} H4sIAAAAAAAAAG2PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA gzip(json-2.0)
+2 repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0} H4sIAAAAAAAAAG2PwQqDMBBE/yVnD/XqzUYLBbFS9VSkBF1UiImsm5Pk3xu1CtLedmb3zbAzm0iQmVjA8pLzOM+Zt1gtOOs1MyUGcLtnnCXv5BFG2/YPgFT0y+nFY6CaYx6AsK9PWbcy5cX9kS5gbRBBEddG0XpPmoTcpfUOqAivSfxL+GfCt5WrR9SY6DYT1LFAGSk9hjDKXIlx6vSOumgzcARB0KzVTkYgYZP2y7hfpy3EVvYDvpfiNy0BAAA= gzip(json-2.0)
+PREHOOK: query: select * from sys.replication_metrics_view order by dump_execution_id
PREHOOK: type: QUERY
PREHOOK: Input: sys@replication_metrics
+PREHOOK: Input: sys@replication_metrics_view
#### A masked pattern was here ####
-POSTHOOK: query: select count(*) from sys.replication_metrics where scheduled_execution_id > 0
+POSTHOOK: query: select * from sys.replication_metrics_view order by dump_execution_id
POSTHOOK: type: QUERY
POSTHOOK: Input: sys@replication_metrics
+POSTHOOK: Input: sys@replication_metrics_view
#### A masked pattern was here ####
-2
+1 repl1 0 {"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0} {"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":null,"replStats":null}]}
+2 repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0} {"status":"SUCCESS","stages":[{"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":{"numCreated":0,"numDeleted":0},"replStats":null}]}
diff --git a/ql/src/test/results/clientpositive/llap/resourceplan.q.out b/ql/src/test/results/clientpositive/llap/resourceplan.q.out
index 1a83c09..2b190da 100644
--- a/ql/src/test/results/clientpositive/llap/resourceplan.q.out
+++ b/ql/src/test/results/clientpositive/llap/resourceplan.q.out
@@ -210,6 +210,10 @@ sys replication_metrics hive_test_user USER DELETE true -1 hive_test_user
sys replication_metrics hive_test_user USER INSERT true -1 hive_test_user
sys replication_metrics hive_test_user USER SELECT true -1 hive_test_user
sys replication_metrics hive_test_user USER UPDATE true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER DELETE true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER INSERT true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER SELECT true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER UPDATE true -1 hive_test_user
sys role_map hive_test_user USER DELETE true -1 hive_test_user
sys role_map hive_test_user USER INSERT true -1 hive_test_user
sys role_map hive_test_user USER SELECT true -1 hive_test_user
diff --git a/ql/src/test/results/clientpositive/llap/show_functions.q.out b/ql/src/test/results/clientpositive/llap/show_functions.q.out
index c05d208..a953471 100644
--- a/ql/src/test/results/clientpositive/llap/show_functions.q.out
+++ b/ql/src/test/results/clientpositive/llap/show_functions.q.out
@@ -106,6 +106,7 @@ dayofweek
decode
degrees
dense_rank
+deserialize
div
ds_cpc_estimate
ds_cpc_estimate_bounds
@@ -464,6 +465,7 @@ coalesce
current_database
current_date
decode
+deserialize
ds_cpc_estimate
ds_hll_estimate
ds_kll_quantile
@@ -623,6 +625,7 @@ dayofweek
decode
degrees
dense_rank
+deserialize
div
ds_cpc_estimate
ds_cpc_estimate_bounds
diff --git a/ql/src/test/results/clientpositive/llap/strict_managed_tables_sysdb.q.out b/ql/src/test/results/clientpositive/llap/strict_managed_tables_sysdb.q.out
index 32eb6e2..5eb6e90 100644
--- a/ql/src/test/results/clientpositive/llap/strict_managed_tables_sysdb.q.out
+++ b/ql/src/test/results/clientpositive/llap/strict_managed_tables_sysdb.q.out
@@ -300,6 +300,10 @@ sys replication_metrics hive_test_user USER DELETE true -1 hive_test_user
sys replication_metrics hive_test_user USER INSERT true -1 hive_test_user
sys replication_metrics hive_test_user USER SELECT true -1 hive_test_user
sys replication_metrics hive_test_user USER UPDATE true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER DELETE true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER INSERT true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER SELECT true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER UPDATE true -1 hive_test_user
sys role_map hive_test_user USER DELETE true -1 hive_test_user
sys role_map hive_test_user USER INSERT true -1 hive_test_user
sys role_map hive_test_user USER SELECT true -1 hive_test_user
@@ -490,6 +494,7 @@ PREHOOK: Output: sys@partition_params
PREHOOK: Output: sys@partition_stats_view
PREHOOK: Output: sys@partitions
PREHOOK: Output: sys@replication_metrics
+PREHOOK: Output: sys@replication_metrics_view
PREHOOK: Output: sys@role_map
PREHOOK: Output: sys@roles
PREHOOK: Output: sys@scheduled_executions
@@ -551,6 +556,7 @@ POSTHOOK: Output: sys@partition_params
POSTHOOK: Output: sys@partition_stats_view
POSTHOOK: Output: sys@partitions
POSTHOOK: Output: sys@replication_metrics
+POSTHOOK: Output: sys@replication_metrics_view
POSTHOOK: Output: sys@role_map
POSTHOOK: Output: sys@roles
POSTHOOK: Output: sys@scheduled_executions
diff --git a/ql/src/test/results/clientpositive/llap/sysdb.q.out b/ql/src/test/results/clientpositive/llap/sysdb.q.out
index 1b0e267..da1cf94 100644
--- a/ql/src/test/results/clientpositive/llap/sysdb.q.out
+++ b/ql/src/test/results/clientpositive/llap/sysdb.q.out
@@ -248,6 +248,10 @@ sys replication_metrics hive_test_user USER DELETE true -1 hive_test_user
sys replication_metrics hive_test_user USER INSERT true -1 hive_test_user
sys replication_metrics hive_test_user USER SELECT true -1 hive_test_user
sys replication_metrics hive_test_user USER UPDATE true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER DELETE true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER INSERT true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER SELECT true -1 hive_test_user
+sys replication_metrics_view hive_test_user USER UPDATE true -1 hive_test_user
sys role_map hive_test_user USER DELETE true -1 hive_test_user
sys role_map hive_test_user USER INSERT true -1 hive_test_user
sys role_map hive_test_user USER SELECT true -1 hive_test_user
@@ -712,10 +716,16 @@ partitions part_name
partitions sd_id
partitions tbl_id
replication_metrics dump_execution_id
+replication_metrics message_format
replication_metrics metadata
replication_metrics policy_name
replication_metrics progress
replication_metrics scheduled_execution_id
+replication_metrics_view dump_execution_id
+replication_metrics_view metadata
+replication_metrics_view policy_name
+replication_metrics_view progress
+replication_metrics_view scheduled_execution_id
role_map add_time
role_map grant_option
role_map grantor
@@ -1215,7 +1225,7 @@ POSTHOOK: query: select count(*) from sds
POSTHOOK: type: QUERY
POSTHOOK: Input: sys@sds
#### A masked pattern was here ####
-74
+75
PREHOOK: query: select param_key, param_value from sd_params order by param_key, param_value limit 5
PREHOOK: type: QUERY
PREHOOK: Input: sys@sd_params
@@ -1623,6 +1633,7 @@ default sys partition_params BASE_TABLE NULL NULL NULL NULL NULL YES NO NULL
default sys partition_stats_view VIEW NULL NULL NULL NULL NULL NO NO NULL
default sys partitions BASE_TABLE NULL NULL NULL NULL NULL YES NO NULL
default sys replication_metrics BASE_TABLE NULL NULL NULL NULL NULL YES NO NULL
+default sys replication_metrics_view VIEW NULL NULL NULL NULL NULL NO NO NULL
default sys role_map BASE_TABLE NULL NULL NULL NULL NULL YES NO NULL
default sys roles BASE_TABLE NULL NULL NULL NULL NULL YES NO NULL
default sys scheduled_executions BASE_TABLE NULL NULL NULL NULL NULL YES NO NULL
@@ -1746,6 +1757,7 @@ information_schema views
sys compactions
sys locks
sys partition_stats_view
+sys replication_metrics_view
sys table_stats_view
sys transactions
sys version
diff --git a/ql/src/test/results/clientpositive/llap/udf_deserialize.q.out b/ql/src/test/results/clientpositive/llap/udf_deserialize.q.out
new file mode 100644
index 0000000..b8bc16a
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/udf_deserialize.q.out
@@ -0,0 +1,52 @@
+PREHOOK: query: DESCRIBE FUNCTION deserialize
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION deserialize
+POSTHOOK: type: DESCFUNCTION
+deserialize(base64 encoded message, compressionFormat) - Returns plain text string of given message which was compressed in compressionFormat and base64 encoded.
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED deserialize
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED deserialize
+POSTHOOK: type: DESCFUNCTION
+deserialize(base64 encoded message, compressionFormat) - Returns plain text string of given message which was compressed in compressionFormat and base64 encoded.
+Currently, Supports only 'gzip' for Gzip compressed and base 64 encoded strings.
+Example:
+ > SELECT deserialize('H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA', 'gzip') FROM src LIMIT 1;
+ test
+Function class:org.apache.hadoop.hive.ql.udf.generic.GenericUDFDeserialize
+Function type:BUILTIN
+PREHOOK: query: SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+test
+PREHOOK: query: SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip(json-2.0)")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT deserialize("H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA", "gzip(json-2.0)")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+test
+PREHOOK: query: SELECT deserialize("test", "")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT deserialize("test", "")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+test
+PREHOOK: query: SELECT deserialize("{unitTest:'udf-deserialize'}", "json-0.2")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT deserialize("{unitTest:'udf-deserialize'}", "json-0.2")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+{unitTest:'udf-deserialize'}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 2ff5912..a2b2652 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -46769,6 +46769,11 @@ void ReplicationMetrics::__set_progress(const std::string& val) {
this->progress = val;
__isset.progress = true;
}
+
+void ReplicationMetrics::__set_messageFormat(const std::string& val) {
+ this->messageFormat = val;
+__isset.messageFormat = true;
+}
std::ostream& operator<<(std::ostream& out, const ReplicationMetrics& obj)
{
obj.printTo(out);
@@ -46840,6 +46845,14 @@ uint32_t ReplicationMetrics::read(::apache::thrift::protocol::TProtocol* iprot)
xfer += iprot->skip(ftype);
}
break;
+ case 6:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->messageFormat);
+ this->__isset.messageFormat = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -46885,6 +46898,11 @@ uint32_t ReplicationMetrics::write(::apache::thrift::protocol::TProtocol* oprot)
xfer += oprot->writeString(this->progress);
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.messageFormat) {
+ xfer += oprot->writeFieldBegin("messageFormat", ::apache::thrift::protocol::T_STRING, 6);
+ xfer += oprot->writeString(this->messageFormat);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -46897,6 +46915,7 @@ void swap(ReplicationMetrics &a, ReplicationMetrics &b) {
swap(a.dumpExecutionId, b.dumpExecutionId);
swap(a.metadata, b.metadata);
swap(a.progress, b.progress);
+ swap(a.messageFormat, b.messageFormat);
swap(a.__isset, b.__isset);
}
@@ -46906,6 +46925,7 @@ ReplicationMetrics::ReplicationMetrics(const ReplicationMetrics& other1665) {
dumpExecutionId = other1665.dumpExecutionId;
metadata = other1665.metadata;
progress = other1665.progress;
+ messageFormat = other1665.messageFormat;
__isset = other1665.__isset;
}
ReplicationMetrics& ReplicationMetrics::operator=(const ReplicationMetrics& other1666) {
@@ -46914,6 +46934,7 @@ ReplicationMetrics& ReplicationMetrics::operator=(const ReplicationMetrics& othe
dumpExecutionId = other1666.dumpExecutionId;
metadata = other1666.metadata;
progress = other1666.progress;
+ messageFormat = other1666.messageFormat;
__isset = other1666.__isset;
return *this;
}
@@ -46925,6 +46946,7 @@ void ReplicationMetrics::printTo(std::ostream& out) const {
out << ", " << "dumpExecutionId=" << to_string(dumpExecutionId);
out << ", " << "metadata="; (__isset.metadata ? (out << to_string(metadata)) : (out << "<null>"));
out << ", " << "progress="; (__isset.progress ? (out << to_string(progress)) : (out << "<null>"));
+ out << ", " << "messageFormat="; (__isset.messageFormat ? (out << to_string(messageFormat)) : (out << "<null>"));
out << ")";
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index 025e733..19a318e 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -17640,9 +17640,10 @@ void swap(GetPartitionsPsWithAuthResponse &a, GetPartitionsPsWithAuthResponse &b
std::ostream& operator<<(std::ostream& out, const GetPartitionsPsWithAuthResponse& obj);
typedef struct _ReplicationMetrics__isset {
- _ReplicationMetrics__isset() : metadata(false), progress(false) {}
+ _ReplicationMetrics__isset() : metadata(false), progress(false), messageFormat(false) {}
bool metadata :1;
bool progress :1;
+ bool messageFormat :1;
} _ReplicationMetrics__isset;
class ReplicationMetrics : public virtual ::apache::thrift::TBase {
@@ -17650,7 +17651,7 @@ class ReplicationMetrics : public virtual ::apache::thrift::TBase {
ReplicationMetrics(const ReplicationMetrics&);
ReplicationMetrics& operator=(const ReplicationMetrics&);
- ReplicationMetrics() : scheduledExecutionId(0), policy(), dumpExecutionId(0), metadata(), progress() {
+ ReplicationMetrics() : scheduledExecutionId(0), policy(), dumpExecutionId(0), metadata(), progress(), messageFormat() {
}
virtual ~ReplicationMetrics() noexcept;
@@ -17659,6 +17660,7 @@ class ReplicationMetrics : public virtual ::apache::thrift::TBase {
int64_t dumpExecutionId;
std::string metadata;
std::string progress;
+ std::string messageFormat;
_ReplicationMetrics__isset __isset;
@@ -17672,6 +17674,8 @@ class ReplicationMetrics : public virtual ::apache::thrift::TBase {
void __set_progress(const std::string& val);
+ void __set_messageFormat(const std::string& val);
+
bool operator == (const ReplicationMetrics & rhs) const
{
if (!(scheduledExecutionId == rhs.scheduledExecutionId))
@@ -17688,6 +17692,10 @@ class ReplicationMetrics : public virtual ::apache::thrift::TBase {
return false;
else if (__isset.progress && !(progress == rhs.progress))
return false;
+ if (__isset.messageFormat != rhs.__isset.messageFormat)
+ return false;
+ else if (__isset.messageFormat && !(messageFormat == rhs.messageFormat))
+ return false;
return true;
}
bool operator != (const ReplicationMetrics &rhs) const {
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ReplicationMetrics.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ReplicationMetrics.java
index 50eda8d..8962938 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ReplicationMetrics.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ReplicationMetrics.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.metastore.api;
private static final org.apache.thrift.protocol.TField DUMP_EXECUTION_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("dumpExecutionId", org.apache.thrift.protocol.TType.I64, (short)3);
private static final org.apache.thrift.protocol.TField METADATA_FIELD_DESC = new org.apache.thrift.protocol.TField("metadata", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.protocol.TField PROGRESS_FIELD_DESC = new org.apache.thrift.protocol.TField("progress", org.apache.thrift.protocol.TType.STRING, (short)5);
+ private static final org.apache.thrift.protocol.TField MESSAGE_FORMAT_FIELD_DESC = new org.apache.thrift.protocol.TField("messageFormat", org.apache.thrift.protocol.TType.STRING, (short)6);
private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ReplicationMetricsStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ReplicationMetricsTupleSchemeFactory();
@@ -25,6 +26,7 @@ package org.apache.hadoop.hive.metastore.api;
private long dumpExecutionId; // required
private @org.apache.thrift.annotation.Nullable java.lang.String metadata; // optional
private @org.apache.thrift.annotation.Nullable java.lang.String progress; // optional
+ private @org.apache.thrift.annotation.Nullable java.lang.String messageFormat; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -32,7 +34,8 @@ package org.apache.hadoop.hive.metastore.api;
POLICY((short)2, "policy"),
DUMP_EXECUTION_ID((short)3, "dumpExecutionId"),
METADATA((short)4, "metadata"),
- PROGRESS((short)5, "progress");
+ PROGRESS((short)5, "progress"),
+ MESSAGE_FORMAT((short)6, "messageFormat");
private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
@@ -58,6 +61,8 @@ package org.apache.hadoop.hive.metastore.api;
return METADATA;
case 5: // PROGRESS
return PROGRESS;
+ case 6: // MESSAGE_FORMAT
+ return MESSAGE_FORMAT;
default:
return null;
}
@@ -102,7 +107,7 @@ package org.apache.hadoop.hive.metastore.api;
private static final int __SCHEDULEDEXECUTIONID_ISSET_ID = 0;
private static final int __DUMPEXECUTIONID_ISSET_ID = 1;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.METADATA,_Fields.PROGRESS};
+ private static final _Fields optionals[] = {_Fields.METADATA,_Fields.PROGRESS,_Fields.MESSAGE_FORMAT};
public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -116,6 +121,8 @@ package org.apache.hadoop.hive.metastore.api;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.PROGRESS, new org.apache.thrift.meta_data.FieldMetaData("progress", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.MESSAGE_FORMAT, new org.apache.thrift.meta_data.FieldMetaData("messageFormat", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ReplicationMetrics.class, metaDataMap);
}
@@ -152,6 +159,9 @@ package org.apache.hadoop.hive.metastore.api;
if (other.isSetProgress()) {
this.progress = other.progress;
}
+ if (other.isSetMessageFormat()) {
+ this.messageFormat = other.messageFormat;
+ }
}
public ReplicationMetrics deepCopy() {
@@ -167,6 +177,7 @@ package org.apache.hadoop.hive.metastore.api;
this.dumpExecutionId = 0;
this.metadata = null;
this.progress = null;
+ this.messageFormat = null;
}
public long getScheduledExecutionId() {
@@ -285,6 +296,30 @@ package org.apache.hadoop.hive.metastore.api;
}
}
+ @org.apache.thrift.annotation.Nullable
+ public java.lang.String getMessageFormat() {
+ return this.messageFormat;
+ }
+
+ public void setMessageFormat(@org.apache.thrift.annotation.Nullable java.lang.String messageFormat) {
+ this.messageFormat = messageFormat;
+ }
+
+ public void unsetMessageFormat() {
+ this.messageFormat = null;
+ }
+
+ /** Returns true if field messageFormat is set (has been assigned a value) and false otherwise */
+ public boolean isSetMessageFormat() {
+ return this.messageFormat != null;
+ }
+
+ public void setMessageFormatIsSet(boolean value) {
+ if (!value) {
+ this.messageFormat = null;
+ }
+ }
+
public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
case SCHEDULED_EXECUTION_ID:
@@ -327,6 +362,14 @@ package org.apache.hadoop.hive.metastore.api;
}
break;
+ case MESSAGE_FORMAT:
+ if (value == null) {
+ unsetMessageFormat();
+ } else {
+ setMessageFormat((java.lang.String)value);
+ }
+ break;
+
}
}
@@ -348,6 +391,9 @@ package org.apache.hadoop.hive.metastore.api;
case PROGRESS:
return getProgress();
+ case MESSAGE_FORMAT:
+ return getMessageFormat();
+
}
throw new java.lang.IllegalStateException();
}
@@ -369,6 +415,8 @@ package org.apache.hadoop.hive.metastore.api;
return isSetMetadata();
case PROGRESS:
return isSetProgress();
+ case MESSAGE_FORMAT:
+ return isSetMessageFormat();
}
throw new java.lang.IllegalStateException();
}
@@ -431,6 +479,15 @@ package org.apache.hadoop.hive.metastore.api;
return false;
}
+ boolean this_present_messageFormat = true && this.isSetMessageFormat();
+ boolean that_present_messageFormat = true && that.isSetMessageFormat();
+ if (this_present_messageFormat || that_present_messageFormat) {
+ if (!(this_present_messageFormat && that_present_messageFormat))
+ return false;
+ if (!this.messageFormat.equals(that.messageFormat))
+ return false;
+ }
+
return true;
}
@@ -454,6 +511,10 @@ package org.apache.hadoop.hive.metastore.api;
if (isSetProgress())
hashCode = hashCode * 8191 + progress.hashCode();
+ hashCode = hashCode * 8191 + ((isSetMessageFormat()) ? 131071 : 524287);
+ if (isSetMessageFormat())
+ hashCode = hashCode * 8191 + messageFormat.hashCode();
+
return hashCode;
}
@@ -515,6 +576,16 @@ package org.apache.hadoop.hive.metastore.api;
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.compare(isSetMessageFormat(), other.isSetMessageFormat());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetMessageFormat()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.messageFormat, other.messageFormat);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -571,6 +642,16 @@ package org.apache.hadoop.hive.metastore.api;
}
first = false;
}
+ if (isSetMessageFormat()) {
+ if (!first) sb.append(", ");
+ sb.append("messageFormat:");
+ if (this.messageFormat == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.messageFormat);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -668,6 +749,14 @@ package org.apache.hadoop.hive.metastore.api;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 6: // MESSAGE_FORMAT
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.messageFormat = iprot.readString();
+ struct.setMessageFormatIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -706,6 +795,13 @@ package org.apache.hadoop.hive.metastore.api;
oprot.writeFieldEnd();
}
}
+ if (struct.messageFormat != null) {
+ if (struct.isSetMessageFormat()) {
+ oprot.writeFieldBegin(MESSAGE_FORMAT_FIELD_DESC);
+ oprot.writeString(struct.messageFormat);
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -733,13 +829,19 @@ package org.apache.hadoop.hive.metastore.api;
if (struct.isSetProgress()) {
optionals.set(1);
}
- oprot.writeBitSet(optionals, 2);
+ if (struct.isSetMessageFormat()) {
+ optionals.set(2);
+ }
+ oprot.writeBitSet(optionals, 3);
if (struct.isSetMetadata()) {
oprot.writeString(struct.metadata);
}
if (struct.isSetProgress()) {
oprot.writeString(struct.progress);
}
+ if (struct.isSetMessageFormat()) {
+ oprot.writeString(struct.messageFormat);
+ }
}
@Override
@@ -751,7 +853,7 @@ package org.apache.hadoop.hive.metastore.api;
struct.setPolicyIsSet(true);
struct.dumpExecutionId = iprot.readI64();
struct.setDumpExecutionIdIsSet(true);
- java.util.BitSet incoming = iprot.readBitSet(2);
+ java.util.BitSet incoming = iprot.readBitSet(3);
if (incoming.get(0)) {
struct.metadata = iprot.readString();
struct.setMetadataIsSet(true);
@@ -760,6 +862,10 @@ package org.apache.hadoop.hive.metastore.api;
struct.progress = iprot.readString();
struct.setProgressIsSet(true);
}
+ if (incoming.get(2)) {
+ struct.messageFormat = iprot.readString();
+ struct.setMessageFormatIsSet(true);
+ }
}
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ReplicationMetrics.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ReplicationMetrics.php
index c1b19de..0093684 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ReplicationMetrics.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ReplicationMetrics.php
@@ -46,6 +46,11 @@ class ReplicationMetrics
'isRequired' => false,
'type' => TType::STRING,
),
+ 6 => array(
+ 'var' => 'messageFormat',
+ 'isRequired' => false,
+ 'type' => TType::STRING,
+ ),
);
/**
@@ -68,6 +73,10 @@ class ReplicationMetrics
* @var string
*/
public $progress = null;
+ /**
+ * @var string
+ */
+ public $messageFormat = null;
public function __construct($vals = null)
{
@@ -87,6 +96,9 @@ class ReplicationMetrics
if (isset($vals['progress'])) {
$this->progress = $vals['progress'];
}
+ if (isset($vals['messageFormat'])) {
+ $this->messageFormat = $vals['messageFormat'];
+ }
}
}
@@ -144,6 +156,13 @@ class ReplicationMetrics
$xfer += $input->skip($ftype);
}
break;
+ case 6:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->messageFormat);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -183,6 +202,11 @@ class ReplicationMetrics
$xfer += $output->writeString($this->progress);
$xfer += $output->writeFieldEnd();
}
+ if ($this->messageFormat !== null) {
+ $xfer += $output->writeFieldBegin('messageFormat', TType::STRING, 6);
+ $xfer += $output->writeString($this->messageFormat);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 756e31e..f3e7080 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -26760,16 +26760,18 @@ class ReplicationMetrics(object):
- dumpExecutionId
- metadata
- progress
+ - messageFormat
"""
- def __init__(self, scheduledExecutionId=None, policy=None, dumpExecutionId=None, metadata=None, progress=None,):
+ def __init__(self, scheduledExecutionId=None, policy=None, dumpExecutionId=None, metadata=None, progress=None, messageFormat=None,):
self.scheduledExecutionId = scheduledExecutionId
self.policy = policy
self.dumpExecutionId = dumpExecutionId
self.metadata = metadata
self.progress = progress
+ self.messageFormat = messageFormat
def read(self, iprot):
if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -26805,6 +26807,11 @@ class ReplicationMetrics(object):
self.progress = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString()
else:
iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.STRING:
+ self.messageFormat = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -26835,6 +26842,10 @@ class ReplicationMetrics(object):
oprot.writeFieldBegin('progress', TType.STRING, 5)
oprot.writeString(self.progress.encode('utf-8') if sys.version_info[0] == 2 else self.progress)
oprot.writeFieldEnd()
+ if self.messageFormat is not None:
+ oprot.writeFieldBegin('messageFormat', TType.STRING, 6)
+ oprot.writeString(self.messageFormat.encode('utf-8') if sys.version_info[0] == 2 else self.messageFormat)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -31106,6 +31117,7 @@ ReplicationMetrics.thrift_spec = (
(3, TType.I64, 'dumpExecutionId', None, None, ), # 3
(4, TType.STRING, 'metadata', 'UTF8', None, ), # 4
(5, TType.STRING, 'progress', 'UTF8', None, ), # 5
+ (6, TType.STRING, 'messageFormat', 'UTF8', None, ), # 6
)
all_structs.append(ReplicationMetricList)
ReplicationMetricList.thrift_spec = (
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 57749a9..4da24a8 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -7352,13 +7352,15 @@ class ReplicationMetrics
DUMPEXECUTIONID = 3
METADATA = 4
PROGRESS = 5
+ MESSAGEFORMAT = 6
FIELDS = {
SCHEDULEDEXECUTIONID => {:type => ::Thrift::Types::I64, :name => 'scheduledExecutionId'},
POLICY => {:type => ::Thrift::Types::STRING, :name => 'policy'},
DUMPEXECUTIONID => {:type => ::Thrift::Types::I64, :name => 'dumpExecutionId'},
METADATA => {:type => ::Thrift::Types::STRING, :name => 'metadata', :optional => true},
- PROGRESS => {:type => ::Thrift::Types::STRING, :name => 'progress', :optional => true}
+ PROGRESS => {:type => ::Thrift::Types::STRING, :name => 'progress', :optional => true},
+ MESSAGEFORMAT => {:type => ::Thrift::Types::STRING, :name => 'messageFormat', :optional => true}
}
def struct_fields; FIELDS; end
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 21ea1f8..e42493b 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -730,6 +730,10 @@ public class MetastoreConf {
"hive.metastore.event.message.factory",
"org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder",
"Factory class for making encoding and decoding messages in the events generated."),
+ REPL_MESSAGE_FACTORY("metastore.repl.message.factory",
+ "hive.metastore.repl.message.factory",
+ "org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder",
+ "Factory class to serialize and deserialize information in replication metrics table."),
EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS("metastore.notification.parameters.exclude.patterns",
"hive.metastore.notification.parameters.exclude.patterns", "",
"List of comma-separated regexes that are used to reduced the size of HMS Notification messages."
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index e1d7006..882e901 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -2200,7 +2200,8 @@ struct ReplicationMetrics{
2: required string policy,
3: required i64 dumpExecutionId,
4: optional string metadata,
- 5: optional string progress
+ 5: optional string progress,
+ 6: optional string messageFormat
}
struct ReplicationMetricList{
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 590884c..ea70e94 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -287,6 +287,9 @@ public class ObjectStore implements RawStore, Configurable {
*/
private final static AtomicBoolean isSchemaVerified = new AtomicBoolean(false);
private static final Logger LOG = LoggerFactory.getLogger(ObjectStore.class);
+ private int RM_PROGRESS_COL_WIDTH = 10000;
+ private int RM_METADATA_COL_WIDTH = 4000;
+ private int ORACLE_DB_MAX_COL_WIDTH = 4000;
private enum TXN_STATUS {
NO_STATE, OPEN, COMMITED, ROLLBACK
@@ -14364,17 +14367,22 @@ public class ObjectStore implements RawStore, Configurable {
mReplicationMetrics.setStartTime((int) (System.currentTimeMillis()/1000));
}
if (!StringUtils.isEmpty(replicationMetric.getMetadata())) {
- mReplicationMetrics.setMetadata(replicationMetric.getMetadata());
+ if (replicationMetric.getMetadata().length() > RM_METADATA_COL_WIDTH) {
+ mReplicationMetrics.setProgress("RM_Metadata limit exceeded to " + replicationMetric.getMetadata().length());
+ } else {
+ mReplicationMetrics.setMetadata(replicationMetric.getMetadata());
+ }
}
if (!StringUtils.isEmpty(replicationMetric.getProgress())) {
// Check for the limit of RM_PROGRESS Column.
- if ((dbType.isORACLE() && replicationMetric.getProgress().length() > 4000)
- || replicationMetric.getProgress().length() > 24000) {
- mReplicationMetrics.setProgress("RM_PROGRESS LIMIT EXCEEDED");
+ if ((dbType.isORACLE() && replicationMetric.getProgress().length() > ORACLE_DB_MAX_COL_WIDTH)
+ || replicationMetric.getProgress().length() > RM_PROGRESS_COL_WIDTH) {
+ mReplicationMetrics.setProgress("RM_Progress limit exceeded to " + replicationMetric.getProgress().length());
} else {
mReplicationMetrics.setProgress(replicationMetric.getProgress());
}
}
+ mReplicationMetrics.setMessageFormat(replicationMetric.getMessageFormat());
mReplicationMetricsList.add(mReplicationMetrics);
}
pm.makePersistentAll(mReplicationMetricsList);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
index ffbedce..9455e1d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
@@ -237,6 +237,13 @@ public abstract class MessageDeserializer {
*/
public abstract DeletePartitionColumnStatMessage getDeletePartitionColumnStatMessage(String messageBody);
+ /**
+ * Method to de-serialize any string passed. Need to be over-ridden by specific serialization subclasses.
+ */
+ public String deSerializeGenericString(String messageBody) {
+ return messageBody;
+ }
+
// Protection against construction.
protected MessageDeserializer() {}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index 16e74bb..62cdaf0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -50,6 +50,7 @@ public abstract class MessageFactory {
static {
register(GzipJSONMessageEncoder.FORMAT, GzipJSONMessageEncoder.class);
register(JSONMessageEncoder.FORMAT, JSONMessageEncoder.class);
+ register(supportedCompressionFormats.GZIP.toString().toLowerCase(), GzipJSONMessageEncoder.class);
}
private static Method requiredMethod(Class clazz) {
@@ -80,19 +81,27 @@ public abstract class MessageFactory {
throw new IllegalArgumentException(message);
}
- public static MessageEncoder getInstance(String messageFormat)
+ public static MessageEncoder getInstance(String compressionFormat)
throws InvocationTargetException, IllegalAccessException {
- Method methodInstance = registry.get(messageFormat);
+ Method methodInstance = registry.get(compressionFormat.toLowerCase());
if (methodInstance == null) {
- LOG.error("received incorrect MessageFormat " + messageFormat);
- throw new RuntimeException("messageFormat: " + messageFormat + " is not supported ");
+ LOG.error("received incorrect CompressionFormat " + compressionFormat);
+ throw new RuntimeException("compressionFormat: " + compressionFormat + " is not supported.");
}
return (MessageEncoder) methodInstance.invoke(null);
}
+ public static MessageEncoder getDefaultInstanceForReplMetrics(Configuration conf) {
+ return getInstance(conf, MetastoreConf.ConfVars.REPL_MESSAGE_FACTORY.getVarname());
+ }
+
public static MessageEncoder getDefaultInstance(Configuration conf) {
+ return getInstance(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getVarname());
+ }
+
+ public static MessageEncoder getInstance(Configuration conf, String config) {
String clazz =
- MetastoreConf.get(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getVarname());
+ MetastoreConf.get(conf, config);
try {
Class<?> clazzObject = MessageFactory.class.getClassLoader().loadClass(clazz);
return (MessageEncoder) requiredMethod(clazzObject).invoke(null);
@@ -102,4 +111,11 @@ public abstract class MessageFactory {
throw new IllegalStateException(message, e);
}
}
+
+ public enum supportedCompressionFormats {
+ /**
+ * Currently supported compressionFormats for encoding and decoding the messages in backend RDBMS.
+ */
+ GZIP
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
index b249d76..5b452cf 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
@@ -21,4 +21,7 @@ public interface MessageSerializer {
default String serialize(EventMessage message) {
return message.toString();
}
+ default String serialize(String msg) {
+ return msg;
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
index bfea32f..6a67308 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
@@ -232,4 +232,8 @@ public class DeSerializer extends JSONMessageDeserializer {
public DeletePartitionColumnStatMessage getDeletePartitionColumnStatMessage(String messageBody) {
return super.getDeletePartitionColumnStatMessage(deCompress(messageBody));
}
+
+ public String deSerializeGenericString(String messageBody) {
+ return deCompress(messageBody);
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
index 7786d96..ef43590 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
@@ -34,16 +34,20 @@ class Serializer implements MessageSerializer {
@Override
public String serialize(EventMessage message) {
- String messageAsString = MessageSerializer.super.serialize(message);
+ return serialize(MessageSerializer.super.serialize(message));
+ }
+
+ @Override
+ public String serialize(String msg) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
GZIPOutputStream gout = new GZIPOutputStream(baos);
- gout.write(messageAsString.getBytes(StandardCharsets.UTF_8));
+ gout.write(msg.getBytes(StandardCharsets.UTF_8));
gout.close();
byte[] compressed = baos.toByteArray();
return new String(Base64.getEncoder().encode(compressed), StandardCharsets.UTF_8);
} catch (IOException e) {
LOG.error("could not use gzip output stream", e);
- LOG.debug("message " + messageAsString);
+ LOG.debug("message " + msg);
throw new RuntimeException("could not use the gzip output Stream", e);
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MReplicationMetrics.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MReplicationMetrics.java
index 5fe3129..e051621 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MReplicationMetrics.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MReplicationMetrics.java
@@ -30,6 +30,7 @@ public class MReplicationMetrics {
private String metadata;
private String progress;
private int startTime;
+ private String messageFormat;
public MReplicationMetrics() {
}
@@ -41,6 +42,7 @@ public class MReplicationMetrics {
ret.setMetadata(mReplicationMetric.metadata);
ret.setProgress(mReplicationMetric.progress);
ret.setDumpExecutionId(mReplicationMetric.dumpExecutionId);
+ ret.setMessageFormat(mReplicationMetric.messageFormat);
return ret;
}
@@ -91,4 +93,12 @@ public class MReplicationMetrics {
public void setStartTime(int startTime) {
this.startTime = startTime;
}
+
+ public String getMessageFormat() {
+ return messageFormat;
+ }
+
+ public void setMessageFormat(String messageFormat) {
+ this.messageFormat = messageFormat;
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/resources/package.jdo b/standalone-metastore/metastore-server/src/main/resources/package.jdo
index c5a7db1..6b6ee51 100644
--- a/standalone-metastore/metastore-server/src/main/resources/package.jdo
+++ b/standalone-metastore/metastore-server/src/main/resources/package.jdo
@@ -1556,11 +1556,14 @@
<column name="RM_METADATA" jdbc-type="varchar" length="4000" allows-null="true"/>
</field>
<field name="progress">
- <column name="RM_PROGRESS" jdbc-type="varchar" length="24000" allows-null="true"/>
+ <column name="RM_PROGRESS" jdbc-type="varchar" length="10000" allows-null="true"/>
</field>
<field name="startTime">
<column name="RM_START_TIME" jdbc-type="integer" allows-null="false"/>
</field>
+ <field name="messageFormat">
+ <column name="MESSAGE_FORMAT" length="16" jdbc-type="VARCHAR" allows-null="true"/>
+ </field>
<index name="PolicyIndex">
<column name="RM_POLICY"/>
</index>
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 525a909..8fa3470 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -803,8 +803,9 @@ CREATE TABLE "APP"."REPLICATION_METRICS" (
"RM_POLICY" varchar(256) NOT NULL,
"RM_DUMP_EXECUTION_ID" bigint NOT NULL,
"RM_METADATA" varchar(4000),
- "RM_PROGRESS" varchar(24000),
+ "RM_PROGRESS" varchar(10000),
"RM_START_TIME" integer not null,
+ "MESSAGE_FORMAT" VARCHAR(16),
PRIMARY KEY("RM_SCHEDULED_EXECUTION_ID")
);
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index c9a5393..ead33d7 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -97,8 +97,18 @@ CREATE TABLE "APP"."REPLICATION_METRICS" (
PRIMARY KEY("RM_SCHEDULED_EXECUTION_ID")
);
---Increase the size of RM_PROGRESS to accomodate the replication statistics
-ALTER TABLE "APP"."REPLICATION_METRICS" ALTER "RM_PROGRESS" SET DATA TYPE VARCHAR(24000);
+DROP TABLE "APP"."REPLICATION_METRICS";
+
+CREATE TABLE "APP"."REPLICATION_METRICS" (
+ "RM_SCHEDULED_EXECUTION_ID" bigint NOT NULL,
+ "RM_POLICY" varchar(256) NOT NULL,
+ "RM_DUMP_EXECUTION_ID" bigint NOT NULL,
+ "RM_METADATA" varchar(4000),
+ "RM_PROGRESS" varchar(10000),
+ "RM_START_TIME" integer not null,
+ "MESSAGE_FORMAT" VARCHAR(16),
+ PRIMARY KEY("RM_SCHEDULED_EXECUTION_ID")
+);
CREATE INDEX "POLICY_IDX" ON "APP"."REPLICATION_METRICS" ("RM_POLICY");
CREATE INDEX "DUMP_IDX" ON "APP"."REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 6f704a0..af5e763 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1367,7 +1367,8 @@ CREATE TABLE "REPLICATION_METRICS" (
"RM_DUMP_EXECUTION_ID" bigint NOT NULL,
"RM_METADATA" varchar(max),
"RM_PROGRESS" varchar(max),
- "RM_START_TIME" integer NOT NULL
+ "RM_START_TIME" integer NOT NULL,
+ "MESSAGE_FORMAT" nvarchar(16),
);
-- Create indexes for the replication metrics table
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 91bca6c..166bf8e 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -132,6 +132,8 @@ CREATE TABLE "REPLICATION_METRICS" (
"RM_START_TIME" integer NOT NULL
);
+ALTER TABLE "REPLICATION_METRICS" ADD "MESSAGE_FORMAT" VARCHAR(16);
+
-- Create indexes for the replication metrics table
CREATE INDEX "POLICY_IDX" ON "REPLICATION_METRICS" ("RM_POLICY");
CREATE INDEX "DUMP_IDX" ON "REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 4163199..66995cf 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1269,8 +1269,9 @@ CREATE TABLE IF NOT EXISTS REPLICATION_METRICS (
RM_POLICY varchar(256) NOT NULL,
RM_DUMP_EXECUTION_ID bigint NOT NULL,
RM_METADATA varchar(4000),
- RM_PROGRESS varchar(24000),
+ RM_PROGRESS varchar(10000),
RM_START_TIME integer NOT NULL,
+ MESSAGE_FORMAT varchar(16),
PRIMARY KEY(RM_SCHEDULED_EXECUTION_ID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 49451c4..8786d4a 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -107,6 +107,10 @@ CREATE TABLE IF NOT EXISTS REPLICATION_METRICS (
--Increase the size of RM_PROGRESS to accomodate the replication statistics
ALTER TABLE REPLICATION_METRICS MODIFY RM_PROGRESS varchar(24000);
+ALTER TABLE REPLICATION_METRICS MODIFY RM_PROGRESS varchar(10000);
+
+ALTER TABLE REPLICATION_METRICS ADD COLUMN `MESSAGE_FORMAT` VARCHAR(16);
+
-- Create indexes for the replication metrics table
CREATE INDEX POLICY_IDX ON REPLICATION_METRICS (RM_POLICY);
CREATE INDEX DUMP_IDX ON REPLICATION_METRICS (RM_DUMP_EXECUTION_ID);
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index e5282a5..303a2f3 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1264,7 +1264,8 @@ CREATE TABLE "REPLICATION_METRICS" (
"RM_DUMP_EXECUTION_ID" number NOT NULL,
"RM_METADATA" varchar2(4000),
"RM_PROGRESS" varchar2(4000),
- "RM_START_TIME" integer NOT NULL
+ "RM_START_TIME" integer NOT NULL,
+ "MESSAGE_FORMAT" VARCHAR(16)
);
--Create indexes for the replication metrics table
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index 610f840..d5715ba 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -101,6 +101,8 @@ CREATE TABLE "REPLICATION_METRICS" (
"RM_START_TIME" integer NOT NULL
);
+ALTER TABLE "REPLICATION_METRICS" ADD "MESSAGE_FORMAT" VARCHAR(16);
+
--Create indexes for the replication metrics table
CREATE INDEX POLICY_IDX ON "REPLICATION_METRICS" ("RM_POLICY");
CREATE INDEX DUMP_IDX ON "REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index dc311bb..c80a374 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1976,8 +1976,9 @@ CREATE TABLE "REPLICATION_METRICS" (
"RM_POLICY" varchar(256) NOT NULL,
"RM_DUMP_EXECUTION_ID" bigint NOT NULL,
"RM_METADATA" varchar(4000),
- "RM_PROGRESS" varchar(24000),
+ "RM_PROGRESS" varchar(10000),
"RM_START_TIME" integer NOT NULL,
+ "MESSAGE_FORMAT" VARCHAR(16),
PRIMARY KEY("RM_SCHEDULED_EXECUTION_ID")
);
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index fbbba67..41b16b1 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -231,6 +231,10 @@ CREATE TABLE "REPLICATION_METRICS" (
--Increase the size of RM_PROGRESS to accomodate the replication statistics
ALTER TABLE "REPLICATION_METRICS" ALTER "RM_PROGRESS" TYPE varchar(24000);
+ALTER TABLE "REPLICATION_METRICS" ALTER "RM_PROGRESS" TYPE varchar(10000);
+
+ALTER TABLE "REPLICATION_METRICS" ADD "MESSAGE_FORMAT" varchar(16);
+
--Create indexes for the replication metrics table
CREATE INDEX "POLICY_IDX" ON "REPLICATION_METRICS" ("RM_POLICY");
CREATE INDEX "DUMP_IDX" ON "REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");
diff --git a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
index c9ec652..21f8061 100644
--- a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
@@ -51,6 +51,10 @@ CREATE TABLE "REPLICATION_METRICS" (
--Increase the size of RM_PROGRESS to accomodate the replication statistics
ALTER TABLE "REPLICATION_METRICS" ALTER "RM_PROGRESS" TYPE varchar(24000);
+ALTER TABLE "REPLICATION_METRICS" ALTER "RM_PROGRESS" TYPE varchar(10000);
+
+ALTER TABLE "REPLICATION_METRICS" ADD "MESSAGE_FORMAT" varchar(16);
+
--Create indexes for the replication metrics table
CREATE INDEX "POLICY_IDX" ON "REPLICATION_METRICS" ("RM_POLICY");
CREATE INDEX "DUMP_IDX" ON "REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");