You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/03/27 15:24:06 UTC
[flink] branch master updated: [FLINK-16832][metrics] Refactor
ReporterSetup
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a0ea5e9 [FLINK-16832][metrics] Refactor ReporterSetup
a0ea5e9 is described below
commit a0ea5e9980fee01d961a2f7c249e400325220a98
Author: Alexander Fedulov <14...@users.noreply.github.com>
AuthorDate: Tue Mar 17 15:57:39 2020 +0100
[FLINK-16832][metrics] Refactor ReporterSetup
Introduce methods for better readability.
---
.../flink/runtime/metrics/ReporterSetup.java | 76 +++++++++++++---------
1 file changed, 46 insertions(+), 30 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index 40fb9dd..62cb543 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -127,23 +127,39 @@ public final class ReporterSetup {
public static List<ReporterSetup> fromConfiguration(final Configuration configuration) {
String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, "");
+
+ Set<String> namedReporters = findEnabledReportersInConfiguration(configuration, includedReportersString);
+
+ if (namedReporters.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final List<Tuple2<String, Configuration>> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters);
+
+ final Map<String, MetricReporterFactory> reporterFactories = loadReporterFactories();
+
+ return setupReporters(reporterFactories, reporterConfigurations);
+ }
+
+ private static Set<String> findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) {
Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString)
.filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+
.collect(Collectors.toSet());
// use a TreeSet to make the reporter order deterministic, which is useful for testing
- Set<String> namedReporters = new TreeSet<>(String::compareTo);
- // scan entire configuration for "metric.reporter" keys and parse individual reporter configurations
+ Set<String> namedOrderedReporters = new TreeSet<>(String::compareTo);
+
+ // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters
for (String key : configuration.keySet()) {
if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
Matcher matcher = reporterClassPattern.matcher(key);
if (matcher.matches()) {
String reporterName = matcher.group(1);
if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) {
- if (namedReporters.contains(reporterName)) {
+ if (namedOrderedReporters.contains(reporterName)) {
LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName);
} else {
- namedReporters.add(reporterName);
+ namedOrderedReporters.add(reporterName);
}
} else {
LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString);
@@ -151,12 +167,11 @@ public final class ReporterSetup {
}
}
}
+ return namedOrderedReporters;
+ }
- if (namedReporters.isEmpty()) {
- return Collections.emptyList();
- }
-
- List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size());
+ private static List<Tuple2<String, Configuration>> loadReporterConfigurations(Configuration configuration, Set<String> namedReporters) {
+ final List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size());
for (String namedReporter: namedReporters) {
DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
@@ -165,27 +180,7 @@ public final class ReporterSetup {
reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration));
}
-
- final Map<String, MetricReporterFactory> reporterFactories = loadReporterFactories();
- List<ReporterSetup> reporterArguments = new ArrayList<>(reporterConfigurations.size());
- for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
- String reporterName = reporterConfiguration.f0;
- Configuration reporterConfig = reporterConfiguration.f1;
-
- try {
- Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories);
- metricReporterOptional.ifPresent(reporter -> {
- MetricConfig metricConfig = new MetricConfig();
- reporterConfig.addAllToProperties(metricConfig);
-
- reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
- });
- }
- catch (Throwable t) {
- LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
- }
- }
- return reporterArguments;
+ return reporterConfigurations;
}
private static Map<String, MetricReporterFactory> loadReporterFactories() {
@@ -207,6 +202,27 @@ public final class ReporterSetup {
return Collections.unmodifiableMap(reporterFactories);
}
+ private static List<ReporterSetup> setupReporters(Map<String, MetricReporterFactory> reporterFactories, List<Tuple2<String, Configuration>> reporterConfigurations) {
+ List<ReporterSetup> reporterSetups = new ArrayList<>(reporterConfigurations.size());
+ for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
+ String reporterName = reporterConfiguration.f0;
+ Configuration reporterConfig = reporterConfiguration.f1;
+
+ try {
+ Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories);
+ metricReporterOptional.ifPresent(reporter -> {
+ MetricConfig metricConfig = new MetricConfig();
+ reporterConfig.addAllToProperties(metricConfig);
+ reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
+ });
+ }
+ catch (Throwable t) {
+ LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
+ }
+ }
+ return reporterSetups;
+ }
+
private static Optional<MetricReporter> loadReporter(
final String reporterName,
final Configuration reporterConfig,