You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/02/28 18:01:23 UTC

[hudi] branch master updated: [HUDI-5777] Support Metrics for Multiple Tables Simultaneously (#7934)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bb240f7beaf [HUDI-5777] Support Metrics for Multiple Tables Simultaneously (#7934)
bb240f7beaf is described below

commit bb240f7beaf48ce3062e29378928bd05c74e5600
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Tue Feb 28 13:01:11 2023 -0500

    [HUDI-5777] Support Metrics for Multiple Tables Simultaneously (#7934)
    
    - Allows usage of metrics for multiple tables. This is useful for RFC-20 as well as HoodieMultiTableDeltaStreamer.
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../lock/metrics/HoodieLockMetrics.java            |   4 +-
 .../org/apache/hudi/metrics/HoodieMetrics.java     |  90 +++++++-------
 .../main/java/org/apache/hudi/metrics/Metrics.java |  92 +++++++--------
 .../hudi/metrics/TestHoodieConsoleMetrics.java     |  15 ++-
 .../hudi/metrics/TestHoodieGraphiteMetrics.java    |  15 ++-
 .../apache/hudi/metrics/TestHoodieJmxMetrics.java  |  40 ++++---
 .../org/apache/hudi/metrics/TestHoodieMetrics.java | 129 +++++++++++----------
 .../datadog/TestDatadogMetricsReporter.java        |  33 +++++-
 .../metrics/prometheus/TestPrometheusReporter.java |  11 +-
 .../prometheus/TestPushGateWayReporter.java        |  16 ++-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |   4 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   9 ++
 .../deltastreamer/HoodieDeltaStreamer.java         |   2 -
 .../deltastreamer/HoodieDeltaStreamerMetrics.java  |  57 +++++++--
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   4 +-
 16 files changed, 314 insertions(+), 209 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
index c33a86bfbe7..0fb5fd1caa3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
@@ -47,13 +47,15 @@ public class HoodieLockMetrics {
   private transient Timer lockDuration;
   private transient Timer lockApiRequestDuration;
   private static final Object REGISTRY_LOCK = new Object();
+  private Metrics metrics;
 
   public HoodieLockMetrics(HoodieWriteConfig writeConfig) {
     this.isMetricsEnabled = writeConfig.isLockingMetricsEnabled();
     this.writeConfig = writeConfig;
 
     if (isMetricsEnabled) {
-      MetricRegistry registry = Metrics.getInstance().getRegistry();
+      metrics = Metrics.getInstance(writeConfig);
+      MetricRegistry registry = metrics.getRegistry();
 
       lockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME));
       successfulLockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_SUCCESS_COUNTER_NAME));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 7b34bb48176..7737b78b193 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -35,6 +35,8 @@ import org.apache.log4j.Logger;
 public class HoodieMetrics {
 
   private static final Logger LOG = LogManager.getLogger(HoodieMetrics.class);
+
+  private Metrics metrics;
   // Some timers
   public String rollbackTimerName = null;
   public String cleanTimerName = null;
@@ -67,7 +69,7 @@ public class HoodieMetrics {
     this.config = config;
     this.tableName = config.getTableName();
     if (config.isMetricsOn()) {
-      Metrics.init(config);
+      metrics = Metrics.getInstance(config);
       this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION);
       this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION);
       this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION);
@@ -84,7 +86,11 @@ public class HoodieMetrics {
   }
 
   private Timer createTimer(String name) {
-    return config.isMetricsOn() ? Metrics.getInstance().getRegistry().timer(name) : null;
+    return config.isMetricsOn() ? metrics.getRegistry().timer(name) : null;
+  }
+
+  public Metrics getMetrics() {
+    return metrics;
   }
 
   public Timer.Context getRollbackCtx() {
@@ -162,20 +168,20 @@ public class HoodieMetrics {
       // No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
       return;
     }
-    Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0);
-    Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0);
+    metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0);
   }
 
   public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
@@ -196,20 +202,20 @@ public class HoodieMetrics {
       long totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
       long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
       long totalLogFilesSize = metadata.getTotalLogFilesSize();
-      Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
-      Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
-      Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
-      Metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), totalRecordsWritten);
-      Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
-      Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten);
-      Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten);
-      Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner);
-      Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert);
-      Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert);
-      Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
-      Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
-      Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
-      Metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), totalRecordsDeleted);
+      metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
+      metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
+      metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
+      metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), totalRecordsWritten);
+      metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
+      metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten);
+      metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten);
+      metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner);
+      metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert);
+      metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert);
+      metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
+      metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
+      metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
+      metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), totalRecordsDeleted);
     }
   }
 
@@ -219,14 +225,14 @@ public class HoodieMetrics {
       Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime();
       if (eventTimePairMinMax.getLeft().isPresent()) {
         long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get();
-        Metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
+        metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
       }
       if (eventTimePairMinMax.getRight().isPresent()) {
         long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get();
-        Metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
+        metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
       }
-      Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
-      Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
+      metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
+      metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
     }
   }
 
@@ -234,8 +240,8 @@ public class HoodieMetrics {
     if (config.isMetricsOn()) {
       LOG.info(
           String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
-      Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
-      Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
+      metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
+      metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
     }
   }
 
@@ -243,8 +249,8 @@ public class HoodieMetrics {
     if (config.isMetricsOn()) {
       LOG.info(
           String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
-      Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
-      Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
+      metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
+      metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
     }
   }
 
@@ -252,15 +258,15 @@ public class HoodieMetrics {
     if (config.isMetricsOn()) {
       LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs,
           numFilesFinalized));
-      Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
-      Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
+      metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
+      metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
     }
   }
 
   public void updateIndexMetrics(final String action, final long durationInMs) {
     if (config.isMetricsOn()) {
       LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
-      Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
+      metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
     }
   }
 
@@ -293,7 +299,7 @@ public class HoodieMetrics {
 
   private Counter getCounter(Counter counter, String name) {
     if (counter == null) {
-      return Metrics.getInstance().getRegistry().counter(name);
+      return metrics.getRegistry().counter(name);
     }
     return counter;
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index 8f3e4974812..f999f185bc8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -21,12 +21,12 @@ package org.apache.hudi.metrics;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
 
 import com.codahale.metrics.MetricRegistry;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -36,14 +36,14 @@ public class Metrics {
 
   private static final Logger LOG = LogManager.getLogger(Metrics.class);
 
-  private static volatile boolean initialized = false;
-  private static Metrics instance = null;
+  private static final Map<String, Metrics> METRICS_INSTANCE_PER_BASEPATH = new HashMap<>();
 
   private final MetricRegistry registry;
   private MetricsReporter reporter;
   private final String commonMetricPrefix;
+  private boolean initialized = false;
 
-  private Metrics(HoodieWriteConfig metricConfig) {
+  public Metrics(HoodieWriteConfig metricConfig) {
     registry = new MetricRegistry();
     commonMetricPrefix = metricConfig.getMetricReporterMetricsNamePrefix();
     reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
@@ -52,10 +52,30 @@ public class Metrics {
     }
     reporter.start();
 
-    Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
+    Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
+    this.initialized = true;
   }
 
-  private void reportAndStopReporter() {
+  private void registerHoodieCommonMetrics() {
+    registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
+  }
+
+  public static synchronized Metrics getInstance(HoodieWriteConfig metricConfig) {
+    String basePath = metricConfig.getBasePath();
+    if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) {
+      return METRICS_INSTANCE_PER_BASEPATH.get(basePath);
+    }
+
+    Metrics metrics = new Metrics(metricConfig);
+    METRICS_INSTANCE_PER_BASEPATH.put(basePath, metrics);
+    return metrics;
+  }
+
+  public static synchronized void shutdownAllMetrics() {
+    METRICS_INSTANCE_PER_BASEPATH.values().forEach(Metrics::shutdown);
+  }
+
+  public synchronized void shutdown() {
     try {
       registerHoodieCommonMetrics();
       reporter.report();
@@ -63,64 +83,29 @@ public class Metrics {
       reporter.stop();
     } catch (Exception e) {
       LOG.warn("Error while closing reporter", e);
+    } finally {
+      initialized = false;
     }
   }
 
-  private void reportAndFlushMetrics() {
+  public synchronized void flush() {
     try {
       LOG.info("Reporting and flushing all metrics");
-      this.registerHoodieCommonMetrics();
-      this.reporter.report();
-      this.registry.getNames().forEach(this.registry::remove);
+      registerHoodieCommonMetrics();
+      reporter.report();
+      registry.getNames().forEach(this.registry::remove);
     } catch (Exception e) {
       LOG.error("Error while reporting and flushing metrics", e);
     }
   }
-
-  private void registerHoodieCommonMetrics() {
-    registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
-  }
-
-  public static Metrics getInstance() {
-    assert initialized;
-    return instance;
-  }
-
-  public static synchronized void init(HoodieWriteConfig metricConfig) {
-    if (initialized) {
-      return;
-    }
-    try {
-      instance = new Metrics(metricConfig);
-    } catch (Exception e) {
-      throw new HoodieException(e);
-    }
-    initialized = true;
-  }
-
-  public static synchronized void shutdown() {
-    if (!initialized) {
-      return;
-    }
-    instance.reportAndStopReporter();
-    initialized = false;
-  }
-
-  public static synchronized void flush() {
-    if (!Metrics.initialized) {
-      return;
-    }
-    instance.reportAndFlushMetrics();
-  }
-
-  public static void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
+  
+  public void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
     String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
     metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
   }
 
-  public static void registerGauge(String metricName, final long value) {
+  public void registerGauge(String metricName, final long value) {
     try {
-      MetricRegistry registry = Metrics.getInstance().getRegistry();
       HoodieGauge guage = (HoodieGauge) registry.gauge(metricName, () -> new HoodieGauge<>(value));
       guage.setValue(value);
     } catch (Exception e) {
@@ -134,7 +119,10 @@ public class Metrics {
     return registry;
   }
 
-  public static boolean isInitialized() {
-    return initialized;
+  public static boolean isInitialized(String basePath) {
+    if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) {
+      return METRICS_INSTANCE_PER_BASEPATH.get(basePath).initialized;
+    }
+    return false;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
index 467a9f79293..4a0de10512e 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
@@ -26,7 +26,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import static org.apache.hudi.metrics.Metrics.registerGauge;
+import java.util.UUID;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.when;
 
@@ -35,23 +36,27 @@ public class TestHoodieConsoleMetrics {
 
   @Mock
   HoodieWriteConfig config;
+  HoodieMetrics hoodieMetrics;
+  Metrics metrics;
 
   @BeforeEach
   public void start() {
     when(config.getTableName()).thenReturn("console_metrics_test");
     when(config.isMetricsOn()).thenReturn(true);
     when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.CONSOLE);
-    new HoodieMetrics(config);
+    when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+    hoodieMetrics = new HoodieMetrics(config);
+    metrics = hoodieMetrics.getMetrics();
   }
 
   @AfterEach
   public void stop() {
-    Metrics.shutdown();
+    metrics.shutdown();
   }
 
   @Test
   public void testRegisterGauge() {
-    registerGauge("metric1", 123L);
-    assertEquals("123", Metrics.getInstance().getRegistry().getGauges().get("metric1").getValue().toString());
+    metrics.registerGauge("metric1", 123L);
+    assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString());
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java
index 6ff7ee88ac8..dc1d0ae0cf5 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java
@@ -26,7 +26,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import static org.apache.hudi.metrics.Metrics.registerGauge;
+import java.util.UUID;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.when;
 
@@ -38,10 +39,12 @@ public class TestHoodieGraphiteMetrics {
 
   @Mock
   HoodieWriteConfig config;
+  HoodieMetrics hoodieMetrics;
+  Metrics metrics;
 
   @AfterEach
   void shutdownMetrics() {
-    Metrics.shutdown();
+    metrics.shutdown();
   }
 
   @Test
@@ -52,9 +55,11 @@ public class TestHoodieGraphiteMetrics {
     when(config.getGraphiteServerHost()).thenReturn("localhost");
     when(config.getGraphiteServerPort()).thenReturn(NetworkTestUtils.nextFreePort());
     when(config.getGraphiteReportPeriodSeconds()).thenReturn(30);
-    new HoodieMetrics(config);
-    registerGauge("graphite_metric", 123L);
-    assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
+    when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+    hoodieMetrics = new HoodieMetrics(config);
+    metrics = hoodieMetrics.getMetrics();
+    metrics.registerGauge("graphite_metric", 123L);
+    assertEquals("123", metrics.getRegistry().getGauges()
                             .get("graphite_metric").getValue().toString());
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
index a752aa36eca..a2ec03263a7 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
@@ -22,12 +22,14 @@ import org.apache.hudi.common.testutils.NetworkTestUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import static org.apache.hudi.metrics.Metrics.registerGauge;
+import java.util.UUID;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.when;
 
@@ -39,35 +41,37 @@ public class TestHoodieJmxMetrics {
 
   @Mock
   HoodieWriteConfig config;
+  HoodieMetrics hoodieMetrics;
+  Metrics metrics;
+
+  @BeforeEach
+  void setup() {
+    when(config.isMetricsOn()).thenReturn(true);
+    when(config.getTableName()).thenReturn("foo");
+    when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
+    when(config.getJmxHost()).thenReturn("localhost");
+    when(config.getJmxPort()).thenReturn(String.valueOf(NetworkTestUtils.nextFreePort()));
+    when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+    hoodieMetrics = new HoodieMetrics(config);
+    metrics = hoodieMetrics.getMetrics();
+  }
 
   @AfterEach
   void shutdownMetrics() {
-    Metrics.shutdown();
+    metrics.shutdown();
   }
 
   @Test
   public void testRegisterGauge() {
-    when(config.isMetricsOn()).thenReturn(true);
-    when(config.getTableName()).thenReturn("foo");
-    when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
-    when(config.getJmxHost()).thenReturn("localhost");
-    when(config.getJmxPort()).thenReturn(String.valueOf(NetworkTestUtils.nextFreePort()));
-    new HoodieMetrics(config);
-    registerGauge("jmx_metric1", 123L);
-    assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
+    metrics.registerGauge("jmx_metric1", 123L);
+    assertEquals("123", metrics.getRegistry().getGauges()
         .get("jmx_metric1").getValue().toString());
   }
 
   @Test
   public void testRegisterGaugeByRangerPort() {
-    when(config.isMetricsOn()).thenReturn(true);
-    when(config.getTableName()).thenReturn("foo");
-    when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
-    when(config.getJmxHost()).thenReturn("localhost");
-    when(config.getJmxPort()).thenReturn(String.valueOf(NetworkTestUtils.nextFreePort()));
-    new HoodieMetrics(config);
-    registerGauge("jmx_metric2", 123L);
-    assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
+    metrics.registerGauge("jmx_metric2", 123L);
+    assertEquals("123", metrics.getRegistry().getGauges()
         .get("jmx_metric2").getValue().toString());
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 02b1a285312..1598810ce42 100755
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -32,9 +32,9 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Random;
+import java.util.UUID;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.metrics.Metrics.registerGauge;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -45,25 +45,28 @@ public class TestHoodieMetrics {
 
   @Mock
   HoodieWriteConfig config;
-  HoodieMetrics metrics;
+  HoodieMetrics hoodieMetrics;
+  Metrics metrics;
 
   @BeforeEach
   void setUp() {
     when(config.isMetricsOn()).thenReturn(true);
     when(config.getTableName()).thenReturn("raw_table");
     when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY);
-    metrics = new HoodieMetrics(config);
+    when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+    hoodieMetrics = new HoodieMetrics(config);
+    metrics = hoodieMetrics.getMetrics();
   }
 
   @AfterEach
   void shutdownMetrics() {
-    Metrics.shutdown();
+    metrics.shutdown();
   }
 
   @Test
   public void testRegisterGauge() {
-    registerGauge("metric1", 123L);
-    assertEquals("123", Metrics.getInstance().getRegistry().getGauges().get("metric1").getValue().toString());
+    metrics.registerGauge("metric1", 123L);
+    assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString());
   }
 
   @Test
@@ -71,50 +74,50 @@ public class TestHoodieMetrics {
     Random rand = new Random();
 
     // Index metrics
-    Timer.Context timer = metrics.getIndexCtx();
+    Timer.Context timer = hoodieMetrics.getIndexCtx();
     Thread.sleep(5); // Ensure timer duration is > 0
-    metrics.updateIndexMetrics("some_action", metrics.getDurationInMs(timer.stop()));
-    String metricName = metrics.getMetricsName("index", "some_action.duration");
-    long msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
+    hoodieMetrics.updateIndexMetrics("some_action", hoodieMetrics.getDurationInMs(timer.stop()));
+    String metricName = hoodieMetrics.getMetricsName("index", "some_action.duration");
+    long msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
     assertTrue(msec > 0);
 
     // Rollback metrics
-    timer = metrics.getRollbackCtx();
+    timer = hoodieMetrics.getRollbackCtx();
     Thread.sleep(5); // Ensure timer duration is > 0
     long numFilesDeleted = 1 + rand.nextInt();
-    metrics.updateRollbackMetrics(metrics.getDurationInMs(timer.stop()), numFilesDeleted);
-    metricName = metrics.getMetricsName("rollback", "duration");
-    msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
+    hoodieMetrics.updateRollbackMetrics(hoodieMetrics.getDurationInMs(timer.stop()), numFilesDeleted);
+    metricName = hoodieMetrics.getMetricsName("rollback", "duration");
+    msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
     assertTrue(msec > 0);
-    metricName = metrics.getMetricsName("rollback", "numFilesDeleted");
-    assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted);
+    metricName = hoodieMetrics.getMetricsName("rollback", "numFilesDeleted");
+    assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted);
 
     // Clean metrics
-    timer = metrics.getRollbackCtx();
+    timer = hoodieMetrics.getRollbackCtx();
     Thread.sleep(5); // Ensure timer duration is > 0
     numFilesDeleted = 1 + rand.nextInt();
-    metrics.updateCleanMetrics(metrics.getDurationInMs(timer.stop()), (int)numFilesDeleted);
-    metricName = metrics.getMetricsName("clean", "duration");
-    msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
+    hoodieMetrics.updateCleanMetrics(hoodieMetrics.getDurationInMs(timer.stop()), (int)numFilesDeleted);
+    metricName = hoodieMetrics.getMetricsName("clean", "duration");
+    msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
     assertTrue(msec > 0);
-    metricName = metrics.getMetricsName("clean", "numFilesDeleted");
-    assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted);
+    metricName = hoodieMetrics.getMetricsName("clean", "numFilesDeleted");
+    assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted);
 
     // Finalize metrics
-    timer = metrics.getFinalizeCtx();
+    timer = hoodieMetrics.getFinalizeCtx();
     Thread.sleep(5); // Ensure timer duration is > 0
     long numFilesFinalized = 1 + rand.nextInt();
-    metrics.updateFinalizeWriteMetrics(metrics.getDurationInMs(timer.stop()), (int)numFilesFinalized);
-    metricName = metrics.getMetricsName("finalize", "duration");
-    msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
+    hoodieMetrics.updateFinalizeWriteMetrics(hoodieMetrics.getDurationInMs(timer.stop()), (int)numFilesFinalized);
+    metricName = hoodieMetrics.getMetricsName("finalize", "duration");
+    msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
     assertTrue(msec > 0);
-    metricName = metrics.getMetricsName("finalize", "numFilesFinalized");
-    assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesFinalized);
+    metricName = hoodieMetrics.getMetricsName("finalize", "numFilesFinalized");
+    assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), numFilesFinalized);
 
     // Commit / deltacommit / compaction metrics
     Stream.of("commit", "deltacommit", "compaction").forEach(action -> {
-      Timer.Context commitTimer = action.equals("commit") ? metrics.getCommitCtx() :
-          action.equals("deltacommit") ? metrics.getDeltaCommitCtx() : metrics.getCompactionCtx();
+      Timer.Context commitTimer = action.equals("commit") ? hoodieMetrics.getCommitCtx() :
+          action.equals("deltacommit") ? hoodieMetrics.getDeltaCommitCtx() : hoodieMetrics.getCompactionCtx();
 
       try {
         // Ensure timer duration is > 0
@@ -139,41 +142,41 @@ public class TestHoodieMetrics {
       when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13);
       when(metadata.getTotalRecordsDeleted()).thenReturn(randomValue + 14);
       when(metadata.getMinAndMaxEventTime()).thenReturn(Pair.of(Option.empty(), Option.empty()));
-      metrics.updateCommitMetrics(randomValue + 15, commitTimer.stop(), metadata, action);
+      hoodieMetrics.updateCommitMetrics(randomValue + 15, commitTimer.stop(), metadata, action);
 
-      String metricname = metrics.getMetricsName(action, "duration");
-      long duration = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue();
+      String metricname = hoodieMetrics.getMetricsName(action, "duration");
+      long duration = (Long)metrics.getRegistry().getGauges().get(metricname).getValue();
       assertTrue(duration > 0);
-      metricname = metrics.getMetricsName(action, "totalPartitionsWritten");
-      assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalPartitionsWritten());
-      metricname = metrics.getMetricsName(action, "totalFilesInsert");
-      assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesInsert());
-      metricname = metrics.getMetricsName(action, "totalFilesUpdate");
-      assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesUpdated());
-      metricname = metrics.getMetricsName(action, "totalRecordsWritten");
-      assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalRecordsWritten());
-      metricname = metrics.getMetricsName(action, "totalUpdateRecordsWritten");
-      assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalUpdateRecordsWritten());
-      metricname = metrics.getMetricsName(action, "totalInsertRecordsWritten");
-      assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalInsertRecordsWritten());
-      metricname = metrics.getMetricsName(action, "totalBytesWritten");
-      assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalBytesWritten());
-      metricname = metrics.getMetricsName(action, "commitTime");
-      assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), randomValue + 15);
-      metricname = metrics.getMetricsName(action, "totalScanTime");
-      assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalScanTime());
-      metricname = metrics.getMetricsName(action, "totalCreateTime");
-      assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCreateTime());
-      metricname = metrics.getMetricsName(action, "totalUpsertTime");
-      assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalUpsertTime());
-      metricname = metrics.getMetricsName(action, "totalCompactedRecordsUpdated");
-      assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCompactedRecordsUpdated());
-      metricname = metrics.getMetricsName(action, "totalLogFilesCompacted");
-      assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesCompacted());
-      metricname = metrics.getMetricsName(action, "totalLogFilesSize");
-      assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesSize());
-      metricname = metrics.getMetricsName(action, "totalRecordsDeleted");
-      assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalRecordsDeleted());
+      metricname = hoodieMetrics.getMetricsName(action, "totalPartitionsWritten");
+      assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalPartitionsWritten());
+      metricname = hoodieMetrics.getMetricsName(action, "totalFilesInsert");
+      assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesInsert());
+      metricname = hoodieMetrics.getMetricsName(action, "totalFilesUpdate");
+      assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesUpdated());
+      metricname = hoodieMetrics.getMetricsName(action, "totalRecordsWritten");
+      assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalRecordsWritten());
+      metricname = hoodieMetrics.getMetricsName(action, "totalUpdateRecordsWritten");
+      assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalUpdateRecordsWritten());
+      metricname = hoodieMetrics.getMetricsName(action, "totalInsertRecordsWritten");
+      assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalInsertRecordsWritten());
+      metricname = hoodieMetrics.getMetricsName(action, "totalBytesWritten");
+      assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalBytesWritten());
+      metricname = hoodieMetrics.getMetricsName(action, "commitTime");
+      assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), randomValue + 15);
+      metricname = hoodieMetrics.getMetricsName(action, "totalScanTime");
+      assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalScanTime());
+      metricname = hoodieMetrics.getMetricsName(action, "totalCreateTime");
+      assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCreateTime());
+      metricname = hoodieMetrics.getMetricsName(action, "totalUpsertTime");
+      assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalUpsertTime());
+      metricname = hoodieMetrics.getMetricsName(action, "totalCompactedRecordsUpdated");
+      assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCompactedRecordsUpdated());
+      metricname = hoodieMetrics.getMetricsName(action, "totalLogFilesCompacted");
+      assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesCompacted());
+      metricname = hoodieMetrics.getMetricsName(action, "totalLogFilesSize");
+      assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesSize());
+      metricname = hoodieMetrics.getMetricsName(action, "totalRecordsDeleted");
+      assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalRecordsDeleted());
     });
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
index 2514a489563..16120fe2f24 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.metrics.datadog;
 
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
 
 import com.codahale.metrics.MetricRegistry;
@@ -30,6 +32,7 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Arrays;
+import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -41,44 +44,66 @@ public class TestDatadogMetricsReporter {
 
   @Mock
   HoodieWriteConfig config;
+  HoodieMetrics hoodieMetrics;
+  Metrics metrics;
 
   @Mock
   MetricRegistry registry;
 
   @AfterEach
   void shutdownMetrics() {
-    Metrics.shutdown();
+    if (metrics != null) {
+      metrics.shutdown();
+    }
   }
 
   @Test
   public void instantiationShouldFailWhenNoApiKey() {
+    when(config.isMetricsOn()).thenReturn(true);
+    when(config.getTableName()).thenReturn("table1");
+    when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.DATADOG);
     when(config.getDatadogApiKey()).thenReturn("");
+    when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+
     Throwable t = assertThrows(IllegalStateException.class, () -> {
-      new DatadogMetricsReporter(config, registry);
+      hoodieMetrics = new HoodieMetrics(config);
+      metrics = hoodieMetrics.getMetrics();
     });
     assertEquals("Datadog cannot be initialized: API key is null or empty.", t.getMessage());
   }
 
   @Test
   public void instantiationShouldFailWhenNoMetricPrefix() {
+    when(config.isMetricsOn()).thenReturn(true);
+    when(config.getTableName()).thenReturn("table1");
+    when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.DATADOG);
     when(config.getDatadogApiKey()).thenReturn("foo");
     when(config.getDatadogMetricPrefix()).thenReturn("");
+    when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
     Throwable t = assertThrows(IllegalStateException.class, () -> {
-      new DatadogMetricsReporter(config, registry);
+      hoodieMetrics = new HoodieMetrics(config);
+      metrics = hoodieMetrics.getMetrics();
     });
     assertEquals("Datadog cannot be initialized: Metric prefix is null or empty.", t.getMessage());
   }
 
   @Test
   public void instantiationShouldSucceed() {
+    when(config.isMetricsOn()).thenReturn(true);
+    when(config.getTableName()).thenReturn("table1");
+    when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.DATADOG);
     when(config.getDatadogApiSite()).thenReturn(ApiSite.EU);
     when(config.getDatadogApiKey()).thenReturn("foo");
     when(config.getDatadogApiKeySkipValidation()).thenReturn(true);
     when(config.getDatadogMetricPrefix()).thenReturn("bar");
     when(config.getDatadogMetricHost()).thenReturn("foo");
     when(config.getDatadogMetricTags()).thenReturn(Arrays.asList("baz", "foo"));
+    when(config.getDatadogReportPeriodSeconds()).thenReturn(10);
+    when(config.getMetricReporterMetricsNamePrefix()).thenReturn("");
+    when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
     assertDoesNotThrow(() -> {
-      new DatadogMetricsReporter(config, registry);
+      hoodieMetrics = new HoodieMetrics(config);
+      metrics = hoodieMetrics.getMetrics();
     });
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java
index 79b12716530..4e94ece52c9 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java
@@ -29,6 +29,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.util.UUID;
+
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.mockito.Mockito.when;
 
@@ -37,10 +39,14 @@ public class TestPrometheusReporter {
 
   @Mock
   HoodieWriteConfig config;
+  HoodieMetrics hoodieMetrics;
+  Metrics metrics;
 
   @AfterEach
   void shutdownMetrics() {
-    Metrics.shutdown();
+    if (metrics != null) {
+      metrics.shutdown();
+    }
   }
 
   @Test
@@ -49,8 +55,11 @@ public class TestPrometheusReporter {
     when(config.getTableName()).thenReturn("foo");
     when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.PROMETHEUS);
     when(config.getPrometheusPort()).thenReturn(9090);
+    when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
     assertDoesNotThrow(() -> {
       new HoodieMetrics(config);
+      hoodieMetrics = new HoodieMetrics(config);
+      metrics = hoodieMetrics.getMetrics();
     });
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
index 568ccabc78d..245a66d83d3 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
@@ -32,7 +32,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Map;
 
-import static org.apache.hudi.metrics.Metrics.registerGauge;
+import java.util.UUID;
+
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -44,10 +45,14 @@ public class TestPushGateWayReporter {
 
   @Mock
   HoodieWriteConfig config;
+  HoodieMetrics hoodieMetrics;
+  Metrics metrics;
 
   @AfterEach
   void shutdownMetrics() {
-    Metrics.shutdown();
+    if (metrics != null) {
+      metrics.shutdown();
+    }
   }
 
   @Test
@@ -62,13 +67,16 @@ public class TestPushGateWayReporter {
     when(config.getPushGatewayJobName()).thenReturn("foo");
     when(config.getPushGatewayRandomJobNameSuffix()).thenReturn(false);
     when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus");
+    when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+    hoodieMetrics = new HoodieMetrics(config);
+    metrics = hoodieMetrics.getMetrics();
 
     assertDoesNotThrow(() -> {
       new HoodieMetrics(config);
     });
 
-    registerGauge("pushGateWayReporter_metric", 123L);
-    assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
+    metrics.registerGauge("pushGateWayReporter_metric", 123L);
+    assertEquals("123", metrics.getRegistry().getGauges()
         .get("pushGateWayReporter_metric").getValue().toString());
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f9738dbd3e3..96346652e74 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -828,7 +828,7 @@ object HoodieSparkSqlWriter {
   }
 
   def cleanup() : Unit = {
-    Metrics.shutdown()
+    Metrics.shutdownAllMetrics()
   }
 
   private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index d681cd7a766..a0fc42673b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -862,7 +862,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1)
       .load(basePath)
     assertEquals(N + 1, hoodieIncViewDF1.count())
-    assertEquals(false, Metrics.isInitialized)
+    assertEquals(false, Metrics.isInitialized(basePath))
   }
 
   @ParameterizedTest
@@ -1154,7 +1154,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       .save(basePath)
 
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
-    assertEquals(false, Metrics.isInitialized, "Metrics should be shutdown")
+    assertEquals(false, Metrics.isInitialized(basePath), "Metrics should be shutdown")
   }
 
   def getWriterReaderOpts(recordType: HoodieRecordType,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 07ba73b528a..dc780ee9a5d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -1119,6 +1119,11 @@ public class DeltaSync implements Serializable, Closeable {
     if (embeddedTimelineService.isPresent()) {
       embeddedTimelineService.get().stop();
     }
+
+    if (metrics != null) {
+      metrics.shutdown();
+    }
+
   }
 
   public FileSystem getFs() {
@@ -1137,6 +1142,10 @@ public class DeltaSync implements Serializable, Closeable {
     return commitTimelineOpt;
   }
 
+  public HoodieDeltaStreamerMetrics getMetrics() {
+    return metrics;
+  }
+
   /**
    * Schedule clustering.
    * Called from {@link HoodieDeltaStreamer} when async clustering is enabled.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 3211f5127ff..50fb75b4902 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -53,7 +53,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hive.HiveSyncTool;
-import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -217,7 +216,6 @@ public class HoodieDeltaStreamer implements Serializable {
           throw ex;
         } finally {
           deltaSyncService.ifPresent(DeltaSyncService::close);
-          Metrics.shutdown();
           LOG.info("Shut down delta streamer");
         }
       }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
index 2475e92f960..f76a881b7ec 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
@@ -36,12 +36,13 @@ public class HoodieDeltaStreamerMetrics implements Serializable {
   private transient Timer overallTimer = null;
   public transient Timer hiveSyncTimer = null;
   public transient Timer metaSyncTimer = null;
+  private Metrics metrics;
 
   public HoodieDeltaStreamerMetrics(HoodieWriteConfig config) {
     this.config = config;
     this.tableName = config.getTableName();
     if (config.isMetricsOn()) {
-      Metrics.init(config);
+      this.metrics =  Metrics.getInstance(config);
       this.overallTimerName = getMetricsName("timer", "deltastreamer");
       this.hiveSyncTimerName = getMetricsName("timer", "deltastreamerHiveSync");
       this.metaSyncTimerName = getMetricsName("timer", "deltastreamerMetaSync");
@@ -70,7 +71,7 @@ public class HoodieDeltaStreamerMetrics implements Serializable {
   }
 
   private Timer createTimer(String name) {
-    return config.isMetricsOn() ? Metrics.getInstance().getRegistry().timer(name) : null;
+    return config.isMetricsOn() ? metrics.getRegistry().timer(name) : null;
   }
 
   String getMetricsName(String action, String metric) {
@@ -79,31 +80,73 @@ public class HoodieDeltaStreamerMetrics implements Serializable {
 
   public void updateDeltaStreamerMetrics(long durationInNs) {
     if (config.isMetricsOn()) {
-      Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
+      metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
     }
   }
 
   public void updateDeltaStreamerMetaSyncMetrics(String syncClassShortName, long syncNs) {
     if (config.isMetricsOn()) {
-      Metrics.registerGauge(getMetricsName("deltastreamer", syncClassShortName), getDurationInMs(syncNs));
+      metrics.registerGauge(getMetricsName("deltastreamer", syncClassShortName), getDurationInMs(syncNs));
     }
   }
 
   public void updateDeltaStreamerKafkaDelayCountMetrics(long kafkaDelayCount) {
     if (config.isMetricsOn()) {
-      Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaDelayCount"), kafkaDelayCount);
+      metrics.registerGauge(getMetricsName("deltastreamer", "kafkaDelayCount"), kafkaDelayCount);
     }
   }
 
   public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) {
     if (config.isMetricsOn()) {
-      Metrics.registerGauge(getMetricsName("deltastreamer", "lastSync"), syncEpochTimeInMs);
+      metrics.registerGauge(getMetricsName("deltastreamer", "lastSync"), syncEpochTimeInMs);
     }
   }
 
   public void updateDeltaStreamerKafkaMessageInCount(long totalNewMsgCount) {
     if (config.isMetricsOn()) {
-      Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaMessageInCount"), totalNewMsgCount);
+      metrics.registerGauge(getMetricsName("deltastreamer", "kafkaMessageInCount"), totalNewMsgCount);
+    }
+  }
+
+  public void updateNumSuccessfulSyncs(long numSuccessfulSyncs) {
+    if (config.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "numSuccessfulSyncs"), numSuccessfulSyncs);
+    }
+  }
+
+  public void updateNumFailedSyncs(long numFailedSyncs) {
+    if (config.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "numFailedSyncs"), numFailedSyncs);
+    }
+  }
+
+  public void updateNumConsecutiveFailures(int numConsecutiveFailures) {
+    if (config.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "numConsecutiveFailures"), numConsecutiveFailures);
+    }
+  }
+
+  public void updateTotalSourceBytesAvailableForIngest(long totalSourceBytesAvailable) {
+    if (config.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "totalSourceBytesAvailable"), totalSourceBytesAvailable);
+    }
+  }
+
+  public void updateTotalSyncDurationMs(long totalSyncDurationMs) {
+    if (config.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "totalSyncDurationMs"), totalSyncDurationMs);
+    }
+  }
+
+  public void updateActualSyncDurationMs(long actualSyncDurationMs) {
+    if (config.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "actualSyncDurationMs"), actualSyncDurationMs);
+    }
+  }
+
+  public void shutdown() {
+    if (metrics != null) {
+      metrics.shutdown();
     }
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index eb6a3d3a7ce..61d03bc0b58 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -817,7 +817,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
     TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
-    assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
+    assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown");
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
@@ -847,7 +847,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
     TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
-    assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
+    assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown");
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }