You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/03/27 15:24:06 UTC

[flink] branch master updated: [FLINK-16832][metrics] Refactor ReporterSetup

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a0ea5e9  [FLINK-16832][metrics] Refactor ReporterSetup
a0ea5e9 is described below

commit a0ea5e9980fee01d961a2f7c249e400325220a98
Author: Alexander Fedulov <14...@users.noreply.github.com>
AuthorDate: Tue Mar 17 15:57:39 2020 +0100

    [FLINK-16832][metrics] Refactor ReporterSetup
    
    Introduce methods for better readability.
---
 .../flink/runtime/metrics/ReporterSetup.java       | 76 +++++++++++++---------
 1 file changed, 46 insertions(+), 30 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index 40fb9dd..62cb543 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -127,23 +127,39 @@ public final class ReporterSetup {
 
 	public static List<ReporterSetup> fromConfiguration(final Configuration configuration) {
 		String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, "");
+
+		Set<String> namedReporters = findEnabledReportersInConfiguration(configuration, includedReportersString);
+
+		if (namedReporters.isEmpty()) {
+			return Collections.emptyList();
+		}
+
+		final List<Tuple2<String, Configuration>> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters);
+
+		final Map<String, MetricReporterFactory> reporterFactories = loadReporterFactories();
+
+		return setupReporters(reporterFactories, reporterConfigurations);
+	}
+
+	private static Set<String> findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) {
 		Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString)
 			.filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+
 			.collect(Collectors.toSet());
 
 		// use a TreeSet to make the reporter order deterministic, which is useful for testing
-		Set<String> namedReporters = new TreeSet<>(String::compareTo);
-		// scan entire configuration for "metric.reporter" keys and parse individual reporter configurations
+		Set<String> namedOrderedReporters = new TreeSet<>(String::compareTo);
+
+		// scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters
 		for (String key : configuration.keySet()) {
 			if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
 				Matcher matcher = reporterClassPattern.matcher(key);
 				if (matcher.matches()) {
 					String reporterName = matcher.group(1);
 					if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) {
-						if (namedReporters.contains(reporterName)) {
+						if (namedOrderedReporters.contains(reporterName)) {
 							LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName);
 						} else {
-							namedReporters.add(reporterName);
+							namedOrderedReporters.add(reporterName);
 						}
 					} else {
 						LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString);
@@ -151,12 +167,11 @@ public final class ReporterSetup {
 				}
 			}
 		}
+		return namedOrderedReporters;
+	}
 
-		if (namedReporters.isEmpty()) {
-			return Collections.emptyList();
-		}
-
-		List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size());
+	private static List<Tuple2<String, Configuration>> loadReporterConfigurations(Configuration configuration, Set<String> namedReporters) {
+		final List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size());
 
 		for (String namedReporter: namedReporters) {
 			DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
@@ -165,27 +180,7 @@ public final class ReporterSetup {
 
 			reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration));
 		}
-
-		final Map<String, MetricReporterFactory> reporterFactories = loadReporterFactories();
-		List<ReporterSetup> reporterArguments = new ArrayList<>(reporterConfigurations.size());
-		for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
-			String reporterName = reporterConfiguration.f0;
-			Configuration reporterConfig = reporterConfiguration.f1;
-
-			try {
-				Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories);
-				metricReporterOptional.ifPresent(reporter -> {
-					MetricConfig metricConfig = new MetricConfig();
-					reporterConfig.addAllToProperties(metricConfig);
-
-					reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
-				});
-			}
-			catch (Throwable t) {
-				LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
-			}
-		}
-		return reporterArguments;
+		return reporterConfigurations;
 	}
 
 	private static Map<String, MetricReporterFactory> loadReporterFactories() {
@@ -207,6 +202,27 @@ public final class ReporterSetup {
 		return Collections.unmodifiableMap(reporterFactories);
 	}
 
+	private static List<ReporterSetup> setupReporters(Map<String, MetricReporterFactory> reporterFactories, List<Tuple2<String, Configuration>> reporterConfigurations) {
+		List<ReporterSetup> reporterSetups = new ArrayList<>(reporterConfigurations.size());
+		for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
+			String reporterName = reporterConfiguration.f0;
+			Configuration reporterConfig = reporterConfiguration.f1;
+
+			try {
+				Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories);
+				metricReporterOptional.ifPresent(reporter -> {
+					MetricConfig metricConfig = new MetricConfig();
+					reporterConfig.addAllToProperties(metricConfig);
+					reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
+				});
+			}
+			catch (Throwable t) {
+				LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
+			}
+		}
+		return reporterSetups;
+	}
+
 	private static Optional<MetricReporter> loadReporter(
 			final String reporterName,
 			final Configuration reporterConfig,