You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/15 06:34:05 UTC

[flink] 02/04: [FLINK-11922][metrics] Support MetricReporter factories

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e63e4b9667e50c267555b64efa7c77db2d577f5b
Author: zentol <ch...@apache.org>
AuthorDate: Fri Mar 15 11:16:01 2019 +0100

    [FLINK-11922][metrics] Support MetricReporter factories
---
 docs/monitoring/metrics.md                         |   6 +-
 .../flink/configuration/ConfigConstants.java       |   3 +
 .../flink/metrics/reporter/MetricReporter.java     |  14 +-
 .../metrics/reporter/MetricReporterFactory.java    |  35 +++++
 .../flink/runtime/metrics/ReporterSetup.java       |  89 ++++++++++--
 .../flink/runtime/metrics/ReporterSetupTest.java   | 156 +++++++++++++++++++++
 ...he.flink.metrics.reporter.MetricReporterFactory |  19 +++
 7 files changed, 302 insertions(+), 20 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 4495d7e..37ce572 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -560,12 +560,14 @@ reporters will be instantiated on each job and task manager when they are starte
 
 - `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`.
 - `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`.
+- `metrics.reporter.<name>.factory.class`: The reporter factory class to use for the reporter named `<name>`.
 - `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`.
 - `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the identifier (default value use `metrics.scope.delimiter`) for the reporter named `<name>`.
 - `metrics.reporters`: (optional) A comma-separated include list of reporter names. By default all configured reporters will be used.
 
-All reporters must at least have the `class` property, some allow specifying a reporting `interval`. Below,
-we will list more settings specific to each reporter.
+All reporters must at least have either the `class` or `factory.class` property. Which property may/should be used depends on the reporter implementation. See the individual reporter configuration sections for more information.
+Some reporters (referred to as `Scheduled`) allow specifying a reporting `interval`.
+More settings specific to each reporter are listed below.
 
 Example reporter configuration that specifies multiple reporters:
 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f1c6b85..9dce4ba 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1241,6 +1241,9 @@ public final class ConfigConstants {
 	/** The class of the reporter to use. This is used as a suffix in an actual reporter config */
 	public static final String METRICS_REPORTER_CLASS_SUFFIX = "class";
 
+	/** The class of the reporter factory to use. This is used as a suffix in an actual reporter config */
+	public static final String METRICS_REPORTER_FACTORY_CLASS_SUFFIX = "factory.class";
+
 	/** The interval between reports. This is used as a suffix in an actual reporter config */
 	public static final String METRICS_REPORTER_INTERVAL_SUFFIX = "interval";
 
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
index 5c8085f..3ca6e6a 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
@@ -25,8 +25,11 @@ import org.apache.flink.metrics.MetricGroup;
 /**
  * Reporters are used to export {@link Metric Metrics} to an external backend.
  *
- * <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a
- * public no-argument constructor.
+ * <p>Reporters are instantiated either
+ * a) via reflection, in which case they must be public, non-abstract, and have a public no-argument constructor.
+ * b) via a {@link MetricReporterFactory}, in which case no restrictions apply. (recommended)
+ *
+ * <p>Reporters are neither required nor encouraged to support both instantiation paths.
  */
 public interface MetricReporter {
 
@@ -35,8 +38,11 @@ public interface MetricReporter {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Configures this reporter. Since reporters are instantiated generically and hence parameter-less,
-	 * this method is the place where the reporters set their basic fields based on configuration values.
+	 * Configures this reporter.
+	 *
+	 * <p>If the reporter was instantiated generically and hence parameter-less,
+	 * this method is the place where the reporter sets its basic fields based on configuration values.
+	 * Otherwise, this method will typically be a no-op since resources can be acquired in the constructor.
 	 *
 	 * <p>This method is always called first on a newly instantiated reporter.
 	 *
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java
new file mode 100644
index 0000000..125d01f
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.reporter;
+
+import java.util.Properties;
+
+/**
+ * {@link MetricReporter} factory.
+ */
+public interface MetricReporterFactory {
+
+	/**
+	 * Creates a new metric reporter.
+	 *
+	 * @param properties configured properties for the reporter
+	 * @return created metric reporter
+	 */
+	MetricReporter createMetricReporter(final Properties properties);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index b6d3827..2d1624f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -25,14 +25,20 @@ import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.regex.Matcher;
@@ -54,7 +60,7 @@ public final class ReporterSetup {
 		Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) +
 			// [\S&&[^.]] = intersection of non-whitespace and non-period character classes
 			"([\\S&&[^.]]*)\\." +
-			Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX));
+			'(' + Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX) + '|' + Pattern.quote(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX) + ')');
 
 	private final String name;
 	private final MetricConfig configuration;
@@ -142,28 +148,23 @@ public final class ReporterSetup {
 				configuration,
 				ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');
 
-			reporterConfigurations.add(Tuple2.of(namedReporter, (Configuration) delegatingConfiguration));
+			reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration));
 		}
 
+		final Map<String, MetricReporterFactory> reporterFactories = loadReporterFactories();
 		List<ReporterSetup> reporterArguments = new ArrayList<>(reporterConfigurations.size());
 		for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
 			String reporterName = reporterConfiguration.f0;
 			Configuration reporterConfig = reporterConfiguration.f1;
 
 			try {
-				final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-				if (reporterClassName == null) {
-					LOG.error("No reporter class set for reporter " + reporterName + ". Metrics might not be exposed/reported.");
-					continue;
-				}
-
-				Class<?> reporterClass = Class.forName(reporterClassName);
-				MetricReporter reporter = (MetricReporter) reporterClass.newInstance();
-
-				MetricConfig metricConfig = new MetricConfig();
-				reporterConfig.addAllToProperties(metricConfig);
+				Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories);
+				metricReporterOptional.ifPresent(reporter -> {
+					MetricConfig metricConfig = new MetricConfig();
+					reporterConfig.addAllToProperties(metricConfig);
 
-				reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
+					reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
+				});
 			}
 			catch (Throwable t) {
 				LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
@@ -171,4 +172,64 @@ public final class ReporterSetup {
 		}
 		return reporterArguments;
 	}
+
+	private static Map<String, MetricReporterFactory> loadReporterFactories() {
+		final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class);
+
+		final Map<String, MetricReporterFactory> reporterFactories = new HashMap<>(2);
+		final Iterator<MetricReporterFactory> factoryIterator = serviceLoader.iterator();
+		// do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors
+		// such an error might be caused if the META-INF/services contains an entry to a non-existing factory class
+		while (factoryIterator.hasNext()) {
+			try {
+				MetricReporterFactory factory = factoryIterator.next();
+				reporterFactories.put(factory.getClass().getName(), factory);
+			} catch (Exception | ServiceConfigurationError e) {
+				LOG.warn("Error while loading reporter factory.", e);
+			}
+		}
+
+		return Collections.unmodifiableMap(reporterFactories);
+	}
+
+	private static Optional<MetricReporter> loadReporter(
+			final String reporterName,
+			final Configuration reporterConfig,
+			final Map<String, MetricReporterFactory> reporterFactories)
+			throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+
+		final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
+		final String factoryClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, null);
+
+		if (factoryClassName != null) {
+			return loadViaFactory(factoryClassName, reporterName, reporterConfig, reporterFactories);
+		}
+
+		if (reporterClassName != null) {
+			final Class<?> reporterClass = Class.forName(reporterClassName);
+			return Optional.of((MetricReporter) reporterClass.newInstance());
+		}
+
+		LOG.warn("No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.", reporterName);
+		return Optional.empty();
+	}
+
+	/** Creates the reporter via the factory registered under {@code factoryClassName}; empty if no such factory was loaded. */
+	private static Optional<MetricReporter> loadViaFactory(
+			final String factoryClassName,
+			final String reporterName,
+			final Configuration reporterConfig,
+			final Map<String, MetricReporterFactory> reporterFactories) {
+
+		final MetricReporterFactory factory = reporterFactories.get(factoryClassName);
+		if (factory == null) {
+			LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: {}", factoryClassName, reporterName, reporterFactories.keySet());
+			return Optional.empty();
+		}
+
+		// hand the reporter's own configuration to the factory as properties
+		final MetricConfig metricConfig = new MetricConfig();
+		reporterConfig.addAllToProperties(metricConfig);
+		return Optional.of(factory.createMetricReporter(metricConfig));
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
index eedaa60..f3e86cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
 import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.apache.flink.util.TestLogger;
 
@@ -31,9 +32,11 @@ import org.junit.Test;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.Properties;
 
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the {@link ReporterSetup}.
@@ -214,4 +217,157 @@ public class ReporterSetupTest extends TestLogger {
 		Assert.assertEquals("value3", setup.getConfiguration().getString("arg3", null));
 		Assert.assertEquals(TestReporter2.class.getName(), setup.getConfiguration().getString("class", null));
 	}
+
+	/**
+	 * Verifies that a factory configuration is correctly parsed.
+	 */
+	@Test
+	public void testFactoryParsing() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, TestReporterFactory.class.getName());
+
+		final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+		assertEquals(1, reporterSetups.size());
+
+		final ReporterSetup reporterSetup = reporterSetups.get(0);
+
+		assertEquals(TestReporterFactory.REPORTER, reporterSetup.getReporter());
+	}
+
+	/**
+	 * Verifies that the factory approach is prioritized if both the factory and reflection approach are configured.
+	 */
+	@Test
+	public void testFactoryPrioritization() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, InstantiationTypeTrackingTestReporterFactory.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName());
+
+		final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+		assertEquals(1, reporterSetups.size());
+
+		final ReporterSetup reporterSetup = reporterSetups.get(0);
+		final InstantiationTypeTrackingTestReporter metricReporter = (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
+
+		assertTrue(metricReporter.createdByFactory);
+	}
+
+	/**
+	 * Verifies that an error thrown by a factory does not affect the setup of other reporters.
+	 */
+	@Test
+	public void testFactoryFailureIsolation() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, TestReporterFactory.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "fail." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, FailingFactory.class.getName());
+
+		final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+		assertEquals(1, reporterSetups.size());
+	}
+
+	/**
+	 * Verifies that factory/reflection approaches can be mixed freely.
+	 */
+	@Test
+	public void testMixedSetupsFactoryParsing() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, InstantiationTypeTrackingTestReporterFactory.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName());
+
+		final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+		assertEquals(2, reporterSetups.size());
+
+		final ReporterSetup reporterSetup1 = reporterSetups.get(0);
+		final ReporterSetup reporterSetup2 = reporterSetups.get(1);
+
+		final InstantiationTypeTrackingTestReporter metricReporter1 = (InstantiationTypeTrackingTestReporter) reporterSetup1.getReporter();
+		final InstantiationTypeTrackingTestReporter metricReporter2 = (InstantiationTypeTrackingTestReporter) reporterSetup2.getReporter();
+
+		assertTrue(metricReporter1.createdByFactory ^ metricReporter2.createdByFactory);
+	}
+
+	@Test
+	public void testFactoryArgumentForwarding() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, ConfigExposingReporterFactory.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg", "hello");
+
+		ReporterSetup.fromConfiguration(config);
+
+		Properties passedConfig = ConfigExposingReporterFactory.lastConfig;
+		assertEquals("hello", passedConfig.getProperty("arg"));
+	}
+
+	/**
+	 * Factory that exposes the last provided metric config.
+	 */
+	public static class ConfigExposingReporterFactory implements MetricReporterFactory {
+
+		static Properties lastConfig = null;
+
+		@Override
+		public MetricReporter createMetricReporter(Properties config) {
+			lastConfig = config;
+			return new TestReporter();
+		}
+	}
+
+	/**
+	 * Factory that returns a static reporter.
+	 */
+	public static class TestReporterFactory implements MetricReporterFactory {
+
+		static final MetricReporter REPORTER = new TestReporter();
+
+		@Override
+		public MetricReporter createMetricReporter(Properties config) {
+			return REPORTER;
+		}
+	}
+
+	/**
+	 * Factory that always throws an error.
+	 */
+	public static class FailingFactory implements MetricReporterFactory {
+
+		@Override
+		public MetricReporter createMetricReporter(Properties config) {
+			throw new RuntimeException();
+		}
+	}
+
+	/**
+	 * Factory for {@link InstantiationTypeTrackingTestReporter}.
+	 */
+	public static class InstantiationTypeTrackingTestReporterFactory implements MetricReporterFactory {
+
+		@Override
+		public MetricReporter createMetricReporter(Properties config) {
+			return new InstantiationTypeTrackingTestReporter(true);
+		}
+	}
+
+	/**
+	 * Reporter that exposes which constructor was called.
+	 */
+	protected static class InstantiationTypeTrackingTestReporter extends TestReporter {
+
+		private final boolean createdByFactory;
+
+		public InstantiationTypeTrackingTestReporter() {
+			this(false);
+		}
+
+		InstantiationTypeTrackingTestReporter(boolean createdByFactory) {
+			this.createdByFactory = createdByFactory;
+		}
+
+		public boolean isCreatedByFactory() {
+			return createdByFactory;
+		}
+	}
 }
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
new file mode 100644
index 0000000..61b00fd
--- /dev/null
+++ b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.metrics.ReporterSetupTest$TestReporterFactory
+org.apache.flink.runtime.metrics.ReporterSetupTest$FailingFactory
+org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory
+org.apache.flink.runtime.metrics.ReporterSetupTest$ConfigExposingReporterFactory