You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original message and is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by an...@apache.org on 2020/07/01 05:25:43 UTC

[hive] branch master updated: HIVE-23784: Fix Replication Metrics Sink to DB (Aasha Medhi, reviewed by Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b98df8  HIVE-23784: Fix Replication Metrics Sink to DB (Aasha Medhi, reviewed by Pravin Kumar Sinha)
8b98df8 is described below

commit 8b98df836e8d2cb3ab86fc91fa9310467cb31d45
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Wed Jul 1 10:55:30 2020 +0530

    HIVE-23784: Fix Replication Metrics Sink to DB (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  2 +-
 .../hive/ql/parse/repl/metric/MetricSink.java      | 39 ++++++-----
 .../repl/metric/ReplicationMetricCollector.java    |  9 +++
 .../repl/metric/TestReplicationMetricSink.java     | 79 ++++++++++++++++++++++
 .../metastore/ReplicationMetricsMaintTask.java     |  3 +-
 5 files changed, 112 insertions(+), 20 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 3669f3a..b15b326 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -865,6 +865,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         }
       }
       replLogger.endLog(bootDumpBeginReplId.toString());
+      work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId);
     }
     Long bootDumpEndReplId = currentNotificationId(hiveDb);
     LOG.info("Preparing to return {},{}->{}",
@@ -875,7 +876,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
 
     work.setDirCopyIterator(extTableCopyWorks.iterator());
     work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
-    work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId);
     return bootDumpBeginReplId;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
index a856c76..07f2916 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
@@ -64,8 +64,10 @@ public final class MetricSink {
   public synchronized void init(HiveConf conf) {
     if (!isInitialised) {
       this.conf = conf;
-      this.executorService.schedule(new MetricSinkWriter(conf), getFrequencyInSecs(), TimeUnit.SECONDS);
+      this.executorService.scheduleAtFixedRate(new MetricSinkWriter(conf), 0,
+        getFrequencyInSecs(), TimeUnit.SECONDS);
       isInitialised = true;
+      LOG.debug("Metrics Sink Initialised with frequency {} ", getFrequencyInSecs());
     }
   }
 
@@ -102,10 +104,12 @@ public final class MetricSink {
     public void run() {
       ReplicationMetricList metricList = new ReplicationMetricList();
       try {
+        LOG.debug("Updating metrics to DB");
         // get metrics
         LinkedList<ReplicationMetric> metrics = collector.getMetrics();
         //Move metrics to thrift list
         if (metrics.size() > 0) {
+          LOG.debug("Converting metrics to thrift metrics {} ", metrics.size());
           int totalMetricsSize = metrics.size();
           List<ReplicationMetrics> replicationMetricsList = new ArrayList<>(totalMetricsSize);
           for (int index = 0; index < totalMetricsSize; index++) {
@@ -120,25 +124,24 @@ public final class MetricSink {
             replicationMetricsList.add(persistentMetric);
           }
           metricList.setReplicationMetricList(replicationMetricsList);
+          // write metrics and retry if fails
+          Retry<Void> retriable = new Retry<Void>(Exception.class) {
+            @Override
+            public Void execute() throws Exception {
+              //write
+              if (metricList.getReplicationMetricListSize() > 0) {
+                LOG.debug("Persisting metrics to DB {} ", metricList.getReplicationMetricListSize());
+                Hive.get(conf).getMSC().addReplicationMetrics(metricList);
+              }
+              return null;
+            }
+          };
+          retriable.run();
+        } else {
+          LOG.debug("No Metrics to Update ");
         }
       } catch (Exception e) {
-        throw new RuntimeException("Metrics are not getting persisted", e);
-      }
-      // write metrics and retry if fails
-      Retry<Void> retriable = new Retry<Void>(Exception.class) {
-        @Override
-        public Void execute() throws Exception {
-            //write
-          if (metricList.getReplicationMetricListSize() > 0) {
-            Hive.get(conf).getMSC().addReplicationMetrics(metricList);
-          }
-          return null;
-        }
-      };
-      try {
-        retriable.run();
-      } catch (Exception e) {
-        throw new RuntimeException("Metrics are not getting persisted to HMS", e);
+        LOG.error("Metrics are not getting persisted", e);
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
index f97332c..61cc348 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
@@ -34,6 +36,7 @@ import java.util.Map;
  * Abstract class for Replication Metric Collection.
  */
 public abstract class ReplicationMetricCollector {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationMetricCollector.class);
   private ReplicationMetric replicationMetric;
   private MetricCollector metricCollector;
   private boolean isEnabled;
@@ -53,7 +56,9 @@ public abstract class ReplicationMetricCollector {
 
   public void reportStageStart(String stageName, Map<String, Long> metricMap) throws SemanticException {
     if (isEnabled) {
+      LOG.debug("Stage Started {}, {}, {}", stageName, metricMap.size(), metricMap );
       Progress progress = replicationMetric.getProgress();
+      progress.setStatus(Status.IN_PROGRESS);
       Stage stage = new Stage(stageName, Status.IN_PROGRESS, System.currentTimeMillis());
       for (Map.Entry<String, Long> metric : metricMap.entrySet()) {
         stage.addMetric(new Metric(metric.getKey(), metric.getValue()));
@@ -67,6 +72,7 @@ public abstract class ReplicationMetricCollector {
 
   public void reportStageEnd(String stageName, Status status, long lastReplId) throws SemanticException {
     if (isEnabled) {
+      LOG.debug("Stage ended {}, {}, {}", stageName, status, lastReplId );
       Progress progress = replicationMetric.getProgress();
       Stage stage = progress.getStageByName(stageName);
       stage.setStatus(status);
@@ -81,6 +87,7 @@ public abstract class ReplicationMetricCollector {
 
   public void reportStageEnd(String stageName, Status status) throws SemanticException {
     if (isEnabled) {
+      LOG.debug("Stage Ended {}, {}", stageName, status );
       Progress progress = replicationMetric.getProgress();
       Stage stage = progress.getStageByName(stageName);
       stage.setStatus(status);
@@ -92,6 +99,7 @@ public abstract class ReplicationMetricCollector {
 
   public void reportStageProgress(String stageName, String metricName, long count) throws SemanticException {
     if (isEnabled) {
+      LOG.debug("Stage progress {}, {}, {}", stageName, metricName, count );
       Progress progress = replicationMetric.getProgress();
       Stage stage = progress.getStageByName(stageName);
       Metric metric = stage.getMetricByName(metricName);
@@ -107,6 +115,7 @@ public abstract class ReplicationMetricCollector {
 
   public void reportEnd(Status status) throws SemanticException {
     if (isEnabled) {
+      LOG.info("End {}", status );
       Progress progress = replicationMetric.getProgress();
       progress.setStatus(status);
       replicationMetric.setProgress(progress);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
index d3ad8fb..dfed9c2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.ReplicationMetrics;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
@@ -127,6 +128,63 @@ public class TestReplicationMetricSink {
     actualMetric.setProgress(progress);
     checkSuccess(actualMetric, expectedMetric, "dump",
         Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
+
+    //Incremental
+    conf.set(Constants.SCHEDULED_QUERY_EXECUTIONID, "2");
+    ReplicationMetricCollector incrementDumpMetricCollector = new IncrementalDumpMetricCollector(
+      "testAcidTablesReplLoadBootstrapIncr_1592205875387",
+      "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
+        + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf);
+    metricMap = new HashMap<>();
+    metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
+    incrementDumpMetricCollector.reportStageStart("dump", metricMap);
+    incrementDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.EVENTS.name(), 10);
+    incrementDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10);
+    incrementDumpMetricCollector.reportEnd(Status.SUCCESS);
+
+    expectedMetadata = new Metadata("testAcidTablesReplLoadBootstrapIncr_1592205875387",
+      Metadata.ReplicationType.INCREMENTAL, "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_"
+      + "parse_TestReplicationScenarios_245261428230295/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGlu"
+      + "Y3JfMTU5MjIwNTg3NTM4Nw==/0/hive");
+    expectedMetadata.setLastReplId(10);
+    expectedProgress = new Progress();
+    expectedProgress.setStatus(Status.SUCCESS);
+    dumpStage = new Stage("dump", Status.SUCCESS, 0);
+    dumpStage.setEndTime(0);
+    Metric expectedEventsMetric = new Metric(ReplUtils.MetricName.EVENTS.name(), 10);
+    expectedEventsMetric.setCurrentCount(10);
+    dumpStage.addMetric(expectedEventsMetric);
+    expectedProgress.addStage(dumpStage);
+    expectedMetric = new ReplicationMetric(2, "repl", 0,
+      expectedMetadata);
+    expectedMetric.setProgress(expectedProgress);
+    Thread.sleep(1000 * 20);
+    metricsRequest = new GetReplicationMetricsRequest();
+    metricsRequest.setPolicy("repl");
+    actualReplicationMetrics = Hive.get(conf).getMSC().getReplicationMetrics(metricsRequest);
+    Assert.assertEquals(2, actualReplicationMetrics.getReplicationMetricListSize());
+    actualThriftMetric = actualReplicationMetrics.getReplicationMetricList().get(0);
+    mapper = new ObjectMapper();
+    actualMetric = new ReplicationMetric(actualThriftMetric.getScheduledExecutionId(),
+      actualThriftMetric.getPolicy(), actualThriftMetric.getDumpExecutionId(),
+      mapper.readValue(actualThriftMetric.getMetadata(), Metadata.class));
+    progressMapper = mapper.readValue(actualThriftMetric.getProgress(), ProgressMapper.class);
+    progress = new Progress();
+    progress.setStatus(progressMapper.getStatus());
+    for (StageMapper stageMapper : progressMapper.getStages()) {
+      Stage stage = new Stage();
+      stage.setName(stageMapper.getName());
+      stage.setStatus(stageMapper.getStatus());
+      stage.setStartTime(stageMapper.getStartTime());
+      stage.setEndTime(stageMapper.getEndTime());
+      for (Metric metric : stageMapper.getMetrics()) {
+        stage.addMetric(metric);
+      }
+      progress.addStage(stage);
+    }
+    actualMetric.setProgress(progress);
+    checkSuccessIncremental(actualMetric, expectedMetric, "dump",
+      Arrays.asList(ReplUtils.MetricName.EVENTS.name()));
   }
 
   private void checkSuccess(ReplicationMetric actual, ReplicationMetric expected, String stageName,
@@ -150,4 +208,25 @@ public class TestReplicationMetricSink {
     }
   }
 
+  private void checkSuccessIncremental(ReplicationMetric actual, ReplicationMetric expected, String stageName,
+                            List<String> metricNames) {
+    Assert.assertEquals(expected.getDumpExecutionId(), actual.getDumpExecutionId());
+    Assert.assertEquals(expected.getPolicy(), actual.getPolicy());
+    Assert.assertEquals(expected.getScheduledExecutionId(), actual.getScheduledExecutionId());
+    Assert.assertEquals(expected.getMetadata().getReplicationType(), actual.getMetadata().getReplicationType());
+    Assert.assertEquals(expected.getMetadata().getDbName(), actual.getMetadata().getDbName());
+    Assert.assertEquals(expected.getMetadata().getStagingDir(), actual.getMetadata().getStagingDir());
+    Assert.assertEquals(expected.getMetadata().getLastReplId(), actual.getMetadata().getLastReplId());
+    Assert.assertEquals(expected.getProgress().getStatus(), actual.getProgress().getStatus());
+    Assert.assertEquals(expected.getProgress().getStageByName(stageName).getStatus(),
+      actual.getProgress().getStageByName(stageName).getStatus());
+    for (String metricName : metricNames) {
+      Assert.assertEquals(expected.getProgress().getStageByName(stageName).getMetricByName(metricName).getTotalCount(),
+        actual.getProgress().getStageByName(stageName).getMetricByName(metricName).getTotalCount());
+      Assert.assertEquals(expected.getProgress().getStageByName(stageName).getMetricByName(metricName)
+        .getCurrentCount(), actual.getProgress()
+        .getStageByName(stageName).getMetricByName(metricName).getCurrentCount());
+    }
+  }
+
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java
index 4ba968f..1df99e0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java
@@ -63,10 +63,11 @@ public class ReplicationMetricsMaintTask implements MetastoreTaskThread {
       if (!MetastoreConf.getBoolVar(conf, ConfVars.SCHEDULED_QUERIES_ENABLED)) {
         return;
       }
+      LOG.debug("Cleaning up older Metrics");
       RawStore ms = HiveMetaStore.HMSHandler.getMSForConf(conf);
       int maxRetainSecs = (int) TimeUnit.DAYS.toSeconds(MetastoreConf.getTimeVar(conf,
         ConfVars.REPL_METRICS_MAX_AGE, TimeUnit.DAYS));
-      int deleteCnt = ms.deleteScheduledExecutions(maxRetainSecs);
+      int deleteCnt = ms.deleteReplicationMetrics(maxRetainSecs);
       if (deleteCnt > 0L){
         LOG.info("Number of deleted entries: " + deleteCnt);
       }