You are viewing a plain-text version of this content; the canonical HTML version is available from the Apache mailing-list archive (link omitted in this plain-text rendering).
Posted to commits@hive.apache.org by an...@apache.org on 2020/07/01 05:25:43 UTC
[hive] branch master updated: HIVE-23784: Fix Replication Metrics
Sink to DB (Aasha Medhi, reviewed by Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 8b98df8 HIVE-23784: Fix Replication Metrics Sink to DB (Aasha Medhi, reviewed by Pravin Kumar Sinha)
8b98df8 is described below
commit 8b98df836e8d2cb3ab86fc91fa9310467cb31d45
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Wed Jul 1 10:55:30 2020 +0530
HIVE-23784: Fix Replication Metrics Sink to DB (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 2 +-
.../hive/ql/parse/repl/metric/MetricSink.java | 39 ++++++-----
.../repl/metric/ReplicationMetricCollector.java | 9 +++
.../repl/metric/TestReplicationMetricSink.java | 79 ++++++++++++++++++++++
.../metastore/ReplicationMetricsMaintTask.java | 3 +-
5 files changed, 112 insertions(+), 20 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 3669f3a..b15b326 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -865,6 +865,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
}
replLogger.endLog(bootDumpBeginReplId.toString());
+ work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId);
}
Long bootDumpEndReplId = currentNotificationId(hiveDb);
LOG.info("Preparing to return {},{}->{}",
@@ -875,7 +876,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
work.setDirCopyIterator(extTableCopyWorks.iterator());
work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
- work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId);
return bootDumpBeginReplId;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
index a856c76..07f2916 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
@@ -64,8 +64,10 @@ public final class MetricSink {
public synchronized void init(HiveConf conf) {
if (!isInitialised) {
this.conf = conf;
- this.executorService.schedule(new MetricSinkWriter(conf), getFrequencyInSecs(), TimeUnit.SECONDS);
+ this.executorService.scheduleAtFixedRate(new MetricSinkWriter(conf), 0,
+ getFrequencyInSecs(), TimeUnit.SECONDS);
isInitialised = true;
+ LOG.debug("Metrics Sink Initialised with frequency {} ", getFrequencyInSecs());
}
}
@@ -102,10 +104,12 @@ public final class MetricSink {
public void run() {
ReplicationMetricList metricList = new ReplicationMetricList();
try {
+ LOG.debug("Updating metrics to DB");
// get metrics
LinkedList<ReplicationMetric> metrics = collector.getMetrics();
//Move metrics to thrift list
if (metrics.size() > 0) {
+ LOG.debug("Converting metrics to thrift metrics {} ", metrics.size());
int totalMetricsSize = metrics.size();
List<ReplicationMetrics> replicationMetricsList = new ArrayList<>(totalMetricsSize);
for (int index = 0; index < totalMetricsSize; index++) {
@@ -120,25 +124,24 @@ public final class MetricSink {
replicationMetricsList.add(persistentMetric);
}
metricList.setReplicationMetricList(replicationMetricsList);
+ // write metrics and retry if fails
+ Retry<Void> retriable = new Retry<Void>(Exception.class) {
+ @Override
+ public Void execute() throws Exception {
+ //write
+ if (metricList.getReplicationMetricListSize() > 0) {
+ LOG.debug("Persisting metrics to DB {} ", metricList.getReplicationMetricListSize());
+ Hive.get(conf).getMSC().addReplicationMetrics(metricList);
+ }
+ return null;
+ }
+ };
+ retriable.run();
+ } else {
+ LOG.debug("No Metrics to Update ");
}
} catch (Exception e) {
- throw new RuntimeException("Metrics are not getting persisted", e);
- }
- // write metrics and retry if fails
- Retry<Void> retriable = new Retry<Void>(Exception.class) {
- @Override
- public Void execute() throws Exception {
- //write
- if (metricList.getReplicationMetricListSize() > 0) {
- Hive.get(conf).getMSC().addReplicationMetrics(metricList);
- }
- return null;
- }
- };
- try {
- retriable.run();
- } catch (Exception e) {
- throw new RuntimeException("Metrics are not getting persisted to HMS", e);
+ LOG.error("Metrics are not getting persisted", e);
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
index f97332c..61cc348 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
@@ -34,6 +36,7 @@ import java.util.Map;
* Abstract class for Replication Metric Collection.
*/
public abstract class ReplicationMetricCollector {
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationMetricCollector.class);
private ReplicationMetric replicationMetric;
private MetricCollector metricCollector;
private boolean isEnabled;
@@ -53,7 +56,9 @@ public abstract class ReplicationMetricCollector {
public void reportStageStart(String stageName, Map<String, Long> metricMap) throws SemanticException {
if (isEnabled) {
+ LOG.debug("Stage Started {}, {}, {}", stageName, metricMap.size(), metricMap );
Progress progress = replicationMetric.getProgress();
+ progress.setStatus(Status.IN_PROGRESS);
Stage stage = new Stage(stageName, Status.IN_PROGRESS, System.currentTimeMillis());
for (Map.Entry<String, Long> metric : metricMap.entrySet()) {
stage.addMetric(new Metric(metric.getKey(), metric.getValue()));
@@ -67,6 +72,7 @@ public abstract class ReplicationMetricCollector {
public void reportStageEnd(String stageName, Status status, long lastReplId) throws SemanticException {
if (isEnabled) {
+ LOG.debug("Stage ended {}, {}, {}", stageName, status, lastReplId );
Progress progress = replicationMetric.getProgress();
Stage stage = progress.getStageByName(stageName);
stage.setStatus(status);
@@ -81,6 +87,7 @@ public abstract class ReplicationMetricCollector {
public void reportStageEnd(String stageName, Status status) throws SemanticException {
if (isEnabled) {
+ LOG.debug("Stage Ended {}, {}", stageName, status );
Progress progress = replicationMetric.getProgress();
Stage stage = progress.getStageByName(stageName);
stage.setStatus(status);
@@ -92,6 +99,7 @@ public abstract class ReplicationMetricCollector {
public void reportStageProgress(String stageName, String metricName, long count) throws SemanticException {
if (isEnabled) {
+ LOG.debug("Stage progress {}, {}, {}", stageName, metricName, count );
Progress progress = replicationMetric.getProgress();
Stage stage = progress.getStageByName(stageName);
Metric metric = stage.getMetricByName(metricName);
@@ -107,6 +115,7 @@ public abstract class ReplicationMetricCollector {
public void reportEnd(Status status) throws SemanticException {
if (isEnabled) {
+ LOG.info("End {}", status );
Progress progress = replicationMetric.getProgress();
progress.setStatus(status);
replicationMetric.setProgress(progress);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
index d3ad8fb..dfed9c2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.ReplicationMetrics;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
@@ -127,6 +128,63 @@ public class TestReplicationMetricSink {
actualMetric.setProgress(progress);
checkSuccess(actualMetric, expectedMetric, "dump",
Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
+
+ //Incremental
+ conf.set(Constants.SCHEDULED_QUERY_EXECUTIONID, "2");
+ ReplicationMetricCollector incrementDumpMetricCollector = new IncrementalDumpMetricCollector(
+ "testAcidTablesReplLoadBootstrapIncr_1592205875387",
+ "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
+ + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf);
+ metricMap = new HashMap<>();
+ metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
+ incrementDumpMetricCollector.reportStageStart("dump", metricMap);
+ incrementDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.EVENTS.name(), 10);
+ incrementDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10);
+ incrementDumpMetricCollector.reportEnd(Status.SUCCESS);
+
+ expectedMetadata = new Metadata("testAcidTablesReplLoadBootstrapIncr_1592205875387",
+ Metadata.ReplicationType.INCREMENTAL, "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_"
+ + "parse_TestReplicationScenarios_245261428230295/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGlu"
+ + "Y3JfMTU5MjIwNTg3NTM4Nw==/0/hive");
+ expectedMetadata.setLastReplId(10);
+ expectedProgress = new Progress();
+ expectedProgress.setStatus(Status.SUCCESS);
+ dumpStage = new Stage("dump", Status.SUCCESS, 0);
+ dumpStage.setEndTime(0);
+ Metric expectedEventsMetric = new Metric(ReplUtils.MetricName.EVENTS.name(), 10);
+ expectedEventsMetric.setCurrentCount(10);
+ dumpStage.addMetric(expectedEventsMetric);
+ expectedProgress.addStage(dumpStage);
+ expectedMetric = new ReplicationMetric(2, "repl", 0,
+ expectedMetadata);
+ expectedMetric.setProgress(expectedProgress);
+ Thread.sleep(1000 * 20);
+ metricsRequest = new GetReplicationMetricsRequest();
+ metricsRequest.setPolicy("repl");
+ actualReplicationMetrics = Hive.get(conf).getMSC().getReplicationMetrics(metricsRequest);
+ Assert.assertEquals(2, actualReplicationMetrics.getReplicationMetricListSize());
+ actualThriftMetric = actualReplicationMetrics.getReplicationMetricList().get(0);
+ mapper = new ObjectMapper();
+ actualMetric = new ReplicationMetric(actualThriftMetric.getScheduledExecutionId(),
+ actualThriftMetric.getPolicy(), actualThriftMetric.getDumpExecutionId(),
+ mapper.readValue(actualThriftMetric.getMetadata(), Metadata.class));
+ progressMapper = mapper.readValue(actualThriftMetric.getProgress(), ProgressMapper.class);
+ progress = new Progress();
+ progress.setStatus(progressMapper.getStatus());
+ for (StageMapper stageMapper : progressMapper.getStages()) {
+ Stage stage = new Stage();
+ stage.setName(stageMapper.getName());
+ stage.setStatus(stageMapper.getStatus());
+ stage.setStartTime(stageMapper.getStartTime());
+ stage.setEndTime(stageMapper.getEndTime());
+ for (Metric metric : stageMapper.getMetrics()) {
+ stage.addMetric(metric);
+ }
+ progress.addStage(stage);
+ }
+ actualMetric.setProgress(progress);
+ checkSuccessIncremental(actualMetric, expectedMetric, "dump",
+ Arrays.asList(ReplUtils.MetricName.EVENTS.name()));
}
private void checkSuccess(ReplicationMetric actual, ReplicationMetric expected, String stageName,
@@ -150,4 +208,25 @@ public class TestReplicationMetricSink {
}
}
+ private void checkSuccessIncremental(ReplicationMetric actual, ReplicationMetric expected, String stageName,
+ List<String> metricNames) {
+ Assert.assertEquals(expected.getDumpExecutionId(), actual.getDumpExecutionId());
+ Assert.assertEquals(expected.getPolicy(), actual.getPolicy());
+ Assert.assertEquals(expected.getScheduledExecutionId(), actual.getScheduledExecutionId());
+ Assert.assertEquals(expected.getMetadata().getReplicationType(), actual.getMetadata().getReplicationType());
+ Assert.assertEquals(expected.getMetadata().getDbName(), actual.getMetadata().getDbName());
+ Assert.assertEquals(expected.getMetadata().getStagingDir(), actual.getMetadata().getStagingDir());
+ Assert.assertEquals(expected.getMetadata().getLastReplId(), actual.getMetadata().getLastReplId());
+ Assert.assertEquals(expected.getProgress().getStatus(), actual.getProgress().getStatus());
+ Assert.assertEquals(expected.getProgress().getStageByName(stageName).getStatus(),
+ actual.getProgress().getStageByName(stageName).getStatus());
+ for (String metricName : metricNames) {
+ Assert.assertEquals(expected.getProgress().getStageByName(stageName).getMetricByName(metricName).getTotalCount(),
+ actual.getProgress().getStageByName(stageName).getMetricByName(metricName).getTotalCount());
+ Assert.assertEquals(expected.getProgress().getStageByName(stageName).getMetricByName(metricName)
+ .getCurrentCount(), actual.getProgress()
+ .getStageByName(stageName).getMetricByName(metricName).getCurrentCount());
+ }
+ }
+
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java
index 4ba968f..1df99e0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java
@@ -63,10 +63,11 @@ public class ReplicationMetricsMaintTask implements MetastoreTaskThread {
if (!MetastoreConf.getBoolVar(conf, ConfVars.SCHEDULED_QUERIES_ENABLED)) {
return;
}
+ LOG.debug("Cleaning up older Metrics");
RawStore ms = HiveMetaStore.HMSHandler.getMSForConf(conf);
int maxRetainSecs = (int) TimeUnit.DAYS.toSeconds(MetastoreConf.getTimeVar(conf,
ConfVars.REPL_METRICS_MAX_AGE, TimeUnit.DAYS));
- int deleteCnt = ms.deleteScheduledExecutions(maxRetainSecs);
+ int deleteCnt = ms.deleteReplicationMetrics(maxRetainSecs);
if (deleteCnt > 0L){
LOG.info("Number of deleted entries: " + deleteCnt);
}