You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/10/24 23:47:31 UTC
[hudi] branch master updated: [HUDI-1326] Added an API to force
publish metrics and flush them. (#2152)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 49e855c [HUDI-1326] Added an API to force publish metrics and flush them. (#2152)
49e855c is described below
commit 49e855c3488ebe53af3a72fc3b5a22ba47a45e01
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Sat Oct 24 16:47:24 2020 -0700
[HUDI-1326] Added an API to force publish metrics and flush them. (#2152)
* [HUDI-1326] Added an API to force publish metrics and flush them.
Using the added API, publish metrics after each level of the DAG completed in hudi-test-suite.
* Code cleanups
Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
.../main/java/org/apache/hudi/metrics/Metrics.java | 32 ++++++++++++++++++----
.../testsuite/dag/scheduler/DagScheduler.java | 4 +++
2 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index c4107ce..5667a66 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -38,13 +38,15 @@ public class Metrics {
private static final Logger LOG = LogManager.getLogger(Metrics.class);
private static volatile boolean initialized = false;
- private static Metrics metrics = null;
+ private static Metrics instance = null;
+
private final MetricRegistry registry;
private MetricsReporter reporter;
+ private final String commonMetricPrefix;
private Metrics(HoodieWriteConfig metricConfig) {
registry = new MetricRegistry();
-
+ commonMetricPrefix = metricConfig.getTableName();
reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
if (reporter == null) {
throw new RuntimeException("Cannot initialize Reporter.");
@@ -68,13 +70,24 @@ public class Metrics {
}
}
+ private void reportAndFlushMetrics() {
+ try {
+ LOG.info("Reporting and flushing all metrics");
+ this.registerHoodieCommonMetrics();
+ this.reporter.report();
+ this.registry.getNames().forEach(this.registry::remove);
+ } catch (Exception e) {
+ LOG.error("Error while reporting and flushing metrics", e);
+ }
+ }
+
private void registerHoodieCommonMetrics() {
- registerGauges(Registry.getAllMetrics(true, true), Option.empty());
+ registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
}
public static Metrics getInstance() {
assert initialized;
- return metrics;
+ return instance;
}
public static synchronized void init(HoodieWriteConfig metricConfig) {
@@ -82,7 +95,7 @@ public class Metrics {
return;
}
try {
- metrics = new Metrics(metricConfig);
+ instance = new Metrics(metricConfig);
} catch (Exception e) {
throw new HoodieException(e);
}
@@ -93,10 +106,17 @@ public class Metrics {
if (!initialized) {
return;
}
- metrics.reportAndCloseReporter();
+ instance.reportAndCloseReporter();
initialized = false;
}
+ public static synchronized void flush() {
+ if (!Metrics.initialized) {
+ return;
+ }
+ instance.reportAndFlushMetrics();
+ }
+
public static void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
index 22b9ff4..bf6f5da 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
+import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
@@ -95,6 +96,9 @@ public class DagScheduler {
for (Future future : futures) {
future.get(1, TimeUnit.HOURS);
}
+
+ // After each level, report and flush the metrics
+ Metrics.flush();
} while (queue.size() > 0);
log.info("Finished workloads");
}