You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2023/03/03 15:36:51 UTC
[hudi] branch master updated: [HUDI-5847] Add support for multiple metric reporters and metric labels (#8041)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 81e6e854883 [HUDI-5847] Add support for multiple metric reporters and metric labels (#8041)
81e6e854883 is described below
commit 81e6e854883a94d41ae5b7187c608a8ddbc7bf35
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Fri Mar 3 21:06:43 2023 +0530
[HUDI-5847] Add support for multiple metric reporters and metric labels (#8041)
Add support for multiple metric reporters within a MetricRegistry.
Further it also adds labels to metrics.
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 ++
.../hudi/config/metrics/HoodieMetricsConfig.java | 6 ++
.../java/org/apache/hudi/metrics/MetricUtils.java | 81 ++++++++++++++++++++++
.../main/java/org/apache/hudi/metrics/Metrics.java | 54 ++++++++++++---
.../hudi/metrics/MetricsReporterFactory.java | 14 +++-
.../hudi/metrics/datadog/DatadogHttpClient.java | 20 ++++--
.../metrics/datadog/DatadogMetricsReporter.java | 2 +-
.../hudi/metrics/datadog/DatadogReporter.java | 27 +++++---
.../prometheus/PushGatewayMetricsReporter.java | 26 ++-----
.../metrics/prometheus/PushGatewayReporter.java | 42 ++++++++++-
.../hudi/metrics/TestMetricsReporterFactory.java | 4 +-
.../prometheus/TestPushGateWayReporter.java | 74 +++++++++++++++-----
.../src/test/resources/datadog.properties | 25 +++++++
.../src/test/resources/prometheus.properties | 24 +++++++
14 files changed, 333 insertions(+), 70 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7ce7d8c6574..886112cae16 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2123,6 +2123,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX);
}
+ public String getMetricReporterFileBasedConfigs() {
+ return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_FILE_BASED_CONFIGS_PATH);
+ }
+
/**
* memory configs.
*/
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 486f1277ba7..b7f3fa1f630 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -95,6 +95,12 @@ public class HoodieMetricsConfig extends HoodieConfig {
.sinceVersion("0.13.0")
.withDocumentation("Enable metrics for locking infra. Useful when operating in multiwriter mode");
+ public static final ConfigProperty<String> METRICS_REPORTER_FILE_BASED_CONFIGS_PATH = ConfigProperty
+ .key(METRIC_PREFIX + ".configs.properties")
+ .defaultValue("")
+ .sinceVersion("0.14.0")
+ .withDocumentation("Comma separated list of config file paths for metric exporter configs");
+
/**
* @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
*/
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricUtils.java
new file mode 100644
index 00000000000..e119760883f
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class MetricUtils {
+
+ // Example metric:- with_label_metric;group:a,job:0. Here `with_label_metric` is the metric name.
+ // `group:a` and `job:0` are the labels for this metric.
+ // Metric name and labels are separated by `;`
+ private static final String METRIC_NAME_AND_LABELS_SEPARATOR = ";";
+ // Multiple Labels are separated by `,`
+ private static final String LABELS_SEPARATOR = ",";
+ // Label key and value is separated by `:`
+ private static final String LABELS_KEY_AND_VALUE_SEPARATOR = ":";
+
+ private static Pair<String, String> splitToPair(String label) {
+ String[] keyValues = label.split(LABELS_KEY_AND_VALUE_SEPARATOR, 2);
+ ValidationUtils.checkArgument(StringUtils.nonEmpty(keyValues[0]), String.format("Key is empty for label %s", label));
+ return Pair.of(keyValues[0], keyValues.length == 2 ? keyValues[1] : "");
+ }
+
+ public static Pair<String,Map<String, String>> getLabelsAndMetricMap(String metric) {
+ Pair<String, List<String>> labelsList = getLabelsAndMetricList(metric);
+ return Pair.of(labelsList.getLeft(), getLabelsAsMap(labelsList.getValue()));
+ }
+
+ public static Pair<String,String> getMetricAndLabels(String metric) {
+ String[] tokens = metric.split(METRIC_NAME_AND_LABELS_SEPARATOR);
+ if (tokens.length > 2) {
+ throw new RuntimeException("more than one ';' detected in metric string");
+ }
+ if (tokens.length == 2) {
+ return Pair.of(tokens[0], tokens[1]);
+ }
+ return Pair.of(tokens[0], "");
+ }
+
+ public static Map<String, String> getLabelsAsMap(String labels) {
+ return getLabelsAsMap(getLabels(labels));
+ }
+
+ public static Map<String, String> getLabelsAsMap(List<String> labels) {
+ return labels.stream().filter(StringUtils::nonEmpty).map(MetricUtils::splitToPair)
+ .collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (v1, v2) -> {
+ throw new IllegalStateException(String.format("Multiple values {%s, %s} for same key", v1, v2));
+ }));
+ }
+
+ public static List<String> getLabels(String labels) {
+ return Arrays.stream(labels.split(LABELS_SEPARATOR)).filter(StringUtils::nonEmpty).collect(Collectors.toList());
+ }
+
+ public static Pair<String,List<String>> getLabelsAndMetricList(String metric) {
+ Pair<String, String> metricAndLabels = getMetricAndLabels(metric);
+ return Pair.of(metricAndLabels.getLeft(), getLabels(metricAndLabels.getRight()));
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index f999f185bc8..a928ccdf870 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -18,15 +18,23 @@
package org.apache.hudi.metrics;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import com.codahale.metrics.MetricRegistry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -39,18 +47,23 @@ public class Metrics {
private static final Map<String, Metrics> METRICS_INSTANCE_PER_BASEPATH = new HashMap<>();
private final MetricRegistry registry;
- private MetricsReporter reporter;
+ private final List<MetricsReporter> reporters;
private final String commonMetricPrefix;
private boolean initialized = false;
public Metrics(HoodieWriteConfig metricConfig) {
registry = new MetricRegistry();
commonMetricPrefix = metricConfig.getMetricReporterMetricsNamePrefix();
- reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
- if (reporter == null) {
- throw new RuntimeException("Cannot initialize Reporter.");
+ reporters = new ArrayList<>();
+ Option<MetricsReporter> defaultReporter = MetricsReporterFactory.createReporter(metricConfig, registry);
+ defaultReporter.ifPresent(reporters::add);
+ if (StringUtils.nonEmpty(metricConfig.getMetricReporterFileBasedConfigs())) {
+ reporters.addAll(addAdditionalMetricsExporters(metricConfig));
}
- reporter.start();
+ if (reporters.size() == 0) {
+ throw new RuntimeException("Cannot initialize Reporters.");
+ }
+ reporters.forEach(MetricsReporter::start);
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
this.initialized = true;
@@ -75,12 +88,34 @@ public class Metrics {
METRICS_INSTANCE_PER_BASEPATH.values().forEach(Metrics::shutdown);
}
+ private List<MetricsReporter> addAdditionalMetricsExporters(HoodieWriteConfig metricConfig) {
+ List<MetricsReporter> reporterList = new ArrayList<>();
+ List<String> propPathList = StringUtils.split(metricConfig.getMetricReporterFileBasedConfigs(), ",");
+ try (FileSystem fs = FSUtils.getFs(propPathList.get(0), new Configuration())) {
+ for (String propPath : propPathList) {
+ HoodieWriteConfig secondarySourceConfig = HoodieWriteConfig.newBuilder().fromInputStream(
+ fs.open(new Path(propPath))).withPath(metricConfig.getBasePath()).build();
+ Option<MetricsReporter> reporter = MetricsReporterFactory.createReporter(secondarySourceConfig, registry);
+ if (reporter.isPresent()) {
+ reporterList.add(reporter.get());
+ } else {
+ LOG.error(String.format("Could not create reporter using properties path %s base path %s",
+ propPath, metricConfig.getBasePath()));
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to add MetricsExporters", e);
+ }
+ LOG.info("total additional metrics reporters added =" + reporterList.size());
+ return reporterList;
+ }
+
public synchronized void shutdown() {
try {
registerHoodieCommonMetrics();
- reporter.report();
+ reporters.forEach(MetricsReporter::report);
LOG.info("Stopping the metrics reporter...");
- reporter.stop();
+ reporters.forEach(MetricsReporter::stop);
} catch (Exception e) {
LOG.warn("Error while closing reporter", e);
} finally {
@@ -92,13 +127,14 @@ public class Metrics {
try {
LOG.info("Reporting and flushing all metrics");
registerHoodieCommonMetrics();
- reporter.report();
+ reporters.forEach(MetricsReporter::report);
registry.getNames().forEach(this.registry::remove);
+ registerHoodieCommonMetrics();
} catch (Exception e) {
LOG.error("Error while reporting and flushing metrics", e);
}
}
-
+
public void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index d81e337b28d..405968eefb3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -18,9 +18,11 @@
package org.apache.hudi.metrics;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter;
import org.apache.hudi.metrics.custom.CustomizableMetricsReporter;
@@ -41,7 +43,7 @@ public class MetricsReporterFactory {
private static final Logger LOG = LogManager.getLogger(MetricsReporterFactory.class);
- public static MetricsReporter createReporter(HoodieWriteConfig config, MetricRegistry registry) {
+ public static Option<MetricsReporter> createReporter(HoodieWriteConfig config, MetricRegistry registry) {
String reporterClassName = config.getMetricReporterClassName();
if (!StringUtils.isNullOrEmpty(reporterClassName)) {
@@ -51,11 +53,17 @@ public class MetricsReporterFactory {
throw new HoodieException(config.getMetricReporterClassName()
+ " is not a subclass of CustomizableMetricsReporter");
}
- return (MetricsReporter) instance;
+ return Option.of((MetricsReporter) instance);
}
MetricsReporterType type = config.getMetricsReporterType();
MetricsReporter reporter = null;
+ if (type == null) {
+ LOG.warn(String.format("Metric creation failed. %s is not configured",
+ HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key()));
+ return Option.empty();
+ }
+
switch (type) {
case GRAPHITE:
reporter = new MetricsGraphiteReporter(config, registry);
@@ -85,6 +93,6 @@ public class MetricsReporterFactory {
LOG.error("Reporter type[" + type + "] is not supported.");
break;
}
- return reporter;
+ return Option.ofNullable(reporter);
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java
index b0912aaaabf..5948572c4f4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java
@@ -18,6 +18,7 @@
package org.apache.hudi.metrics.datadog;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -47,8 +48,9 @@ public class DatadogHttpClient implements Closeable {
private static final Logger LOG = LogManager.getLogger(DatadogHttpClient.class);
- private static final String SERIES_URL_FORMAT = "https://app.datadoghq.%s/api/v1/series";
- private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.%s/api/v1/validate";
+ private static final String DEFAULT_HOST = "app.us.datadoghq";
+ private static final String SERIES_URL_FORMAT = "https://%s.%s/api/v1/series";
+ private static final String VALIDATE_URL_FORMAT = "https://%s.%s/api/v1/validate";
private static final String HEADER_KEY_API_KEY = "DD-API-KEY";
private final String apiKey;
@@ -56,23 +58,27 @@ public class DatadogHttpClient implements Closeable {
private final String validateUrl;
private final CloseableHttpClient client;
- public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, CloseableHttpClient client) {
+ public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, CloseableHttpClient client, Option<String> host) {
this.apiKey = apiKey;
- this.seriesUrl = String.format(SERIES_URL_FORMAT, apiSite.getDomain());
- this.validateUrl = String.format(VALIDATE_URL_FORMAT, apiSite.getDomain());
+ this.seriesUrl = String.format(SERIES_URL_FORMAT, host.orElse(DEFAULT_HOST), apiSite.getDomain());
+ this.validateUrl = String.format(VALIDATE_URL_FORMAT, host.orElse(DEFAULT_HOST), apiSite.getDomain());
this.client = client;
if (!skipValidation) {
validateApiKey();
}
}
- public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, int timeoutSeconds) {
+ public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, CloseableHttpClient client) {
+ this(apiSite, apiKey, skipValidation, client, Option.of(DEFAULT_HOST));
+ }
+
+ public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, int timeoutSeconds, Option<String> host) {
this(apiSite, apiKey, skipValidation, HttpClientBuilder.create()
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(timeoutSeconds * 1000)
.setConnectionRequestTimeout(timeoutSeconds * 1000)
.setSocketTimeout(timeoutSeconds * 1000).build())
- .build());
+ .build(), host);
}
private void validateApiKey() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
index fdbd0cc7e40..3f598f34a2d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
@@ -60,7 +60,7 @@ public class DatadogMetricsReporter extends MetricsReporter {
reporter = new DatadogReporter(
registry,
- new DatadogHttpClient(apiSite, apiKey, skipValidation, timeoutSeconds),
+ new DatadogHttpClient(apiSite, apiKey, skipValidation, timeoutSeconds, host),
prefix,
host,
tags,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
index a388aecda0a..20c0f1b82a7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
@@ -20,6 +20,8 @@ package org.apache.hudi.metrics.datadog;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metrics.MetricUtils;
import com.codahale.metrics.Clock;
import com.codahale.metrics.Counter;
@@ -39,6 +41,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -84,18 +87,22 @@ public class DatadogReporter extends ScheduledReporter {
SortedMap<String, Histogram> histograms,
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
- final long now = clock.getTime() / 1000;
- final PayloadBuilder builder = new PayloadBuilder();
- builder.withMetricType(MetricType.gauge);
- gauges.forEach((metricName, metric) -> {
- builder.addGauge(prefix(metricName), now, (long) metric.getValue());
+ Map<List<String>, List<Pair<String, List<String>>>> labelsPair = gauges.keySet().stream().map(MetricUtils::getLabelsAndMetricList)
+ .collect(Collectors.groupingBy(Pair::getValue));
+ labelsPair.entrySet().forEach(labelsKeyValue -> {
+ final long now = clock.getTime() / 1000;
+ final PayloadBuilder builder = new PayloadBuilder();
+ builder.withMetricType(MetricType.gauge);
+ gauges.forEach(
+ (metricName, metric) -> builder.addGauge(prefix(MetricUtils.getMetricAndLabels(metricName).getKey()),
+ now, (long) metric.getValue()));
+ host.ifPresent(builder::withHost);
+ List<String> runTimeLables = labelsKeyValue.getKey();
+ tags.map(runTimeLables::addAll);
+ builder.withTags(runTimeLables);
+ client.send(builder.build());
});
-
- host.ifPresent(builder::withHost);
- tags.ifPresent(builder::withTags);
-
- client.send(builder.build());
}
protected String prefix(String... components) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayMetricsReporter.java
index 02654cd17cf..805e5d7c0d7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayMetricsReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayMetricsReporter.java
@@ -18,20 +18,18 @@
package org.apache.hudi.metrics.prometheus;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.MetricUtils;
import org.apache.hudi.metrics.MetricsReporter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
public class PushGatewayMetricsReporter extends MetricsReporter {
@@ -92,20 +90,10 @@ public class PushGatewayMetricsReporter extends MetricsReporter {
}
private static Map<String, String> parseLabels(String labels) {
- Stream<String[]> intermediateStream = Pattern.compile("\\s*,\\s*")
- .splitAsStream(labels.trim())
- .map(s -> s.split(":", 2));
-
- Map<String, String> labelsMap = new HashMap<>();
- intermediateStream.forEach(a -> {
- String key = a[0];
- String value = a.length > 1 ? a[1] : "";
- String oldValue = labelsMap.put(key, value);
- if (oldValue != null) {
- throw new HoodieException(String.format("Duplicate key=%s found in labels: %s %s",
- key, key + ":" + oldValue, key + ":" + value));
- }
- });
- return labelsMap;
+ if (StringUtils.isNullOrEmpty(labels)) {
+ return Collections.emptyMap();
+ }
+
+ return MetricUtils.getLabelsAsMap(labels);
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
index dae10e93ca2..991a966c615 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
@@ -18,6 +18,9 @@
package org.apache.hudi.metrics.prometheus;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metrics.MetricUtils;
+
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Counter;
@@ -29,12 +32,16 @@ import com.codahale.metrics.ScheduledReporter;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.PushGateway;
+
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,6 +59,8 @@ public class PushGatewayReporter extends ScheduledReporter {
private final String jobName;
private final Map<String, String> labels;
private final boolean deleteShutdown;
+ private final HashMap<String, io.prometheus.client.Gauge> gaugeHashMap;
+ private final MetricRegistry registry;
protected PushGatewayReporter(MetricRegistry registry,
MetricFilter filter,
@@ -66,10 +75,13 @@ public class PushGatewayReporter extends ScheduledReporter {
this.jobName = jobName;
this.labels = labels;
this.deleteShutdown = deleteShutdown;
+ this.registry = registry;
collectorRegistry = new CollectorRegistry();
metricExports = new DropwizardExports(registry);
pushGatewayClient = createPushGatewayClient(serverHost, serverPort);
metricExports.register(collectorRegistry);
+ gaugeHashMap = new HashMap<>();
+
}
private synchronized PushGateway createPushGatewayClient(String serverHost, int serverPort) {
@@ -100,6 +112,7 @@ public class PushGatewayReporter extends ScheduledReporter {
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
try {
+ handleLabeledMetrics();
pushGatewayClient.pushAdd(collectorRegistry, jobName, labels);
} catch (IOException e) {
LOG.warn("Can't push monitoring information to pushGateway", e);
@@ -117,10 +130,37 @@ public class PushGatewayReporter extends ScheduledReporter {
try {
if (deleteShutdown) {
collectorRegistry.unregister(metricExports);
- pushGatewayClient.delete(jobName);
+ pushGatewayClient.delete(jobName, labels);
+ for (String key : gaugeHashMap.keySet()) {
+ Pair<String, Map<String, String>> mapPair = MetricUtils.getLabelsAndMetricMap(key);
+ pushGatewayClient.delete(mapPair.getKey(), mapPair.getValue());
+ }
}
} catch (IOException e) {
LOG.warn("Failed to delete metrics from pushGateway with jobName {" + jobName + "}", e);
}
}
+
+ private void handleLabeledMetrics() {
+ registry.getGauges().entrySet().forEach(gaugeEntry -> {
+ String key = gaugeEntry.getKey();
+ Pair<String, Map<String,String>> stringMapPair = MetricUtils.getLabelsAndMetricMap(key);
+ if (stringMapPair.getValue().size() > 0) {
+ List<String> labelNames = new ArrayList<>();
+ List<String> labelValues = new ArrayList<>();
+ for (Map.Entry et : stringMapPair.getValue().entrySet()) {
+ labelNames.add((String) et.getKey());
+ labelValues.add((String) et.getValue());
+ }
+ if (!gaugeHashMap.containsKey(key)) {
+ gaugeHashMap.put(key, io.prometheus.client.Gauge.build().help("labeled metricName:" + stringMapPair.getKey())
+ .name(stringMapPair.getKey())
+ .labelNames(labelNames.toArray(new String[0])).register(collectorRegistry));
+ }
+ gaugeHashMap.get(key)
+ .labels(labelValues.toArray(new String[0]))
+ .set((Long) gaugeEntry.getValue().getValue());
+ }
+ });
+ }
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java
index 390f585ebb7..a44443d9bd5 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java
@@ -49,7 +49,7 @@ public class TestMetricsReporterFactory {
@Test
public void metricsReporterFactoryShouldReturnReporter() {
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY);
- MetricsReporter reporter = MetricsReporterFactory.createReporter(config, registry);
+ MetricsReporter reporter = MetricsReporterFactory.createReporter(config, registry).get();
assertTrue(reporter instanceof InMemoryMetricsReporter);
}
@@ -61,7 +61,7 @@ public class TestMetricsReporterFactory {
props.setProperty("testKey", "testValue");
when(config.getProps()).thenReturn(props);
- MetricsReporter reporter = MetricsReporterFactory.createReporter(config, registry);
+ MetricsReporter reporter = MetricsReporterFactory.createReporter(config, registry).get();
assertTrue(reporter instanceof CustomizableMetricsReporter);
assertEquals(props, ((DummyMetricsReporter) reporter).getProps());
assertEquals(registry, ((DummyMetricsReporter) reporter).getRegistry());
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
index 245a66d83d3..27f7c5a8345 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
@@ -18,21 +18,30 @@
package org.apache.hudi.metrics.prometheus;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.metrics.MetricUtils;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.metrics.MetricsReporterType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.util.ArrayList;
import java.util.Map;
-
import java.util.UUID;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -43,8 +52,12 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class TestPushGateWayReporter {
+ static final URL PROP_FILE_PROMETHEUS_URL = TestPushGateWayReporter.class.getClassLoader().getResource("prometheus.properties");
+ static final URL PROP_FILE_DATADOG_URL = TestPushGateWayReporter.class.getClassLoader().getResource("datadog.properties");
+
@Mock
HoodieWriteConfig config;
+
HoodieMetrics hoodieMetrics;
Metrics metrics;
@@ -58,21 +71,10 @@ public class TestPushGateWayReporter {
@Test
public void testRegisterGauge() {
when(config.isMetricsOn()).thenReturn(true);
- when(config.getTableName()).thenReturn("foo");
- when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.PROMETHEUS_PUSHGATEWAY);
- when(config.getPushGatewayHost()).thenReturn("localhost");
- when(config.getPushGatewayPort()).thenReturn(9091);
- when(config.getPushGatewayReportPeriodSeconds()).thenReturn(30);
- when(config.getPushGatewayDeleteOnShutdown()).thenReturn(true);
- when(config.getPushGatewayJobName()).thenReturn("foo");
- when(config.getPushGatewayRandomJobNameSuffix()).thenReturn(false);
- when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus");
- when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
- hoodieMetrics = new HoodieMetrics(config);
- metrics = hoodieMetrics.getMetrics();
assertDoesNotThrow(() -> {
- new HoodieMetrics(config);
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
});
metrics.registerGauge("pushGateWayReporter_metric", 123L);
@@ -80,6 +82,42 @@ public class TestPushGateWayReporter {
.get("pushGateWayReporter_metric").getValue().toString());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testMultiReporter(boolean addDefaultReporter) throws IOException, InterruptedException, URISyntaxException {
+
+ String propPrometheusPath = Objects.requireNonNull(PROP_FILE_PROMETHEUS_URL).toURI().getPath();
+ String propDatadogPath = Objects.requireNonNull(PROP_FILE_DATADOG_URL).toURI().getPath();
+ if (addDefaultReporter) {
+ when(config.isMetricsOn()).thenReturn(true);
+ when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.PROMETHEUS_PUSHGATEWAY);
+ when(config.getPushGatewayReportPeriodSeconds()).thenReturn(30);
+ } else {
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+ when(config.getMetricReporterMetricsNamePrefix()).thenReturn(TestPushGateWayReporter.class.getSimpleName());
+ when(config.getMetricReporterFileBasedConfigs()).thenReturn(propPrometheusPath + "," + propDatadogPath);
+ when(config.isMetricsOn()).thenReturn(true);
+ }
+
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
+
+ Map<String, Long> metricsMap = new HashMap<>();
+ Map<String, Long> labellessMetricMap = new HashMap<>();
+ Map<String, String> labels = new HashMap<>();
+ labels.put("group", "a");
+ labels.put("job", "0");
+ metricsMap.put("with_label_metric;group:a,job:0", 1L);
+ labellessMetricMap.put("without_label_metric", 1L);
+ metrics.registerGauges(metricsMap, Option.empty());
+ metrics.registerGauges(labellessMetricMap, Option.empty());
+ List<String> metricKeys = new ArrayList<>(metrics.getRegistry().getGauges().keySet());
+ assertEquals(0, MetricUtils.getLabelsAndMetricMap(metricKeys.stream()
+ .filter(x -> x.contains("without_label_metric")).findFirst().get()).getValue().size());
+ assertEquals(labels, MetricUtils.getLabelsAndMetricMap(metricKeys.stream()
+ .filter(x -> x.contains("with_label_metric")).findFirst().get()).getValue());
+ }
+
@Test
public void testMetricLabels() {
PushGatewayMetricsReporter reporter;
@@ -115,11 +153,11 @@ public class TestPushGateWayReporter {
assertTrue(labels.containsValue("prometheus"));
try {
- when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus,hudi:prometheus");
+ when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus,hudi:prom");
reporter = new PushGatewayMetricsReporter(config, null);
fail("Should fail");
- } catch (HoodieException e) {
- assertTrue(e.getMessage().contains("Duplicate key=hudi found in labels"));
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Multiple values {prometheus, prom} for same key"));
}
}
}
diff --git a/hudi-client/hudi-client-common/src/test/resources/datadog.properties b/hudi-client/hudi-client-common/src/test/resources/datadog.properties
new file mode 100644
index 00000000000..bef794612c9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/resources/datadog.properties
@@ -0,0 +1,25 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+hoodie.base.path="/tmp/base_path"
+hoodie.metrics.datadog.api.key=f4e1ca9f6e3f16ff5c0800dddfda6179
+hoodie.metrics.datadog.report.period.seconds=3600
+hoodie.metrics.datadog.job.name=test_job
+hoodie.metrics.datadog.metric.host=app.us5.datadoghq
+hoodie.metrics.datadog.metric.prefix=oh
+hoodie.metrics.datadog.api.site=US
+hoodie.metrics.reporter.type=DATADOG
diff --git a/hudi-client/hudi-client-common/src/test/resources/prometheus.properties b/hudi-client/hudi-client-common/src/test/resources/prometheus.properties
new file mode 100644
index 00000000000..3d5d6272fa1
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/resources/prometheus.properties
@@ -0,0 +1,24 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+hoodie.base.path="/tmp/base_path"
+hoodie.metrics.pushgateway.random.job.name.suffix=false
+hoodie.metrics.reporter.class=
+hoodie.metrics.pushgateway.host=localhost
+hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY
+hoodie.metrics.pushgateway.job.name=onehouse_dataplane
+hoodie.metrics.pushgateway.delete.on.shutdown=false