You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/10/24 23:47:31 UTC

[hudi] branch master updated: [HUDI-1326] Added an API to force publish metrics and flush them. (#2152)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 49e855c  [HUDI-1326] Added an API to force publish metrics and flush them. (#2152)
49e855c is described below

commit 49e855c3488ebe53af3a72fc3b5a22ba47a45e01
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Sat Oct 24 16:47:24 2020 -0700

    [HUDI-1326] Added an API to force publish metrics and flush them. (#2152)
    
    * [HUDI-1326] Added an API to force publish metrics and flush them.
    
    Using the added API, publish metrics after each level of the DAG has completed in hudi-test-suite.
    
    * Code cleanups
    
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
 .../main/java/org/apache/hudi/metrics/Metrics.java | 32 ++++++++++++++++++----
 .../testsuite/dag/scheduler/DagScheduler.java      |  4 +++
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index c4107ce..5667a66 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -38,13 +38,15 @@ public class Metrics {
   private static final Logger LOG = LogManager.getLogger(Metrics.class);
 
   private static volatile boolean initialized = false;
-  private static Metrics metrics = null;
+  private static Metrics instance = null;
+
   private final MetricRegistry registry;
   private MetricsReporter reporter;
+  private final String commonMetricPrefix;
 
   private Metrics(HoodieWriteConfig metricConfig) {
     registry = new MetricRegistry();
-
+    commonMetricPrefix = metricConfig.getTableName();
     reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
     if (reporter == null) {
       throw new RuntimeException("Cannot initialize Reporter.");
@@ -68,13 +70,24 @@ public class Metrics {
     }
   }
 
+  private void reportAndFlushMetrics() {
+    try {
+      LOG.info("Reporting and flushing all metrics");
+      this.registerHoodieCommonMetrics();
+      this.reporter.report();
+      this.registry.getNames().forEach(this.registry::remove);
+    } catch (Exception e) {
+      LOG.error("Error while reporting and flushing metrics", e);
+    }
+  }
+
   private void registerHoodieCommonMetrics() {
-    registerGauges(Registry.getAllMetrics(true, true), Option.empty());
+    registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
   }
 
   public static Metrics getInstance() {
     assert initialized;
-    return metrics;
+    return instance;
   }
 
   public static synchronized void init(HoodieWriteConfig metricConfig) {
@@ -82,7 +95,7 @@ public class Metrics {
       return;
     }
     try {
-      metrics = new Metrics(metricConfig);
+      instance = new Metrics(metricConfig);
     } catch (Exception e) {
       throw new HoodieException(e);
     }
@@ -93,10 +106,17 @@ public class Metrics {
     if (!initialized) {
       return;
     }
-    metrics.reportAndCloseReporter();
+    instance.reportAndCloseReporter();
     initialized = false;
   }
 
+  public static synchronized void flush() {
+    if (!Metrics.initialized) {
+      return;
+    }
+    instance.reportAndFlushMetrics();
+  }
+
   public static void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
     String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
     metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
index 22b9ff4..bf6f5da 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
+import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
 import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
 import org.apache.hudi.integ.testsuite.dag.WriterContext;
@@ -95,6 +96,9 @@ public class DagScheduler {
       for (Future future : futures) {
         future.get(1, TimeUnit.HOURS);
       }
+
+      // After each level, report and flush the metrics
+      Metrics.flush();
     } while (queue.size() > 0);
     log.info("Finished workloads");
   }