You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/02/28 18:01:23 UTC
[hudi] branch master updated: [HUDI-5777] Support Metrics for Multiple Tables Simultaneously (#7934)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bb240f7beaf [HUDI-5777] Support Metrics for Multiple Tables Simultaneously (#7934)
bb240f7beaf is described below
commit bb240f7beaf48ce3062e29378928bd05c74e5600
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Tue Feb 28 13:01:11 2023 -0500
[HUDI-5777] Support Metrics for Multiple Tables Simultaneously (#7934)
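- Allows metrics to be used for multiple tables simultaneously: instead of a single global Metrics singleton, one Metrics instance is kept per table base path and obtained via Metrics.getInstance(config). This is useful for RFC-20 as well as HoodieMultiTableDeltaStreamer.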
---------
Co-authored-by: Jonathan Vexler <=>
---
.../lock/metrics/HoodieLockMetrics.java | 4 +-
.../org/apache/hudi/metrics/HoodieMetrics.java | 90 +++++++-------
.../main/java/org/apache/hudi/metrics/Metrics.java | 92 +++++++--------
.../hudi/metrics/TestHoodieConsoleMetrics.java | 15 ++-
.../hudi/metrics/TestHoodieGraphiteMetrics.java | 15 ++-
.../apache/hudi/metrics/TestHoodieJmxMetrics.java | 40 ++++---
.../org/apache/hudi/metrics/TestHoodieMetrics.java | 129 +++++++++++----------
.../datadog/TestDatadogMetricsReporter.java | 33 +++++-
.../metrics/prometheus/TestPrometheusReporter.java | 11 +-
.../prometheus/TestPushGateWayReporter.java | 16 ++-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 4 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 9 ++
.../deltastreamer/HoodieDeltaStreamer.java | 2 -
.../deltastreamer/HoodieDeltaStreamerMetrics.java | 57 +++++++--
.../deltastreamer/TestHoodieDeltaStreamer.java | 4 +-
16 files changed, 314 insertions(+), 209 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
index c33a86bfbe7..0fb5fd1caa3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
@@ -47,13 +47,15 @@ public class HoodieLockMetrics {
private transient Timer lockDuration;
private transient Timer lockApiRequestDuration;
private static final Object REGISTRY_LOCK = new Object();
+ private Metrics metrics;
public HoodieLockMetrics(HoodieWriteConfig writeConfig) {
this.isMetricsEnabled = writeConfig.isLockingMetricsEnabled();
this.writeConfig = writeConfig;
if (isMetricsEnabled) {
- MetricRegistry registry = Metrics.getInstance().getRegistry();
+ metrics = Metrics.getInstance(writeConfig);
+ MetricRegistry registry = metrics.getRegistry();
lockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME));
successfulLockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_SUCCESS_COUNTER_NAME));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 7b34bb48176..7737b78b193 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -35,6 +35,8 @@ import org.apache.log4j.Logger;
public class HoodieMetrics {
private static final Logger LOG = LogManager.getLogger(HoodieMetrics.class);
+
+ private Metrics metrics;
// Some timers
public String rollbackTimerName = null;
public String cleanTimerName = null;
@@ -67,7 +69,7 @@ public class HoodieMetrics {
this.config = config;
this.tableName = config.getTableName();
if (config.isMetricsOn()) {
- Metrics.init(config);
+ metrics = Metrics.getInstance(config);
this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION);
this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION);
this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION);
@@ -84,7 +86,11 @@ public class HoodieMetrics {
}
private Timer createTimer(String name) {
- return config.isMetricsOn() ? Metrics.getInstance().getRegistry().timer(name) : null;
+ return config.isMetricsOn() ? metrics.getRegistry().timer(name) : null;
+ }
+
+ public Metrics getMetrics() {
+ return metrics;
}
public Timer.Context getRollbackCtx() {
@@ -162,20 +168,20 @@ public class HoodieMetrics {
// No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
return;
}
- Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0);
- Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0);
+ metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0);
}
public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
@@ -196,20 +202,20 @@ public class HoodieMetrics {
long totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
long totalLogFilesSize = metadata.getTotalLogFilesSize();
- Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
- Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
- Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
- Metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), totalRecordsWritten);
- Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
- Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten);
- Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten);
- Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner);
- Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert);
- Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert);
- Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
- Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
- Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
- Metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), totalRecordsDeleted);
+ metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
+ metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
+ metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
+ metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), totalRecordsWritten);
+ metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
+ metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten);
+ metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten);
+ metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner);
+ metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert);
+ metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert);
+ metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
+ metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
+ metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
+ metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), totalRecordsDeleted);
}
}
@@ -219,14 +225,14 @@ public class HoodieMetrics {
Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime();
if (eventTimePairMinMax.getLeft().isPresent()) {
long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get();
- Metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
+ metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
}
if (eventTimePairMinMax.getRight().isPresent()) {
long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get();
- Metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
+ metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
}
- Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
- Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
+ metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
+ metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
}
}
@@ -234,8 +240,8 @@ public class HoodieMetrics {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
- Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
- Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
+ metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
+ metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
}
}
@@ -243,8 +249,8 @@ public class HoodieMetrics {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
- Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
- Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
+ metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
+ metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
}
}
@@ -252,15 +258,15 @@ public class HoodieMetrics {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs,
numFilesFinalized));
- Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
- Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
+ metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
+ metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
}
}
public void updateIndexMetrics(final String action, final long durationInMs) {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
- Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
+ metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
}
}
@@ -293,7 +299,7 @@ public class HoodieMetrics {
private Counter getCounter(Counter counter, String name) {
if (counter == null) {
- return Metrics.getInstance().getRegistry().counter(name);
+ return metrics.getRegistry().counter(name);
}
return counter;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index 8f3e4974812..f999f185bc8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -21,12 +21,12 @@ package org.apache.hudi.metrics;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
import com.codahale.metrics.MetricRegistry;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -36,14 +36,14 @@ public class Metrics {
private static final Logger LOG = LogManager.getLogger(Metrics.class);
- private static volatile boolean initialized = false;
- private static Metrics instance = null;
+ private static final Map<String, Metrics> METRICS_INSTANCE_PER_BASEPATH = new HashMap<>();
private final MetricRegistry registry;
private MetricsReporter reporter;
private final String commonMetricPrefix;
+ private boolean initialized = false;
- private Metrics(HoodieWriteConfig metricConfig) {
+ public Metrics(HoodieWriteConfig metricConfig) {
registry = new MetricRegistry();
commonMetricPrefix = metricConfig.getMetricReporterMetricsNamePrefix();
reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
@@ -52,10 +52,30 @@ public class Metrics {
}
reporter.start();
- Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
+ Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
+ this.initialized = true;
}
- private void reportAndStopReporter() {
+ private void registerHoodieCommonMetrics() {
+ registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
+ }
+
+ public static synchronized Metrics getInstance(HoodieWriteConfig metricConfig) {
+ String basePath = metricConfig.getBasePath();
+ if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) {
+ return METRICS_INSTANCE_PER_BASEPATH.get(basePath);
+ }
+
+ Metrics metrics = new Metrics(metricConfig);
+ METRICS_INSTANCE_PER_BASEPATH.put(basePath, metrics);
+ return metrics;
+ }
+
+ public static synchronized void shutdownAllMetrics() {
+ METRICS_INSTANCE_PER_BASEPATH.values().forEach(Metrics::shutdown);
+ }
+
+ public synchronized void shutdown() {
try {
registerHoodieCommonMetrics();
reporter.report();
@@ -63,64 +83,29 @@ public class Metrics {
reporter.stop();
} catch (Exception e) {
LOG.warn("Error while closing reporter", e);
+ } finally {
+ initialized = false;
}
}
- private void reportAndFlushMetrics() {
+ public synchronized void flush() {
try {
LOG.info("Reporting and flushing all metrics");
- this.registerHoodieCommonMetrics();
- this.reporter.report();
- this.registry.getNames().forEach(this.registry::remove);
+ registerHoodieCommonMetrics();
+ reporter.report();
+ registry.getNames().forEach(this.registry::remove);
} catch (Exception e) {
LOG.error("Error while reporting and flushing metrics", e);
}
}
-
- private void registerHoodieCommonMetrics() {
- registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
- }
-
- public static Metrics getInstance() {
- assert initialized;
- return instance;
- }
-
- public static synchronized void init(HoodieWriteConfig metricConfig) {
- if (initialized) {
- return;
- }
- try {
- instance = new Metrics(metricConfig);
- } catch (Exception e) {
- throw new HoodieException(e);
- }
- initialized = true;
- }
-
- public static synchronized void shutdown() {
- if (!initialized) {
- return;
- }
- instance.reportAndStopReporter();
- initialized = false;
- }
-
- public static synchronized void flush() {
- if (!Metrics.initialized) {
- return;
- }
- instance.reportAndFlushMetrics();
- }
-
- public static void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
+
+ public void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
}
- public static void registerGauge(String metricName, final long value) {
+ public void registerGauge(String metricName, final long value) {
try {
- MetricRegistry registry = Metrics.getInstance().getRegistry();
HoodieGauge guage = (HoodieGauge) registry.gauge(metricName, () -> new HoodieGauge<>(value));
guage.setValue(value);
} catch (Exception e) {
@@ -134,7 +119,10 @@ public class Metrics {
return registry;
}
- public static boolean isInitialized() {
- return initialized;
+ public static boolean isInitialized(String basePath) {
+ if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) {
+ return METRICS_INSTANCE_PER_BASEPATH.get(basePath).initialized;
+ }
+ return false;
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
index 467a9f79293..4a0de10512e 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
@@ -26,7 +26,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import static org.apache.hudi.metrics.Metrics.registerGauge;
+import java.util.UUID;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
@@ -35,23 +36,27 @@ public class TestHoodieConsoleMetrics {
@Mock
HoodieWriteConfig config;
+ HoodieMetrics hoodieMetrics;
+ Metrics metrics;
@BeforeEach
public void start() {
when(config.getTableName()).thenReturn("console_metrics_test");
when(config.isMetricsOn()).thenReturn(true);
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.CONSOLE);
- new HoodieMetrics(config);
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
}
@AfterEach
public void stop() {
- Metrics.shutdown();
+ metrics.shutdown();
}
@Test
public void testRegisterGauge() {
- registerGauge("metric1", 123L);
- assertEquals("123", Metrics.getInstance().getRegistry().getGauges().get("metric1").getValue().toString());
+ metrics.registerGauge("metric1", 123L);
+ assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString());
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java
index 6ff7ee88ac8..dc1d0ae0cf5 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java
@@ -26,7 +26,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import static org.apache.hudi.metrics.Metrics.registerGauge;
+import java.util.UUID;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
@@ -38,10 +39,12 @@ public class TestHoodieGraphiteMetrics {
@Mock
HoodieWriteConfig config;
+ HoodieMetrics hoodieMetrics;
+ Metrics metrics;
@AfterEach
void shutdownMetrics() {
- Metrics.shutdown();
+ metrics.shutdown();
}
@Test
@@ -52,9 +55,11 @@ public class TestHoodieGraphiteMetrics {
when(config.getGraphiteServerHost()).thenReturn("localhost");
when(config.getGraphiteServerPort()).thenReturn(NetworkTestUtils.nextFreePort());
when(config.getGraphiteReportPeriodSeconds()).thenReturn(30);
- new HoodieMetrics(config);
- registerGauge("graphite_metric", 123L);
- assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
+ metrics.registerGauge("graphite_metric", 123L);
+ assertEquals("123", metrics.getRegistry().getGauges()
.get("graphite_metric").getValue().toString());
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
index a752aa36eca..a2ec03263a7 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
@@ -22,12 +22,14 @@ import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import static org.apache.hudi.metrics.Metrics.registerGauge;
+import java.util.UUID;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
@@ -39,35 +41,37 @@ public class TestHoodieJmxMetrics {
@Mock
HoodieWriteConfig config;
+ HoodieMetrics hoodieMetrics;
+ Metrics metrics;
+
+ @BeforeEach
+ void setup() {
+ when(config.isMetricsOn()).thenReturn(true);
+ when(config.getTableName()).thenReturn("foo");
+ when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
+ when(config.getJmxHost()).thenReturn("localhost");
+ when(config.getJmxPort()).thenReturn(String.valueOf(NetworkTestUtils.nextFreePort()));
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
+ }
@AfterEach
void shutdownMetrics() {
- Metrics.shutdown();
+ metrics.shutdown();
}
@Test
public void testRegisterGauge() {
- when(config.isMetricsOn()).thenReturn(true);
- when(config.getTableName()).thenReturn("foo");
- when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
- when(config.getJmxHost()).thenReturn("localhost");
- when(config.getJmxPort()).thenReturn(String.valueOf(NetworkTestUtils.nextFreePort()));
- new HoodieMetrics(config);
- registerGauge("jmx_metric1", 123L);
- assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
+ metrics.registerGauge("jmx_metric1", 123L);
+ assertEquals("123", metrics.getRegistry().getGauges()
.get("jmx_metric1").getValue().toString());
}
@Test
public void testRegisterGaugeByRangerPort() {
- when(config.isMetricsOn()).thenReturn(true);
- when(config.getTableName()).thenReturn("foo");
- when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
- when(config.getJmxHost()).thenReturn("localhost");
- when(config.getJmxPort()).thenReturn(String.valueOf(NetworkTestUtils.nextFreePort()));
- new HoodieMetrics(config);
- registerGauge("jmx_metric2", 123L);
- assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
+ metrics.registerGauge("jmx_metric2", 123L);
+ assertEquals("123", metrics.getRegistry().getGauges()
.get("jmx_metric2").getValue().toString());
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 02b1a285312..1598810ce42 100755
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -32,9 +32,9 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Random;
+import java.util.UUID;
import java.util.stream.Stream;
-import static org.apache.hudi.metrics.Metrics.registerGauge;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -45,25 +45,28 @@ public class TestHoodieMetrics {
@Mock
HoodieWriteConfig config;
- HoodieMetrics metrics;
+ HoodieMetrics hoodieMetrics;
+ Metrics metrics;
@BeforeEach
void setUp() {
when(config.isMetricsOn()).thenReturn(true);
when(config.getTableName()).thenReturn("raw_table");
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY);
- metrics = new HoodieMetrics(config);
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
}
@AfterEach
void shutdownMetrics() {
- Metrics.shutdown();
+ metrics.shutdown();
}
@Test
public void testRegisterGauge() {
- registerGauge("metric1", 123L);
- assertEquals("123", Metrics.getInstance().getRegistry().getGauges().get("metric1").getValue().toString());
+ metrics.registerGauge("metric1", 123L);
+ assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString());
}
@Test
@@ -71,50 +74,50 @@ public class TestHoodieMetrics {
Random rand = new Random();
// Index metrics
- Timer.Context timer = metrics.getIndexCtx();
+ Timer.Context timer = hoodieMetrics.getIndexCtx();
Thread.sleep(5); // Ensure timer duration is > 0
- metrics.updateIndexMetrics("some_action", metrics.getDurationInMs(timer.stop()));
- String metricName = metrics.getMetricsName("index", "some_action.duration");
- long msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
+ hoodieMetrics.updateIndexMetrics("some_action", hoodieMetrics.getDurationInMs(timer.stop()));
+ String metricName = hoodieMetrics.getMetricsName("index", "some_action.duration");
+ long msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
assertTrue(msec > 0);
// Rollback metrics
- timer = metrics.getRollbackCtx();
+ timer = hoodieMetrics.getRollbackCtx();
Thread.sleep(5); // Ensure timer duration is > 0
long numFilesDeleted = 1 + rand.nextInt();
- metrics.updateRollbackMetrics(metrics.getDurationInMs(timer.stop()), numFilesDeleted);
- metricName = metrics.getMetricsName("rollback", "duration");
- msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
+ hoodieMetrics.updateRollbackMetrics(hoodieMetrics.getDurationInMs(timer.stop()), numFilesDeleted);
+ metricName = hoodieMetrics.getMetricsName("rollback", "duration");
+ msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
assertTrue(msec > 0);
- metricName = metrics.getMetricsName("rollback", "numFilesDeleted");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted);
+ metricName = hoodieMetrics.getMetricsName("rollback", "numFilesDeleted");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted);
// Clean metrics
- timer = metrics.getRollbackCtx();
+ timer = hoodieMetrics.getRollbackCtx();
Thread.sleep(5); // Ensure timer duration is > 0
numFilesDeleted = 1 + rand.nextInt();
- metrics.updateCleanMetrics(metrics.getDurationInMs(timer.stop()), (int)numFilesDeleted);
- metricName = metrics.getMetricsName("clean", "duration");
- msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
+ hoodieMetrics.updateCleanMetrics(hoodieMetrics.getDurationInMs(timer.stop()), (int)numFilesDeleted);
+ metricName = hoodieMetrics.getMetricsName("clean", "duration");
+ msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
assertTrue(msec > 0);
- metricName = metrics.getMetricsName("clean", "numFilesDeleted");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted);
+ metricName = hoodieMetrics.getMetricsName("clean", "numFilesDeleted");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted);
// Finalize metrics
- timer = metrics.getFinalizeCtx();
+ timer = hoodieMetrics.getFinalizeCtx();
Thread.sleep(5); // Ensure timer duration is > 0
long numFilesFinalized = 1 + rand.nextInt();
- metrics.updateFinalizeWriteMetrics(metrics.getDurationInMs(timer.stop()), (int)numFilesFinalized);
- metricName = metrics.getMetricsName("finalize", "duration");
- msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
+ hoodieMetrics.updateFinalizeWriteMetrics(hoodieMetrics.getDurationInMs(timer.stop()), (int)numFilesFinalized);
+ metricName = hoodieMetrics.getMetricsName("finalize", "duration");
+ msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
assertTrue(msec > 0);
- metricName = metrics.getMetricsName("finalize", "numFilesFinalized");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesFinalized);
+ metricName = hoodieMetrics.getMetricsName("finalize", "numFilesFinalized");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), numFilesFinalized);
// Commit / deltacommit / compaction metrics
Stream.of("commit", "deltacommit", "compaction").forEach(action -> {
- Timer.Context commitTimer = action.equals("commit") ? metrics.getCommitCtx() :
- action.equals("deltacommit") ? metrics.getDeltaCommitCtx() : metrics.getCompactionCtx();
+ Timer.Context commitTimer = action.equals("commit") ? hoodieMetrics.getCommitCtx() :
+ action.equals("deltacommit") ? hoodieMetrics.getDeltaCommitCtx() : hoodieMetrics.getCompactionCtx();
try {
// Ensure timer duration is > 0
@@ -139,41 +142,41 @@ public class TestHoodieMetrics {
when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13);
when(metadata.getTotalRecordsDeleted()).thenReturn(randomValue + 14);
when(metadata.getMinAndMaxEventTime()).thenReturn(Pair.of(Option.empty(), Option.empty()));
- metrics.updateCommitMetrics(randomValue + 15, commitTimer.stop(), metadata, action);
+ hoodieMetrics.updateCommitMetrics(randomValue + 15, commitTimer.stop(), metadata, action);
- String metricname = metrics.getMetricsName(action, "duration");
- long duration = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue();
+ String metricname = hoodieMetrics.getMetricsName(action, "duration");
+ long duration = (Long)metrics.getRegistry().getGauges().get(metricname).getValue();
assertTrue(duration > 0);
- metricname = metrics.getMetricsName(action, "totalPartitionsWritten");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalPartitionsWritten());
- metricname = metrics.getMetricsName(action, "totalFilesInsert");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesInsert());
- metricname = metrics.getMetricsName(action, "totalFilesUpdate");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesUpdated());
- metricname = metrics.getMetricsName(action, "totalRecordsWritten");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalRecordsWritten());
- metricname = metrics.getMetricsName(action, "totalUpdateRecordsWritten");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalUpdateRecordsWritten());
- metricname = metrics.getMetricsName(action, "totalInsertRecordsWritten");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalInsertRecordsWritten());
- metricname = metrics.getMetricsName(action, "totalBytesWritten");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalBytesWritten());
- metricname = metrics.getMetricsName(action, "commitTime");
- assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), randomValue + 15);
- metricname = metrics.getMetricsName(action, "totalScanTime");
- assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalScanTime());
- metricname = metrics.getMetricsName(action, "totalCreateTime");
- assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCreateTime());
- metricname = metrics.getMetricsName(action, "totalUpsertTime");
- assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalUpsertTime());
- metricname = metrics.getMetricsName(action, "totalCompactedRecordsUpdated");
- assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCompactedRecordsUpdated());
- metricname = metrics.getMetricsName(action, "totalLogFilesCompacted");
- assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesCompacted());
- metricname = metrics.getMetricsName(action, "totalLogFilesSize");
- assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesSize());
- metricname = metrics.getMetricsName(action, "totalRecordsDeleted");
- assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalRecordsDeleted());
+ metricname = hoodieMetrics.getMetricsName(action, "totalPartitionsWritten");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalPartitionsWritten());
+ metricname = hoodieMetrics.getMetricsName(action, "totalFilesInsert");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesInsert());
+ metricname = hoodieMetrics.getMetricsName(action, "totalFilesUpdate");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesUpdated());
+ metricname = hoodieMetrics.getMetricsName(action, "totalRecordsWritten");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalRecordsWritten());
+ metricname = hoodieMetrics.getMetricsName(action, "totalUpdateRecordsWritten");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalUpdateRecordsWritten());
+ metricname = hoodieMetrics.getMetricsName(action, "totalInsertRecordsWritten");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalInsertRecordsWritten());
+ metricname = hoodieMetrics.getMetricsName(action, "totalBytesWritten");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalBytesWritten());
+ metricname = hoodieMetrics.getMetricsName(action, "commitTime");
+ assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), randomValue + 15);
+ metricname = hoodieMetrics.getMetricsName(action, "totalScanTime");
+ assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalScanTime());
+ metricname = hoodieMetrics.getMetricsName(action, "totalCreateTime");
+ assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCreateTime());
+ metricname = hoodieMetrics.getMetricsName(action, "totalUpsertTime");
+ assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalUpsertTime());
+ metricname = hoodieMetrics.getMetricsName(action, "totalCompactedRecordsUpdated");
+ assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCompactedRecordsUpdated());
+ metricname = hoodieMetrics.getMetricsName(action, "totalLogFilesCompacted");
+ assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesCompacted());
+ metricname = hoodieMetrics.getMetricsName(action, "totalLogFilesSize");
+ assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesSize());
+ metricname = hoodieMetrics.getMetricsName(action, "totalRecordsDeleted");
+ assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalRecordsDeleted());
});
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
index 2514a489563..16120fe2f24 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
@@ -19,7 +19,9 @@
package org.apache.hudi.metrics.datadog;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import com.codahale.metrics.MetricRegistry;
@@ -30,6 +32,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Arrays;
+import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -41,44 +44,66 @@ public class TestDatadogMetricsReporter {
@Mock
HoodieWriteConfig config;
+ HoodieMetrics hoodieMetrics;
+ Metrics metrics;
@Mock
MetricRegistry registry;
@AfterEach
void shutdownMetrics() {
- Metrics.shutdown();
+ if (metrics != null) {
+ metrics.shutdown();
+ }
}
@Test
public void instantiationShouldFailWhenNoApiKey() {
+ when(config.isMetricsOn()).thenReturn(true);
+ when(config.getTableName()).thenReturn("table1");
+ when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.DATADOG);
when(config.getDatadogApiKey()).thenReturn("");
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+
Throwable t = assertThrows(IllegalStateException.class, () -> {
- new DatadogMetricsReporter(config, registry);
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
});
assertEquals("Datadog cannot be initialized: API key is null or empty.", t.getMessage());
}
@Test
public void instantiationShouldFailWhenNoMetricPrefix() {
+ when(config.isMetricsOn()).thenReturn(true);
+ when(config.getTableName()).thenReturn("table1");
+ when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.DATADOG);
when(config.getDatadogApiKey()).thenReturn("foo");
when(config.getDatadogMetricPrefix()).thenReturn("");
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
Throwable t = assertThrows(IllegalStateException.class, () -> {
- new DatadogMetricsReporter(config, registry);
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
});
assertEquals("Datadog cannot be initialized: Metric prefix is null or empty.", t.getMessage());
}
@Test
public void instantiationShouldSucceed() {
+ when(config.isMetricsOn()).thenReturn(true);
+ when(config.getTableName()).thenReturn("table1");
+ when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.DATADOG);
when(config.getDatadogApiSite()).thenReturn(ApiSite.EU);
when(config.getDatadogApiKey()).thenReturn("foo");
when(config.getDatadogApiKeySkipValidation()).thenReturn(true);
when(config.getDatadogMetricPrefix()).thenReturn("bar");
when(config.getDatadogMetricHost()).thenReturn("foo");
when(config.getDatadogMetricTags()).thenReturn(Arrays.asList("baz", "foo"));
+ when(config.getDatadogReportPeriodSeconds()).thenReturn(10);
+ when(config.getMetricReporterMetricsNamePrefix()).thenReturn("");
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
assertDoesNotThrow(() -> {
- new DatadogMetricsReporter(config, registry);
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
});
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java
index 79b12716530..4e94ece52c9 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java
@@ -29,6 +29,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.util.UUID;
+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.Mockito.when;
@@ -37,10 +39,14 @@ public class TestPrometheusReporter {
@Mock
HoodieWriteConfig config;
+ HoodieMetrics hoodieMetrics;
+ Metrics metrics;
@AfterEach
void shutdownMetrics() {
- Metrics.shutdown();
+ if (metrics != null) {
+ metrics.shutdown();
+ }
}
@Test
@@ -49,8 +55,11 @@ public class TestPrometheusReporter {
when(config.getTableName()).thenReturn("foo");
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.PROMETHEUS);
when(config.getPrometheusPort()).thenReturn(9090);
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
assertDoesNotThrow(() -> {
new HoodieMetrics(config);
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
});
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
index 568ccabc78d..245a66d83d3 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
@@ -32,7 +32,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Map;
-import static org.apache.hudi.metrics.Metrics.registerGauge;
+import java.util.UUID;
+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -44,10 +45,14 @@ public class TestPushGateWayReporter {
@Mock
HoodieWriteConfig config;
+ HoodieMetrics hoodieMetrics;
+ Metrics metrics;
@AfterEach
void shutdownMetrics() {
- Metrics.shutdown();
+ if (metrics != null) {
+ metrics.shutdown();
+ }
}
@Test
@@ -62,13 +67,16 @@ public class TestPushGateWayReporter {
when(config.getPushGatewayJobName()).thenReturn("foo");
when(config.getPushGatewayRandomJobNameSuffix()).thenReturn(false);
when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus");
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
assertDoesNotThrow(() -> {
new HoodieMetrics(config);
});
- registerGauge("pushGateWayReporter_metric", 123L);
- assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
+ metrics.registerGauge("pushGateWayReporter_metric", 123L);
+ assertEquals("123", metrics.getRegistry().getGauges()
.get("pushGateWayReporter_metric").getValue().toString());
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f9738dbd3e3..96346652e74 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -828,7 +828,7 @@ object HoodieSparkSqlWriter {
}
def cleanup() : Unit = {
- Metrics.shutdown()
+ Metrics.shutdownAllMetrics()
}
private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index d681cd7a766..a0fc42673b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -862,7 +862,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1)
.load(basePath)
assertEquals(N + 1, hoodieIncViewDF1.count())
- assertEquals(false, Metrics.isInitialized)
+ assertEquals(false, Metrics.isInitialized(basePath))
}
@ParameterizedTest
@@ -1154,7 +1154,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
.save(basePath)
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
- assertEquals(false, Metrics.isInitialized, "Metrics should be shutdown")
+ assertEquals(false, Metrics.isInitialized(basePath), "Metrics should be shutdown")
}
def getWriterReaderOpts(recordType: HoodieRecordType,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 07ba73b528a..dc780ee9a5d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -1119,6 +1119,11 @@ public class DeltaSync implements Serializable, Closeable {
if (embeddedTimelineService.isPresent()) {
embeddedTimelineService.get().stop();
}
+
+ if (metrics != null) {
+ metrics.shutdown();
+ }
+
}
public FileSystem getFs() {
@@ -1137,6 +1142,10 @@ public class DeltaSync implements Serializable, Closeable {
return commitTimelineOpt;
}
+ public HoodieDeltaStreamerMetrics getMetrics() {
+ return metrics;
+ }
+
/**
* Schedule clustering.
* Called from {@link HoodieDeltaStreamer} when async clustering is enabled.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 3211f5127ff..50fb75b4902 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -53,7 +53,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hive.HiveSyncTool;
-import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.utilities.HiveIncrementalPuller;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
@@ -217,7 +216,6 @@ public class HoodieDeltaStreamer implements Serializable {
throw ex;
} finally {
deltaSyncService.ifPresent(DeltaSyncService::close);
- Metrics.shutdown();
LOG.info("Shut down delta streamer");
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
index 2475e92f960..f76a881b7ec 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
@@ -36,12 +36,13 @@ public class HoodieDeltaStreamerMetrics implements Serializable {
private transient Timer overallTimer = null;
public transient Timer hiveSyncTimer = null;
public transient Timer metaSyncTimer = null;
+ private Metrics metrics;
public HoodieDeltaStreamerMetrics(HoodieWriteConfig config) {
this.config = config;
this.tableName = config.getTableName();
if (config.isMetricsOn()) {
- Metrics.init(config);
+ this.metrics = Metrics.getInstance(config);
this.overallTimerName = getMetricsName("timer", "deltastreamer");
this.hiveSyncTimerName = getMetricsName("timer", "deltastreamerHiveSync");
this.metaSyncTimerName = getMetricsName("timer", "deltastreamerMetaSync");
@@ -70,7 +71,7 @@ public class HoodieDeltaStreamerMetrics implements Serializable {
}
private Timer createTimer(String name) {
- return config.isMetricsOn() ? Metrics.getInstance().getRegistry().timer(name) : null;
+ return config.isMetricsOn() ? metrics.getRegistry().timer(name) : null;
}
String getMetricsName(String action, String metric) {
@@ -79,31 +80,73 @@ public class HoodieDeltaStreamerMetrics implements Serializable {
public void updateDeltaStreamerMetrics(long durationInNs) {
if (config.isMetricsOn()) {
- Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
+ metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
}
}
public void updateDeltaStreamerMetaSyncMetrics(String syncClassShortName, long syncNs) {
if (config.isMetricsOn()) {
- Metrics.registerGauge(getMetricsName("deltastreamer", syncClassShortName), getDurationInMs(syncNs));
+ metrics.registerGauge(getMetricsName("deltastreamer", syncClassShortName), getDurationInMs(syncNs));
}
}
public void updateDeltaStreamerKafkaDelayCountMetrics(long kafkaDelayCount) {
if (config.isMetricsOn()) {
- Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaDelayCount"), kafkaDelayCount);
+ metrics.registerGauge(getMetricsName("deltastreamer", "kafkaDelayCount"), kafkaDelayCount);
}
}
public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) {
if (config.isMetricsOn()) {
- Metrics.registerGauge(getMetricsName("deltastreamer", "lastSync"), syncEpochTimeInMs);
+ metrics.registerGauge(getMetricsName("deltastreamer", "lastSync"), syncEpochTimeInMs);
}
}
public void updateDeltaStreamerKafkaMessageInCount(long totalNewMsgCount) {
if (config.isMetricsOn()) {
- Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaMessageInCount"), totalNewMsgCount);
+ metrics.registerGauge(getMetricsName("deltastreamer", "kafkaMessageInCount"), totalNewMsgCount);
+ }
+ }
+
+ public void updateNumSuccessfulSyncs(long numSuccessfulSyncs) {
+ if (config.isMetricsOn()) {
+ metrics.registerGauge(getMetricsName("deltastreamer", "numSuccessfulSyncs"), numSuccessfulSyncs);
+ }
+ }
+
+ public void updateNumFailedSyncs(long numFailedSyncs) {
+ if (config.isMetricsOn()) {
+ metrics.registerGauge(getMetricsName("deltastreamer", "numFailedSyncs"), numFailedSyncs);
+ }
+ }
+
+ public void updateNumConsecutiveFailures(int numConsecutiveFailures) {
+ if (config.isMetricsOn()) {
+ metrics.registerGauge(getMetricsName("deltastreamer", "numConsecutiveFailures"), numConsecutiveFailures);
+ }
+ }
+
+ public void updateTotalSourceBytesAvailableForIngest(long totalSourceBytesAvailable) {
+ if (config.isMetricsOn()) {
+ metrics.registerGauge(getMetricsName("deltastreamer", "totalSourceBytesAvailable"), totalSourceBytesAvailable);
+ }
+ }
+
+ public void updateTotalSyncDurationMs(long totalSyncDurationMs) {
+ if (config.isMetricsOn()) {
+ metrics.registerGauge(getMetricsName("deltastreamer", "totalSyncDurationMs"), totalSyncDurationMs);
+ }
+ }
+
+ public void updateActualSyncDurationMs(long actualSyncDurationMs) {
+ if (config.isMetricsOn()) {
+ metrics.registerGauge(getMetricsName("deltastreamer", "actualSyncDurationMs"), actualSyncDurationMs);
+ }
+ }
+
+ public void shutdown() {
+ if (metrics != null) {
+ metrics.shutdown();
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index eb6a3d3a7ce..61d03bc0b58 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -817,7 +817,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
- assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
+ assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown");
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
@@ -847,7 +847,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
- assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
+ assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown");
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}