You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2023/03/03 15:36:51 UTC

[hudi] branch master updated: [HUDI-5847] Add support for multiple metric reporters and metric labels (#8041)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 81e6e854883 [HUDI-5847] Add support for multiple metric reporters and metric labels (#8041)
81e6e854883 is described below

commit 81e6e854883a94d41ae5b7187c608a8ddbc7bf35
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Fri Mar 3 21:06:43 2023 +0530

    [HUDI-5847] Add support for multiple metric reporters and metric labels (#8041)
    
    Add support for multiple metric reporters within a MetricRegistry.
    Further it also adds labels to metrics.
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++
 .../hudi/config/metrics/HoodieMetricsConfig.java   |  6 ++
 .../java/org/apache/hudi/metrics/MetricUtils.java  | 81 ++++++++++++++++++++++
 .../main/java/org/apache/hudi/metrics/Metrics.java | 54 ++++++++++++---
 .../hudi/metrics/MetricsReporterFactory.java       | 14 +++-
 .../hudi/metrics/datadog/DatadogHttpClient.java    | 20 ++++--
 .../metrics/datadog/DatadogMetricsReporter.java    |  2 +-
 .../hudi/metrics/datadog/DatadogReporter.java      | 27 +++++---
 .../prometheus/PushGatewayMetricsReporter.java     | 26 ++-----
 .../metrics/prometheus/PushGatewayReporter.java    | 42 ++++++++++-
 .../hudi/metrics/TestMetricsReporterFactory.java   |  4 +-
 .../prometheus/TestPushGateWayReporter.java        | 74 +++++++++++++++-----
 .../src/test/resources/datadog.properties          | 25 +++++++
 .../src/test/resources/prometheus.properties       | 24 +++++++
 14 files changed, 333 insertions(+), 70 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7ce7d8c6574..886112cae16 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2123,6 +2123,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX);
   }
 
+  public String getMetricReporterFileBasedConfigs() {
+    return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_FILE_BASED_CONFIGS_PATH);
+  }
+
   /**
    * memory configs.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 486f1277ba7..b7f3fa1f630 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -95,6 +95,12 @@ public class HoodieMetricsConfig extends HoodieConfig {
       .sinceVersion("0.13.0")
       .withDocumentation("Enable metrics for locking infra. Useful when operating in multiwriter mode");
 
+  public static final ConfigProperty<String> METRICS_REPORTER_FILE_BASED_CONFIGS_PATH = ConfigProperty
+      .key(METRIC_PREFIX + ".configs.properties")
+      .defaultValue("")
+      .sinceVersion("0.14.0")
+      .withDocumentation("Comma separated list of config file paths for metric exporter configs");
+
   /**
    * @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricUtils.java
new file mode 100644
index 00000000000..e119760883f
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class MetricUtils {
+
+  // Example metric:- with_label_metric;group:a,job:0. Here `with_label_metric` is the metric name.
+  // `group:a` and `job:0` are the labels for this metric.
+  // Metric name and labels are separated by `;`
+  private static final String METRIC_NAME_AND_LABELS_SEPARATOR = ";";
+  // Multiple Labels are separated by `,`
+  private static final String LABELS_SEPARATOR = ",";
+  // Label key and value is separated by `:`
+  private static final String LABELS_KEY_AND_VALUE_SEPARATOR = ":";
+
+  private static Pair<String, String> splitToPair(String label) {
+    String[] keyValues = label.split(LABELS_KEY_AND_VALUE_SEPARATOR, 2);
+    ValidationUtils.checkArgument(StringUtils.nonEmpty(keyValues[0]), String.format("Key is empty for label %s", label));
+    return  Pair.of(keyValues[0], keyValues.length == 2 ? keyValues[1] : "");
+  }
+
+  public static Pair<String,Map<String, String>> getLabelsAndMetricMap(String metric) {
+    Pair<String, List<String>> labelsList = getLabelsAndMetricList(metric);
+    return Pair.of(labelsList.getLeft(), getLabelsAsMap(labelsList.getValue()));
+  }
+
+  public static Pair<String,String> getMetricAndLabels(String metric) {
+    String[] tokens = metric.split(METRIC_NAME_AND_LABELS_SEPARATOR);
+    if (tokens.length > 2) {
+      throw new RuntimeException("more than one ';' detected in metric string");
+    }
+    if (tokens.length == 2) {
+      return  Pair.of(tokens[0], tokens[1]);
+    }
+    return Pair.of(tokens[0], "");
+  }
+
+  public static Map<String, String> getLabelsAsMap(String labels) {
+    return getLabelsAsMap(getLabels(labels));
+  }
+
+  public static Map<String, String> getLabelsAsMap(List<String> labels) {
+    return labels.stream().filter(StringUtils::nonEmpty).map(MetricUtils::splitToPair)
+        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (v1, v2) -> {
+          throw new IllegalStateException(String.format("Multiple values {%s, %s} for same key", v1, v2));
+        }));
+  }
+
+  public static List<String> getLabels(String labels) {
+    return Arrays.stream(labels.split(LABELS_SEPARATOR)).filter(StringUtils::nonEmpty).collect(Collectors.toList());
+  }
+
+  public static Pair<String,List<String>> getLabelsAndMetricList(String metric) {
+    Pair<String, String> metricAndLabels = getMetricAndLabels(metric);
+    return Pair.of(metricAndLabels.getLeft(), getLabels(metricAndLabels.getRight()));
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index f999f185bc8..a928ccdf870 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -18,15 +18,23 @@
 
 package org.apache.hudi.metrics;
 
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import com.codahale.metrics.MetricRegistry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -39,18 +47,23 @@ public class Metrics {
   private static final Map<String, Metrics> METRICS_INSTANCE_PER_BASEPATH = new HashMap<>();
 
   private final MetricRegistry registry;
-  private MetricsReporter reporter;
+  private final List<MetricsReporter> reporters;
   private final String commonMetricPrefix;
   private boolean initialized = false;
 
   public Metrics(HoodieWriteConfig metricConfig) {
     registry = new MetricRegistry();
     commonMetricPrefix = metricConfig.getMetricReporterMetricsNamePrefix();
-    reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
-    if (reporter == null) {
-      throw new RuntimeException("Cannot initialize Reporter.");
+    reporters = new ArrayList<>();
+    Option<MetricsReporter> defaultReporter = MetricsReporterFactory.createReporter(metricConfig, registry);
+    defaultReporter.ifPresent(reporters::add);
+    if (StringUtils.nonEmpty(metricConfig.getMetricReporterFileBasedConfigs())) {
+      reporters.addAll(addAdditionalMetricsExporters(metricConfig));
     }
-    reporter.start();
+    if (reporters.size() == 0) {
+      throw new RuntimeException("Cannot initialize Reporters.");
+    }
+    reporters.forEach(MetricsReporter::start);
 
     Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
     this.initialized = true;
@@ -75,12 +88,34 @@ public class Metrics {
     METRICS_INSTANCE_PER_BASEPATH.values().forEach(Metrics::shutdown);
   }
 
+  private List<MetricsReporter> addAdditionalMetricsExporters(HoodieWriteConfig metricConfig) {
+    List<MetricsReporter> reporterList = new ArrayList<>();
+    List<String> propPathList = StringUtils.split(metricConfig.getMetricReporterFileBasedConfigs(), ",");
+    try (FileSystem fs = FSUtils.getFs(propPathList.get(0), new Configuration())) {
+      for (String propPath : propPathList) {
+        HoodieWriteConfig secondarySourceConfig = HoodieWriteConfig.newBuilder().fromInputStream(
+            fs.open(new Path(propPath))).withPath(metricConfig.getBasePath()).build();
+        Option<MetricsReporter> reporter = MetricsReporterFactory.createReporter(secondarySourceConfig, registry);
+        if (reporter.isPresent()) {
+          reporterList.add(reporter.get());
+        } else {
+          LOG.error(String.format("Could not create reporter using properties path %s base path %s",
+              propPath, metricConfig.getBasePath()));
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to add MetricsExporters", e);
+    }
+    LOG.info("total additional metrics reporters added =" + reporterList.size());
+    return reporterList;
+  }
+
   public synchronized void shutdown() {
     try {
       registerHoodieCommonMetrics();
-      reporter.report();
+      reporters.forEach(MetricsReporter::report);
       LOG.info("Stopping the metrics reporter...");
-      reporter.stop();
+      reporters.forEach(MetricsReporter::stop);
     } catch (Exception e) {
       LOG.warn("Error while closing reporter", e);
     } finally {
@@ -92,13 +127,14 @@ public class Metrics {
     try {
       LOG.info("Reporting and flushing all metrics");
       registerHoodieCommonMetrics();
-      reporter.report();
+      reporters.forEach(MetricsReporter::report);
       registry.getNames().forEach(this.registry::remove);
+      registerHoodieCommonMetrics();
     } catch (Exception e) {
       LOG.error("Error while reporting and flushing metrics", e);
     }
   }
-  
+
   public void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
     String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
     metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index d81e337b28d..405968eefb3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.metrics;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter;
 import org.apache.hudi.metrics.custom.CustomizableMetricsReporter;
@@ -41,7 +43,7 @@ public class MetricsReporterFactory {
 
   private static final Logger LOG = LogManager.getLogger(MetricsReporterFactory.class);
 
-  public static MetricsReporter createReporter(HoodieWriteConfig config, MetricRegistry registry) {
+  public static Option<MetricsReporter> createReporter(HoodieWriteConfig config, MetricRegistry registry) {
     String reporterClassName = config.getMetricReporterClassName();
 
     if (!StringUtils.isNullOrEmpty(reporterClassName)) {
@@ -51,11 +53,17 @@ public class MetricsReporterFactory {
         throw new HoodieException(config.getMetricReporterClassName()
             + " is not a subclass of CustomizableMetricsReporter");
       }
-      return (MetricsReporter) instance;
+      return Option.of((MetricsReporter) instance);
     }
 
     MetricsReporterType type = config.getMetricsReporterType();
     MetricsReporter reporter = null;
+    if (type == null) {
+      LOG.warn(String.format("Metric creation failed. %s is not configured",
+          HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key()));
+      return Option.empty();
+    }
+
     switch (type) {
       case GRAPHITE:
         reporter = new MetricsGraphiteReporter(config, registry);
@@ -85,6 +93,6 @@ public class MetricsReporterFactory {
         LOG.error("Reporter type[" + type + "] is not supported.");
         break;
     }
-    return reporter;
+    return Option.ofNullable(reporter);
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java
index b0912aaaabf..5948572c4f4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metrics.datadog;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 
@@ -47,8 +48,9 @@ public class DatadogHttpClient implements Closeable {
 
   private static final Logger LOG = LogManager.getLogger(DatadogHttpClient.class);
 
-  private static final String SERIES_URL_FORMAT = "https://app.datadoghq.%s/api/v1/series";
-  private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.%s/api/v1/validate";
+  private static final String DEFAULT_HOST = "app.us.datadoghq";
+  private static final String SERIES_URL_FORMAT = "https://%s.%s/api/v1/series";
+  private static final String VALIDATE_URL_FORMAT = "https://%s.%s/api/v1/validate";
   private static final String HEADER_KEY_API_KEY = "DD-API-KEY";
 
   private final String apiKey;
@@ -56,23 +58,27 @@ public class DatadogHttpClient implements Closeable {
   private final String validateUrl;
   private final CloseableHttpClient client;
 
-  public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, CloseableHttpClient client) {
+  public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, CloseableHttpClient client, Option<String> host) {
     this.apiKey = apiKey;
-    this.seriesUrl = String.format(SERIES_URL_FORMAT, apiSite.getDomain());
-    this.validateUrl = String.format(VALIDATE_URL_FORMAT, apiSite.getDomain());
+    this.seriesUrl = String.format(SERIES_URL_FORMAT, host.orElse(DEFAULT_HOST), apiSite.getDomain());
+    this.validateUrl = String.format(VALIDATE_URL_FORMAT, host.orElse(DEFAULT_HOST), apiSite.getDomain());
     this.client = client;
     if (!skipValidation) {
       validateApiKey();
     }
   }
 
-  public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, int timeoutSeconds) {
+  public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, CloseableHttpClient client) {
+    this(apiSite, apiKey, skipValidation, client,  Option.of(DEFAULT_HOST));
+  }
+
+  public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, int timeoutSeconds, Option<String> host) {
     this(apiSite, apiKey, skipValidation, HttpClientBuilder.create()
         .setDefaultRequestConfig(RequestConfig.custom()
             .setConnectTimeout(timeoutSeconds * 1000)
             .setConnectionRequestTimeout(timeoutSeconds * 1000)
             .setSocketTimeout(timeoutSeconds * 1000).build())
-        .build());
+        .build(), host);
   }
 
   private void validateApiKey() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
index fdbd0cc7e40..3f598f34a2d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
@@ -60,7 +60,7 @@ public class DatadogMetricsReporter extends MetricsReporter {
 
     reporter = new DatadogReporter(
         registry,
-        new DatadogHttpClient(apiSite, apiKey, skipValidation, timeoutSeconds),
+        new DatadogHttpClient(apiSite, apiKey, skipValidation, timeoutSeconds, host),
         prefix,
         host,
         tags,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
index a388aecda0a..20c0f1b82a7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
@@ -20,6 +20,8 @@ package org.apache.hudi.metrics.datadog;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metrics.MetricUtils;
 
 import com.codahale.metrics.Clock;
 import com.codahale.metrics.Counter;
@@ -39,6 +41,7 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -84,18 +87,22 @@ public class DatadogReporter extends ScheduledReporter {
       SortedMap<String, Histogram> histograms,
       SortedMap<String, Meter> meters,
       SortedMap<String, Timer> timers) {
-    final long now = clock.getTime() / 1000;
-    final PayloadBuilder builder = new PayloadBuilder();
 
-    builder.withMetricType(MetricType.gauge);
-    gauges.forEach((metricName, metric) -> {
-      builder.addGauge(prefix(metricName), now, (long) metric.getValue());
+    Map<List<String>, List<Pair<String, List<String>>>> labelsPair = gauges.keySet().stream().map(MetricUtils::getLabelsAndMetricList)
+        .collect(Collectors.groupingBy(Pair::getValue));
+    labelsPair.entrySet().forEach(labelsKeyValue -> {
+      final long now = clock.getTime() / 1000;
+      final PayloadBuilder builder = new PayloadBuilder();
+      builder.withMetricType(MetricType.gauge);
+      gauges.forEach(
+          (metricName, metric) -> builder.addGauge(prefix(MetricUtils.getMetricAndLabels(metricName).getKey()),
+              now, (long) metric.getValue()));
+      host.ifPresent(builder::withHost);
+      List<String> runTimeLables = labelsKeyValue.getKey();
+      tags.map(runTimeLables::addAll);
+      builder.withTags(runTimeLables);
+      client.send(builder.build());
     });
-
-    host.ifPresent(builder::withHost);
-    tags.ifPresent(builder::withTags);
-
-    client.send(builder.build());
   }
 
   protected String prefix(String... components) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayMetricsReporter.java
index 02654cd17cf..805e5d7c0d7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayMetricsReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayMetricsReporter.java
@@ -18,20 +18,18 @@
 
 package org.apache.hudi.metrics.prometheus;
 
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.MetricUtils;
 import org.apache.hudi.metrics.MetricsReporter;
 
 import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
 
 public class PushGatewayMetricsReporter extends MetricsReporter {
 
@@ -92,20 +90,10 @@ public class PushGatewayMetricsReporter extends MetricsReporter {
   }
 
   private static Map<String, String> parseLabels(String labels) {
-    Stream<String[]> intermediateStream = Pattern.compile("\\s*,\\s*")
-        .splitAsStream(labels.trim())
-        .map(s -> s.split(":", 2));
-
-    Map<String, String> labelsMap = new HashMap<>();
-    intermediateStream.forEach(a -> {
-      String key = a[0];
-      String value = a.length > 1 ? a[1] : "";
-      String oldValue = labelsMap.put(key, value);
-      if (oldValue != null) {
-        throw new HoodieException(String.format("Duplicate key=%s found in labels: %s %s",
-            key, key + ":" + oldValue, key + ":" + value));
-      }
-    });
-    return labelsMap;
+    if (StringUtils.isNullOrEmpty(labels)) {
+      return Collections.emptyMap();
+    }
+
+    return MetricUtils.getLabelsAsMap(labels);
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
index dae10e93ca2..991a966c615 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.metrics.prometheus;
 
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metrics.MetricUtils;
+
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Counter;
@@ -29,12 +32,16 @@ import com.codahale.metrics.ScheduledReporter;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.dropwizard.DropwizardExports;
 import io.prometheus.client.exporter.PushGateway;
+
 import java.net.MalformedURLException;
 import java.net.URL;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -52,6 +59,8 @@ public class PushGatewayReporter extends ScheduledReporter {
   private final String jobName;
   private final Map<String, String> labels;
   private final boolean deleteShutdown;
+  private final HashMap<String, io.prometheus.client.Gauge> gaugeHashMap;
+  private final MetricRegistry registry;
 
   protected PushGatewayReporter(MetricRegistry registry,
                                 MetricFilter filter,
@@ -66,10 +75,13 @@ public class PushGatewayReporter extends ScheduledReporter {
     this.jobName = jobName;
     this.labels = labels;
     this.deleteShutdown = deleteShutdown;
+    this.registry = registry;
     collectorRegistry = new CollectorRegistry();
     metricExports = new DropwizardExports(registry);
     pushGatewayClient = createPushGatewayClient(serverHost, serverPort);
     metricExports.register(collectorRegistry);
+    gaugeHashMap = new HashMap<>();
+
   }
 
   private synchronized PushGateway createPushGatewayClient(String serverHost, int serverPort) {
@@ -100,6 +112,7 @@ public class PushGatewayReporter extends ScheduledReporter {
                      SortedMap<String, Meter> meters,
                      SortedMap<String, Timer> timers) {
     try {
+      handleLabeledMetrics();
       pushGatewayClient.pushAdd(collectorRegistry, jobName, labels);
     } catch (IOException e) {
       LOG.warn("Can't push monitoring information to pushGateway", e);
@@ -117,10 +130,37 @@ public class PushGatewayReporter extends ScheduledReporter {
     try {
       if (deleteShutdown) {
         collectorRegistry.unregister(metricExports);
-        pushGatewayClient.delete(jobName);
+        pushGatewayClient.delete(jobName, labels);
+        for (String key : gaugeHashMap.keySet()) {
+          Pair<String, Map<String, String>> mapPair = MetricUtils.getLabelsAndMetricMap(key);
+          pushGatewayClient.delete(mapPair.getKey(), mapPair.getValue());
+        }
       }
     } catch (IOException e) {
       LOG.warn("Failed to delete metrics from pushGateway with jobName {" + jobName + "}", e);
     }
   }
+
+  private void handleLabeledMetrics() {
+    registry.getGauges().entrySet().forEach(gaugeEntry -> {
+      String key = gaugeEntry.getKey();
+      Pair<String, Map<String,String>> stringMapPair = MetricUtils.getLabelsAndMetricMap(key);
+      if (stringMapPair.getValue().size() > 0) {
+        List<String> labelNames = new ArrayList<>();
+        List<String> labelValues = new ArrayList<>();
+        for (Map.Entry et : stringMapPair.getValue().entrySet()) {
+          labelNames.add((String) et.getKey());
+          labelValues.add((String) et.getValue());
+        }
+        if (!gaugeHashMap.containsKey(key)) {
+          gaugeHashMap.put(key, io.prometheus.client.Gauge.build().help("labeled metricName:" + stringMapPair.getKey())
+              .name(stringMapPair.getKey())
+              .labelNames(labelNames.toArray(new String[0])).register(collectorRegistry));
+        }
+        gaugeHashMap.get(key)
+            .labels(labelValues.toArray(new String[0]))
+            .set((Long) gaugeEntry.getValue().getValue());
+      }
+    });
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java
index 390f585ebb7..a44443d9bd5 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java
@@ -49,7 +49,7 @@ public class TestMetricsReporterFactory {
   @Test
   public void metricsReporterFactoryShouldReturnReporter() {
     when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY);
-    MetricsReporter reporter = MetricsReporterFactory.createReporter(config, registry);
+    MetricsReporter reporter = MetricsReporterFactory.createReporter(config, registry).get();
     assertTrue(reporter instanceof InMemoryMetricsReporter);
   }
 
@@ -61,7 +61,7 @@ public class TestMetricsReporterFactory {
     props.setProperty("testKey", "testValue");
 
     when(config.getProps()).thenReturn(props);
-    MetricsReporter reporter = MetricsReporterFactory.createReporter(config, registry);
+    MetricsReporter reporter = MetricsReporterFactory.createReporter(config, registry).get();
     assertTrue(reporter instanceof CustomizableMetricsReporter);
     assertEquals(props, ((DummyMetricsReporter) reporter).getProps());
     assertEquals(registry, ((DummyMetricsReporter) reporter).getRegistry());
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
index 245a66d83d3..27f7c5a8345 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
@@ -18,21 +18,30 @@
 
 package org.apache.hudi.metrics.prometheus;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.metrics.MetricUtils;
 import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.metrics.MetricsReporterType;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.util.ArrayList;
 import java.util.Map;
-
 import java.util.UUID;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -43,8 +52,12 @@ import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 public class TestPushGateWayReporter {
 
+  static final URL PROP_FILE_PROMETHEUS_URL = TestPushGateWayReporter.class.getClassLoader().getResource("prometheus.properties");
+  static final URL PROP_FILE_DATADOG_URL = TestPushGateWayReporter.class.getClassLoader().getResource("datadog.properties");
+
   @Mock
   HoodieWriteConfig config;
+
   HoodieMetrics hoodieMetrics;
   Metrics metrics;
 
@@ -58,21 +71,10 @@ public class TestPushGateWayReporter {
   @Test
   public void testRegisterGauge() {
     when(config.isMetricsOn()).thenReturn(true);
-    when(config.getTableName()).thenReturn("foo");
-    when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.PROMETHEUS_PUSHGATEWAY);
-    when(config.getPushGatewayHost()).thenReturn("localhost");
-    when(config.getPushGatewayPort()).thenReturn(9091);
-    when(config.getPushGatewayReportPeriodSeconds()).thenReturn(30);
-    when(config.getPushGatewayDeleteOnShutdown()).thenReturn(true);
-    when(config.getPushGatewayJobName()).thenReturn("foo");
-    when(config.getPushGatewayRandomJobNameSuffix()).thenReturn(false);
-    when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus");
-    when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
-    hoodieMetrics = new HoodieMetrics(config);
-    metrics = hoodieMetrics.getMetrics();
 
     assertDoesNotThrow(() -> {
-      new HoodieMetrics(config);
+      hoodieMetrics = new HoodieMetrics(config);
+      metrics = hoodieMetrics.getMetrics();
     });
 
     metrics.registerGauge("pushGateWayReporter_metric", 123L);
@@ -80,6 +82,42 @@ public class TestPushGateWayReporter {
         .get("pushGateWayReporter_metric").getValue().toString());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testMultiReporter(boolean addDefaultReporter) throws IOException, InterruptedException, URISyntaxException {
+
+    String propPrometheusPath = Objects.requireNonNull(PROP_FILE_PROMETHEUS_URL).toURI().getPath();
+    String propDatadogPath = Objects.requireNonNull(PROP_FILE_DATADOG_URL).toURI().getPath();
+    if (addDefaultReporter) {
+      when(config.isMetricsOn()).thenReturn(true);
+      when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.PROMETHEUS_PUSHGATEWAY);
+      when(config.getPushGatewayReportPeriodSeconds()).thenReturn(30);
+    } else {
+      when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+      when(config.getMetricReporterMetricsNamePrefix()).thenReturn(TestPushGateWayReporter.class.getSimpleName());
+      when(config.getMetricReporterFileBasedConfigs()).thenReturn(propPrometheusPath + "," + propDatadogPath);
+      when(config.isMetricsOn()).thenReturn(true);
+    }
+
+    hoodieMetrics = new HoodieMetrics(config);
+    metrics = hoodieMetrics.getMetrics();
+
+    Map<String, Long> metricsMap = new HashMap<>();
+    Map<String, Long> labellessMetricMap = new HashMap<>();
+    Map<String, String> labels = new HashMap<>();
+    labels.put("group", "a");
+    labels.put("job", "0");
+    metricsMap.put("with_label_metric;group:a,job:0", 1L);
+    labellessMetricMap.put("without_label_metric", 1L);
+    metrics.registerGauges(metricsMap, Option.empty());
+    metrics.registerGauges(labellessMetricMap, Option.empty());
+    List<String> metricKeys = new ArrayList<>(metrics.getRegistry().getGauges().keySet());
+    assertEquals(0, MetricUtils.getLabelsAndMetricMap(metricKeys.stream()
+        .filter(x -> x.contains("without_label_metric")).findFirst().get()).getValue().size());
+    assertEquals(labels, MetricUtils.getLabelsAndMetricMap(metricKeys.stream()
+        .filter(x -> x.contains("with_label_metric")).findFirst().get()).getValue());
+  }
+
   @Test
   public void testMetricLabels() {
     PushGatewayMetricsReporter reporter;
@@ -115,11 +153,11 @@ public class TestPushGateWayReporter {
     assertTrue(labels.containsValue("prometheus"));
 
     try {
-      when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus,hudi:prometheus");
+      when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus,hudi:prom");
       reporter = new PushGatewayMetricsReporter(config, null);
       fail("Should fail");
-    } catch (HoodieException e) {
-      assertTrue(e.getMessage().contains("Duplicate key=hudi found in labels"));
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Multiple values {prometheus, prom} for same key"));
     }
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/resources/datadog.properties b/hudi-client/hudi-client-common/src/test/resources/datadog.properties
new file mode 100644
index 00000000000..bef794612c9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/resources/datadog.properties
@@ -0,0 +1,25 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+hoodie.base.path="/tmp/base_path"
+hoodie.metrics.datadog.api.key=f4e1ca9f6e3f16ff5c0800dddfda6179
+hoodie.metrics.datadog.report.period.seconds=3600
+hoodie.metrics.datadog.job.name=test_job
+hoodie.metrics.datadog.metric.host=app.us5.datadoghq
+hoodie.metrics.datadog.metric.prefix=oh
+hoodie.metrics.datadog.api.site=US
+hoodie.metrics.reporter.type=DATADOG
diff --git a/hudi-client/hudi-client-common/src/test/resources/prometheus.properties b/hudi-client/hudi-client-common/src/test/resources/prometheus.properties
new file mode 100644
index 00000000000..3d5d6272fa1
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/resources/prometheus.properties
@@ -0,0 +1,24 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+hoodie.base.path="/tmp/base_path"
+hoodie.metrics.pushgateway.random.job.name.suffix=false
+hoodie.metrics.reporter.class=
+hoodie.metrics.pushgateway.host=localhost
+hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY
+hoodie.metrics.pushgateway.job.name=onehouse_dataplane
+hoodie.metrics.pushgateway.delete.on.shutdown=false