You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/15 06:34:05 UTC
[flink] 02/04: [FLINK-11922][metrics] Support MetricReporter
factories
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e63e4b9667e50c267555b64efa7c77db2d577f5b
Author: zentol <ch...@apache.org>
AuthorDate: Fri Mar 15 11:16:01 2019 +0100
[FLINK-11922][metrics] Support MetricReporter factories
---
docs/monitoring/metrics.md | 6 +-
.../flink/configuration/ConfigConstants.java | 3 +
.../flink/metrics/reporter/MetricReporter.java | 14 +-
.../metrics/reporter/MetricReporterFactory.java | 35 +++++
.../flink/runtime/metrics/ReporterSetup.java | 89 ++++++++++--
.../flink/runtime/metrics/ReporterSetupTest.java | 156 +++++++++++++++++++++
...he.flink.metrics.reporter.MetricReporterFactory | 19 +++
7 files changed, 302 insertions(+), 20 deletions(-)
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 4495d7e..37ce572 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -560,12 +560,14 @@ reporters will be instantiated on each job and task manager when they are starte
- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`.
- `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`.
+- `metrics.reporter.<name>.factory.class`: The reporter factory class to use for the reporter named `<name>`.
- `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`.
- `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the identifier (default value use `metrics.scope.delimiter`) for the reporter named `<name>`.
- `metrics.reporters`: (optional) A comma-separated include list of reporter names. By default all configured reporters will be used.
-All reporters must at least have the `class` property, some allow specifying a reporting `interval`. Below,
-we will list more settings specific to each reporter.
+All reporters must at least have either the `class` or `factory.class` property. Which property may/should be used depends on the reporter implementation. See the individual reporter configuration sections for more information.
+Some reporters (referred to as `Scheduled`) allow specifying a reporting `interval`.
+Below more settings specific to each reporter will be listed.
Example reporter configuration that specifies multiple reporters:
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f1c6b85..9dce4ba 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1241,6 +1241,9 @@ public final class ConfigConstants {
/** The class of the reporter to use. This is used as a suffix in an actual reporter config */
public static final String METRICS_REPORTER_CLASS_SUFFIX = "class";
+ /** The class of the reporter factory to use. This is used as a suffix in an actual reporter config */
+ public static final String METRICS_REPORTER_FACTORY_CLASS_SUFFIX = "factory.class";
+
/** The interval between reports. This is used as a suffix in an actual reporter config */
public static final String METRICS_REPORTER_INTERVAL_SUFFIX = "interval";
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
index 5c8085f..3ca6e6a 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
@@ -25,8 +25,11 @@ import org.apache.flink.metrics.MetricGroup;
/**
* Reporters are used to export {@link Metric Metrics} to an external backend.
*
- * <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a
- * public no-argument constructor.
+ * <p>Reporters are instantiated either
+ * a) via reflection, in which case they must be public, non-abstract, and have a public no-argument constructor.
+ * b) via a {@link MetricReporterFactory}, in which case no restrictions apply. (recommended)
+ *
+ * <p>Reporters are neither required nor encouraged to support both instantiation paths.
*/
public interface MetricReporter {
@@ -35,8 +38,11 @@ public interface MetricReporter {
// ------------------------------------------------------------------------
/**
- * Configures this reporter. Since reporters are instantiated generically and hence parameter-less,
- * this method is the place where the reporters set their basic fields based on configuration values.
+ * Configures this reporter.
+ *
+ * <p>If the reporter was instantiated generically and hence parameter-less,
+ * this method is the place where the reporter sets it's basic fields based on configuration values.
+ * Otherwise, this method will typically be a no-op since resources can be acquired in the constructor.
*
* <p>This method is always called first on a newly instantiated reporter.
*
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java
new file mode 100644
index 0000000..125d01f
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.reporter;
+
+import java.util.Properties;
+
+/**
+ * {@link MetricReporter} factory.
+ */
+public interface MetricReporterFactory {
+
+ /**
+ * Creates a new metric reporter.
+ *
+ * @param properties configured properties for the reporter
+ * @return created metric reporter
+ */
+ MetricReporter createMetricReporter(final Properties properties);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index b6d3827..2d1624f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -25,14 +25,20 @@ import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
@@ -54,7 +60,7 @@ public final class ReporterSetup {
Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) +
// [\S&&[^.]] = intersection of non-whitespace and non-period character classes
"([\\S&&[^.]]*)\\." +
- Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX));
+ '(' + Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX) + '|' + Pattern.quote(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX) + ')');
private final String name;
private final MetricConfig configuration;
@@ -142,28 +148,23 @@ public final class ReporterSetup {
configuration,
ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');
- reporterConfigurations.add(Tuple2.of(namedReporter, (Configuration) delegatingConfiguration));
+ reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration));
}
+ final Map<String, MetricReporterFactory> reporterFactories = loadReporterFactories();
List<ReporterSetup> reporterArguments = new ArrayList<>(reporterConfigurations.size());
for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
String reporterName = reporterConfiguration.f0;
Configuration reporterConfig = reporterConfiguration.f1;
try {
- final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
- if (reporterClassName == null) {
- LOG.error("No reporter class set for reporter " + reporterName + ". Metrics might not be exposed/reported.");
- continue;
- }
-
- Class<?> reporterClass = Class.forName(reporterClassName);
- MetricReporter reporter = (MetricReporter) reporterClass.newInstance();
-
- MetricConfig metricConfig = new MetricConfig();
- reporterConfig.addAllToProperties(metricConfig);
+ Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories);
+ metricReporterOptional.ifPresent(reporter -> {
+ MetricConfig metricConfig = new MetricConfig();
+ reporterConfig.addAllToProperties(metricConfig);
- reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
+ reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
+ });
}
catch (Throwable t) {
LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
@@ -171,4 +172,64 @@ public final class ReporterSetup {
}
return reporterArguments;
}
+
+ private static Map<String, MetricReporterFactory> loadReporterFactories() {
+ final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class);
+
+ final Map<String, MetricReporterFactory> reporterFactories = new HashMap<>(2);
+ final Iterator<MetricReporterFactory> factoryIterator = serviceLoader.iterator();
+ // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors
+ // such an error might be caused if the META-INF/services contains an entry to a non-existing factory class
+ while (factoryIterator.hasNext()) {
+ try {
+ MetricReporterFactory factory = factoryIterator.next();
+ reporterFactories.put(factory.getClass().getName(), factory);
+ } catch (Exception | ServiceConfigurationError e) {
+ LOG.warn("Error while loading reporter factory.", e);
+ }
+ }
+
+ return Collections.unmodifiableMap(reporterFactories);
+ }
+
+ private static Optional<MetricReporter> loadReporter(
+ final String reporterName,
+ final Configuration reporterConfig,
+ final Map<String, MetricReporterFactory> reporterFactories)
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+
+ final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
+ final String factoryClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, null);
+
+ if (factoryClassName != null) {
+ return loadViaFactory(factoryClassName, reporterName, reporterConfig, reporterFactories);
+ }
+
+ if (reporterClassName != null) {
+ final Class<?> reporterClass = Class.forName(reporterClassName);
+ return Optional.of((MetricReporter) reporterClass.newInstance());
+ }
+
+ LOG.warn("No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.", reporterName);
+ return Optional.empty();
+ }
+
+ private static Optional<MetricReporter> loadViaFactory(
+ final String factoryClassName,
+ final String reporterName,
+ final Configuration reporterConfig,
+ final Map<String, MetricReporterFactory> reporterFactories) {
+
+ MetricReporterFactory factory = reporterFactories.get(factoryClassName);
+
+ if (factory == null) {
+ LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: ", factoryClassName, reporterName, reporterFactories.keySet());
+ return Optional.empty();
+ } else {
+ final MetricConfig metricConfig = new MetricConfig();
+ reporterConfig.addAllToProperties(metricConfig);
+
+ return Optional.of(factory.createMetricReporter(metricConfig));
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
index eedaa60..f3e86cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.apache.flink.runtime.metrics.util.TestReporter;
import org.apache.flink.util.TestLogger;
@@ -31,9 +32,11 @@ import org.junit.Test;
import java.util.List;
import java.util.Optional;
+import java.util.Properties;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Tests for the {@link ReporterSetup}.
@@ -214,4 +217,157 @@ public class ReporterSetupTest extends TestLogger {
Assert.assertEquals("value3", setup.getConfiguration().getString("arg3", null));
Assert.assertEquals(TestReporter2.class.getName(), setup.getConfiguration().getString("class", null));
}
+
+ /**
+ * Verifies that a factory configuration is correctly parsed.
+ */
+ @Test
+ public void testFactoryParsing() throws Exception {
+ final Configuration config = new Configuration();
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, TestReporterFactory.class.getName());
+
+ final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+ assertEquals(1, reporterSetups.size());
+
+ final ReporterSetup reporterSetup = reporterSetups.get(0);
+
+ assertEquals(TestReporterFactory.REPORTER, reporterSetup.getReporter());
+ }
+
+ /**
+ * Verifies that the factory approach is prioritized if both the factory and reflection approach are configured.
+ */
+ @Test
+ public void testFactoryPrioritization() throws Exception {
+ final Configuration config = new Configuration();
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, InstantiationTypeTrackingTestReporterFactory.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName());
+
+ final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+ assertEquals(1, reporterSetups.size());
+
+ final ReporterSetup reporterSetup = reporterSetups.get(0);
+ final InstantiationTypeTrackingTestReporter metricReporter = (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
+
+ assertTrue(metricReporter.createdByFactory);
+ }
+
+ /**
+ * Verifies that an error thrown by a factory does not affect the setup of other reporters.
+ */
+ @Test
+ public void testFactoryFailureIsolation() throws Exception {
+ final Configuration config = new Configuration();
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, TestReporterFactory.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "fail." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, FailingFactory.class.getName());
+
+ final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+ assertEquals(1, reporterSetups.size());
+ }
+
+ /**
+ * Verifies that factory/reflection approaches can be mixed freely.
+ */
+ @Test
+ public void testMixedSetupsFactoryParsing() throws Exception {
+ final Configuration config = new Configuration();
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, InstantiationTypeTrackingTestReporterFactory.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName());
+
+ final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+ assertEquals(2, reporterSetups.size());
+
+ final ReporterSetup reporterSetup1 = reporterSetups.get(0);
+ final ReporterSetup reporterSetup2 = reporterSetups.get(1);
+
+ final InstantiationTypeTrackingTestReporter metricReporter1 = (InstantiationTypeTrackingTestReporter) reporterSetup1.getReporter();
+ final InstantiationTypeTrackingTestReporter metricReporter2 = (InstantiationTypeTrackingTestReporter) reporterSetup2.getReporter();
+
+ assertTrue(metricReporter1.createdByFactory ^ metricReporter2.createdByFactory);
+ }
+
+ @Test
+ public void testFactoryArgumentForwarding() throws Exception {
+ final Configuration config = new Configuration();
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, ConfigExposingReporterFactory.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg", "hello");
+
+ ReporterSetup.fromConfiguration(config);
+
+ Properties passedConfig = ConfigExposingReporterFactory.lastConfig;
+ assertEquals("hello", passedConfig.getProperty("arg"));
+ }
+
+ /**
+ * Factory that exposed the last provided metric config.
+ */
+ public static class ConfigExposingReporterFactory implements MetricReporterFactory {
+
+ static Properties lastConfig = null;
+
+ @Override
+ public MetricReporter createMetricReporter(Properties config) {
+ lastConfig = config;
+ return new TestReporter();
+ }
+ }
+
+ /**
+ * Factory that returns a static reporter.
+ */
+ public static class TestReporterFactory implements MetricReporterFactory {
+
+ static final MetricReporter REPORTER = new TestReporter();
+
+ @Override
+ public MetricReporter createMetricReporter(Properties config) {
+ return REPORTER;
+ }
+ }
+
+ /**
+ * Factory that always throws an error.
+ */
+ public static class FailingFactory implements MetricReporterFactory {
+
+ @Override
+ public MetricReporter createMetricReporter(Properties config) {
+ throw new RuntimeException();
+ }
+ }
+
+ /**
+ * Factory for {@link InstantiationTypeTrackingTestReporter}.
+ */
+ public static class InstantiationTypeTrackingTestReporterFactory implements MetricReporterFactory {
+
+ @Override
+ public MetricReporter createMetricReporter(Properties config) {
+ return new InstantiationTypeTrackingTestReporter(true);
+ }
+ }
+
+ /**
+ * Reporter that exposes which constructor was called.
+ */
+ protected static class InstantiationTypeTrackingTestReporter extends TestReporter {
+
+ private final boolean createdByFactory;
+
+ public InstantiationTypeTrackingTestReporter() {
+ this(false);
+ }
+
+ InstantiationTypeTrackingTestReporter(boolean createdByFactory) {
+ this.createdByFactory = createdByFactory;
+ }
+
+ public boolean isCreatedByFactory() {
+ return createdByFactory;
+ }
+ }
}
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
new file mode 100644
index 0000000..61b00fd
--- /dev/null
+++ b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.metrics.ReporterSetupTest$TestReporterFactory
+org.apache.flink.runtime.metrics.ReporterSetupTest$FailingFactory
+org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory
+org.apache.flink.runtime.metrics.ReporterSetupTest$ConfigExposingReporterFactory