You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/15 06:34:03 UTC

[flink] branch master updated (d95d395 -> c3c74e0)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d95d395  [FLINK-12370][python][travis] Integrated Travis for Python API.
     new 7534b6c  [hotfix][metrics] Exit early if not reporters are configured
     new e63e4b9  [FLINK-11922][metrics] Support MetricReporter factories
     new 81b0464  [FLINK-11922][metrics] Add utils for backwards compatibility
     new c3c74e0  [FLINK-11922][metrics] Add JMXReporterFactory

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/monitoring/metrics.md                         |  10 +-
 docs/monitoring/metrics.zh.md                      |   4 +-
 .../flink/configuration/ConfigConstants.java       |   3 +
 .../metrics/reporter/InstantiateViaFactory.java    |  19 +--
 .../flink/metrics/reporter/MetricReporter.java     |  14 +-
 .../{Scheduled.java => MetricReporterFactory.java} |  14 +-
 .../org/apache/flink/metrics/jmx/JMXReporter.java  |  49 +++---
 .../flink/metrics/jmx/JMXReporterFactory.java      |  21 +--
 ...he.flink.metrics.reporter.MetricReporterFactory |   2 +-
 .../flink/metrics/jmx/JMXReporterFactoryTest.java  |  64 ++++++++
 .../apache/flink/metrics/jmx/JMXReporterTest.java  |  23 +--
 .../flink/runtime/metrics/ReporterSetup.java       | 132 ++++++++++++---
 .../flink/runtime/metrics/ReporterSetupTest.java   | 182 +++++++++++++++++++++
 ...he.flink.metrics.reporter.MetricReporterFactory |   6 +-
 14 files changed, 443 insertions(+), 100 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableAnnotation.java => flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java (72%)
 copy flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/{Scheduled.java => MetricReporterFactory.java} (73%)
 copy flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java => flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java (63%)
 copy flink-filesystems/flink-mapr-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory => flink-metrics/flink-metrics-jmx/src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory (94%)
 create mode 100644 flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterFactoryTest.java
 copy flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory => flink-runtime/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory (71%)


[flink] 03/04: [FLINK-11922][metrics] Add utils for backwards compatibility

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81b046418d499bb5ff18d6f25d52d776ad524a2a
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 3 13:14:09 2019 +0200

    [FLINK-11922][metrics] Add utils for backwards compatibility
---
 .../metrics/reporter/InstantiateViaFactory.java    | 36 ++++++++++++++++++++++
 .../flink/runtime/metrics/ReporterSetup.java       | 29 +++++++++++++++--
 .../flink/runtime/metrics/ReporterSetupTest.java   | 26 ++++++++++++++++
 3 files changed, 89 insertions(+), 2 deletions(-)

diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
new file mode 100644
index 0000000..454ec91
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.reporter;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for {@link MetricReporter MetricReporters} that support factories but want to maintain
+ * backwards-compatibility with existing reflection-based configurations.
+ *
+ * <p>When an annotated reporter is configured to be used via reflection, the given factory will be used instead.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface InstantiateViaFactory {
+	String factoryClassName();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index 2d1624f..576700c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.InstantiateViaFactory;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
@@ -206,8 +207,7 @@ public final class ReporterSetup {
 		}
 
 		if (reporterClassName != null) {
-			final Class<?> reporterClass = Class.forName(reporterClassName);
-			return Optional.of((MetricReporter) reporterClass.newInstance());
+			return loadViaReflection(reporterClassName, reporterName, reporterConfig, reporterFactories);
 		}
 
 		LOG.warn("No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.", reporterName);
@@ -232,4 +232,29 @@ public final class ReporterSetup {
 			return Optional.of(factory.createMetricReporter(metricConfig));
 		}
 	}
+
+	private static Optional<MetricReporter> loadViaReflection(
+			final String reporterClassName,
+			final String reporterName,
+			final Configuration reporterConfig,
+			final Map<String, MetricReporterFactory> reporterFactories) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+
+		final Class<?> reporterClass = Class.forName(reporterClassName);
+
+		final InstantiateViaFactory alternativeFactoryAnnotation = reporterClass.getAnnotation(InstantiateViaFactory.class);
+		if (alternativeFactoryAnnotation != null) {
+			final String alternativeFactoryClassName = alternativeFactoryAnnotation.factoryClassName();
+			LOG.info("The reporter configuration of {} is out-dated (but still supported)." +
+					" Please configure a factory class instead: '{}{}.{}: {}' to ensure that the configuration" +
+					" continues to work with future versions.",
+				reporterName,
+				ConfigConstants.METRICS_REPORTER_PREFIX,
+				reporterName,
+				ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX,
+				alternativeFactoryClassName);
+			return loadViaFactory(alternativeFactoryClassName, reporterName, reporterConfig, reporterFactories);
+		}
+
+		return Optional.of((MetricReporter) reporterClass.newInstance());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
index f3e86cb..6bc820a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.InstantiateViaFactory;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 import org.apache.flink.runtime.metrics.util.TestReporter;
@@ -303,6 +304,24 @@ public class ReporterSetupTest extends TestLogger {
 	}
 
 	/**
+	 * Verifies that the factory approach is used if the reporter is annotated with {@link InstantiateViaFactory}.
+	 */
+	@Test
+	public void testFactoryAnnotation() {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter2.class.getName());
+
+		final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+		assertEquals(1, reporterSetups.size());
+
+		final ReporterSetup reporterSetup = reporterSetups.get(0);
+		final InstantiationTypeTrackingTestReporter metricReporter = (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
+
+		assertTrue(metricReporter.createdByFactory);
+	}
+
+	/**
 	 * Factory that exposed the last provided metric config.
 	 */
 	public static class ConfigExposingReporterFactory implements MetricReporterFactory {
@@ -370,4 +389,11 @@ public class ReporterSetupTest extends TestLogger {
 			return createdByFactory;
 		}
 	}
+
+	/**
+	 * Annotated reporter that exposes which constructor was called.
+	 */
+	@InstantiateViaFactory(factoryClassName = "org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory")
+	protected static class InstantiationTypeTrackingTestReporter2 extends InstantiationTypeTrackingTestReporter {
+	}
 }


[flink] 04/04: [FLINK-11922][metrics] Add JMXReporterFactory

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c3c74e005ff3a6e10216742bd58a5145c867f665
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 3 13:14:39 2019 +0200

    [FLINK-11922][metrics] Add JMXReporterFactory
---
 docs/monitoring/metrics.md                         |  4 +-
 docs/monitoring/metrics.zh.md                      |  4 +-
 .../org/apache/flink/metrics/jmx/JMXReporter.java  | 49 +++++++++--------
 .../flink/metrics/jmx/JMXReporterFactory.java      | 36 ++++++++++++
 ...he.flink.metrics.reporter.MetricReporterFactory | 16 ++++++
 .../flink/metrics/jmx/JMXReporterFactoryTest.java  | 64 ++++++++++++++++++++++
 .../apache/flink/metrics/jmx/JMXReporterTest.java  | 23 +++-----
 7 files changed, 155 insertions(+), 41 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 37ce572..3adbf7f 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -574,7 +574,7 @@ Example reporter configuration that specifies multiple reporters:
 {% highlight yaml %}
 metrics.reporters: my_jmx_reporter,my_other_reporter
 
-metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
 metrics.reporter.my_jmx_reporter.port: 9020-9040
 
 metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
@@ -607,7 +607,7 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
 metrics.reporter.jmx.port: 8789
 
 {% endhighlight %}
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index e9b9424..1f14347 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -572,7 +572,7 @@ Example reporter configuration that specifies multiple reporters:
 {% highlight yaml %}
 metrics.reporters: my_jmx_reporter,my_other_reporter
 
-metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
 metrics.reporter.my_jmx_reporter.port: 9020-9040
 
 metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
@@ -605,7 +605,7 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
 metrics.reporter.jmx.port: 8789
 
 {% endhighlight %}
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 461f1dc..217783d 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.InstantiateViaFactory;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
@@ -34,6 +35,7 @@ import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanServer;
@@ -55,6 +57,7 @@ import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
@@ -62,12 +65,11 @@ import java.util.Map;
  * <p>Largely based on the JmxReporter class of the dropwizard metrics library
  * https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
  */
+@InstantiateViaFactory(factoryClassName = "org.apache.flink.metrics.jmx.JMXReporterFactory")
 public class JMXReporter implements MetricReporter {
 
 	static final String JMX_DOMAIN_PREFIX = "org.apache.flink.";
 
-	public static final String ARG_PORT = "port";
-
 	private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class);
 
 	private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
@@ -86,33 +88,24 @@ public class JMXReporter implements MetricReporter {
 	private final Map<Metric, ObjectName> registeredMetrics;
 
 	/** The server to which JMX clients connect to. Allows for better control over port usage. */
-	private JMXServer jmxServer;
+	@Nullable
+	private final JMXServer jmxServer;
 
-	public JMXReporter() {
+	JMXReporter(@Nullable final String portsConfig) {
 		this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
 		this.registeredMetrics = new HashMap<>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  life cycle
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void open(MetricConfig config) {
-		String portsConfig = config.getString(ARG_PORT, null);
 
 		if (portsConfig != null) {
 			Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
 
-			JMXServer server = new JMXServer();
-			while (ports.hasNext()) {
+			JMXServer successfullyStartedServer = null;
+			while (ports.hasNext() && successfullyStartedServer == null) {
+				JMXServer server = new JMXServer();
 				int port = ports.next();
 				try {
 					server.start(port);
 					LOG.info("Started JMX server on port " + port + ".");
-					// only set our field if the server was actually started
-					jmxServer = server;
-					break;
+					successfullyStartedServer = server;
 				} catch (IOException ioe) { //assume port conflict
 					LOG.debug("Could not start JMX server on port " + port + ".", ioe);
 					try {
@@ -122,13 +115,24 @@ public class JMXReporter implements MetricReporter {
 					}
 				}
 			}
-			if (jmxServer == null) {
+			if (successfullyStartedServer == null) {
 				throw new RuntimeException("Could not start JMX server on any configured port. Ports: " + portsConfig);
 			}
+			this.jmxServer = successfullyStartedServer;
+		} else {
+			this.jmxServer = null;
 		}
 		LOG.info("Configured JMXReporter with {port:{}}", portsConfig);
 	}
 
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void open(MetricConfig config) {
+	}
+
 	@Override
 	public void close() {
 		if (jmxServer != null) {
@@ -140,11 +144,12 @@ public class JMXReporter implements MetricReporter {
 		}
 	}
 
-	public int getPort() {
+	public Optional<Integer> getPort() {
 		if (jmxServer == null) {
-			throw new NullPointerException("No server was opened. Did you specify a port?");
+			return Optional.empty();
+		} else {
+			return Optional.of(jmxServer.port);
 		}
-		return jmxServer.port;
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java
new file mode 100644
index 0000000..128fb6b
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.jmx;
+
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
+
+import java.util.Properties;
+
+/**
+ * {@link MetricReporterFactory} for {@link JMXReporter}.
+ */
+public class JMXReporterFactory implements MetricReporterFactory {
+
+	static final String ARG_PORT = "port";
+
+	@Override
+	public JMXReporter createMetricReporter(Properties properties) {
+		String portsConfig = properties.getProperty(ARG_PORT);
+		return new JMXReporter(portsConfig);
+	}
+}
diff --git a/flink-metrics/flink-metrics-jmx/src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory b/flink-metrics/flink-metrics-jmx/src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
new file mode 100644
index 0000000..6b9b6bd
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.metrics.jmx.JMXReporterFactory
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterFactoryTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterFactoryTest.java
new file mode 100644
index 0000000..0f96435
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterFactoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.jmx;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo;
+
+/**
+ * Tests for the {@link JMXReporterFactory}.
+ */
+public class JMXReporterFactoryTest extends TestLogger {
+
+	@Test
+	public void testPortRangeArgument() {
+		Properties properties = new Properties();
+		properties.setProperty(JMXReporterFactory.ARG_PORT, "9000-9010");
+
+		JMXReporter metricReporter = new JMXReporterFactory()
+			.createMetricReporter(properties);
+		try {
+
+			Assert.assertThat(
+				metricReporter.getPort().get(),
+				allOf(greaterThanOrEqualTo(9000), lessThanOrEqualTo(9010)));
+		} finally {
+			metricReporter.close();
+		}
+	}
+
+	@Test
+	public void testWithoutArgument() {
+		JMXReporter metricReporter = new JMXReporterFactory()
+			.createMetricReporter(new Properties());
+
+		try {
+			Assert.assertFalse(metricReporter.getPort().isPresent());
+		} finally {
+			metricReporter.close();
+		}
+	}
+}
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 23c1d9d..883b918 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.metrics.jmx;
 
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestHistogram;
 import org.apache.flink.metrics.util.TestMeter;
@@ -99,11 +98,8 @@ public class JMXReporterTest extends TestLogger {
 	 */
 	@Test
 	public void testPortConflictHandling() throws Exception {
-		MetricConfig metricConfig = new MetricConfig();
-		metricConfig.setProperty("port", "9020-9035");
-
-		ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", metricConfig, new JMXReporter());
-		ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", metricConfig, new JMXReporter());
+		ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", new JMXReporter("9020-9035"));
+		ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", new JMXReporter("9020-9035"));
 
 		MetricRegistryImpl reg = new MetricRegistryImpl(
 			MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
@@ -156,11 +152,8 @@ public class JMXReporterTest extends TestLogger {
 	 */
 	@Test
 	public void testJMXAvailability() throws Exception {
-		MetricConfig metricConfig = new MetricConfig();
-		metricConfig.setProperty("port", "9040-9055");
-
-		ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", metricConfig, new JMXReporter());
-		ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", metricConfig, new JMXReporter());
+		ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", new JMXReporter("9040-9055"));
+		ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", new JMXReporter("9040-9055"));
 
 		MetricRegistryImpl reg = new MetricRegistryImpl(
 			MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
@@ -195,7 +188,7 @@ public class JMXReporterTest extends TestLogger {
 		ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
 		ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
 
-		JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jmxrmi");
+		JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep1).getPort().get() + "/jndi/rmi://localhost:" + ((JMXReporter) rep1).getPort().get() + "/jmxrmi");
 		JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
 		MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();
 
@@ -204,7 +197,7 @@ public class JMXReporterTest extends TestLogger {
 
 		jmxCon1.close();
 
-		JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jmxrmi");
+		JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep2).getPort().get() + "/jndi/rmi://localhost:" + ((JMXReporter) rep2).getPort().get() + "/jmxrmi");
 		JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
 		MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();
 
@@ -233,7 +226,7 @@ public class JMXReporterTest extends TestLogger {
 		try {
 			registry = new MetricRegistryImpl(
 				MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
-				Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter())));
+				Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter(null))));
 
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 
@@ -281,7 +274,7 @@ public class JMXReporterTest extends TestLogger {
 		try {
 			registry = new MetricRegistryImpl(
 				MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
-				Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter())));
+				Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter(null))));
 
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 


[flink] 02/04: [FLINK-11922][metrics] Support MetricReporter factories

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e63e4b9667e50c267555b64efa7c77db2d577f5b
Author: zentol <ch...@apache.org>
AuthorDate: Fri Mar 15 11:16:01 2019 +0100

    [FLINK-11922][metrics] Support MetricReporter factories
---
 docs/monitoring/metrics.md                         |   6 +-
 .../flink/configuration/ConfigConstants.java       |   3 +
 .../flink/metrics/reporter/MetricReporter.java     |  14 +-
 .../metrics/reporter/MetricReporterFactory.java    |  35 +++++
 .../flink/runtime/metrics/ReporterSetup.java       |  89 ++++++++++--
 .../flink/runtime/metrics/ReporterSetupTest.java   | 156 +++++++++++++++++++++
 ...he.flink.metrics.reporter.MetricReporterFactory |  19 +++
 7 files changed, 302 insertions(+), 20 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 4495d7e..37ce572 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -560,12 +560,14 @@ reporters will be instantiated on each job and task manager when they are starte
 
 - `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`.
 - `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`.
+- `metrics.reporter.<name>.factory.class`: The reporter factory class to use for the reporter named `<name>`.
 - `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`.
 - `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the identifier (default value use `metrics.scope.delimiter`) for the reporter named `<name>`.
 - `metrics.reporters`: (optional) A comma-separated include list of reporter names. By default all configured reporters will be used.
 
-All reporters must at least have the `class` property, some allow specifying a reporting `interval`. Below,
-we will list more settings specific to each reporter.
+All reporters must at least have either the `class` or `factory.class` property. Which property may/should be used depends on the reporter implementation. See the individual reporter configuration sections for more information.
+Some reporters (referred to as `Scheduled`) allow specifying a reporting `interval`.
+Below, more settings specific to each reporter are listed.
 
 Example reporter configuration that specifies multiple reporters:
 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f1c6b85..9dce4ba 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1241,6 +1241,9 @@ public final class ConfigConstants {
 	/** The class of the reporter to use. This is used as a suffix in an actual reporter config */
 	public static final String METRICS_REPORTER_CLASS_SUFFIX = "class";
 
+	/** The class of the reporter factory to use. This is used as a suffix in an actual reporter config */
+	public static final String METRICS_REPORTER_FACTORY_CLASS_SUFFIX = "factory.class";
+
 	/** The interval between reports. This is used as a suffix in an actual reporter config */
 	public static final String METRICS_REPORTER_INTERVAL_SUFFIX = "interval";
 
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
index 5c8085f..3ca6e6a 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
@@ -25,8 +25,11 @@ import org.apache.flink.metrics.MetricGroup;
 /**
  * Reporters are used to export {@link Metric Metrics} to an external backend.
  *
- * <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a
- * public no-argument constructor.
+ * <p>Reporters are instantiated either
+ * a) via reflection, in which case they must be public, non-abstract, and have a public no-argument constructor.
+ * b) via a {@link MetricReporterFactory}, in which case no restrictions apply. (recommended)
+ *
+ * <p>Reporters are neither required nor encouraged to support both instantiation paths.
  */
 public interface MetricReporter {
 
@@ -35,8 +38,11 @@ public interface MetricReporter {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Configures this reporter. Since reporters are instantiated generically and hence parameter-less,
-	 * this method is the place where the reporters set their basic fields based on configuration values.
+	 * Configures this reporter.
+	 *
+	 * <p>If the reporter was instantiated generically and hence parameter-less,
+	 * this method is the place where the reporter sets its basic fields based on configuration values.
+	 * Otherwise, this method will typically be a no-op since resources can be acquired in the constructor.
 	 *
 	 * <p>This method is always called first on a newly instantiated reporter.
 	 *
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java
new file mode 100644
index 0000000..125d01f
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.reporter;
+
+import java.util.Properties;
+
+/**
+ * {@link MetricReporter} factory.
+ */
+public interface MetricReporterFactory {
+
+	/**
+	 * Creates a new metric reporter.
+	 *
+	 * @param properties configured properties for the reporter
+	 * @return created metric reporter
+	 */
+	MetricReporter createMetricReporter(final Properties properties);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index b6d3827..2d1624f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -25,14 +25,20 @@ import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.regex.Matcher;
@@ -54,7 +60,7 @@ public final class ReporterSetup {
 		Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) +
 			// [\S&&[^.]] = intersection of non-whitespace and non-period character classes
 			"([\\S&&[^.]]*)\\." +
-			Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX));
+			'(' + Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX) + '|' + Pattern.quote(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX) + ')');
 
 	private final String name;
 	private final MetricConfig configuration;
@@ -142,28 +148,23 @@ public final class ReporterSetup {
 				configuration,
 				ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');
 
-			reporterConfigurations.add(Tuple2.of(namedReporter, (Configuration) delegatingConfiguration));
+			reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration));
 		}
 
+		final Map<String, MetricReporterFactory> reporterFactories = loadReporterFactories();
 		List<ReporterSetup> reporterArguments = new ArrayList<>(reporterConfigurations.size());
 		for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
 			String reporterName = reporterConfiguration.f0;
 			Configuration reporterConfig = reporterConfiguration.f1;
 
 			try {
-				final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-				if (reporterClassName == null) {
-					LOG.error("No reporter class set for reporter " + reporterName + ". Metrics might not be exposed/reported.");
-					continue;
-				}
-
-				Class<?> reporterClass = Class.forName(reporterClassName);
-				MetricReporter reporter = (MetricReporter) reporterClass.newInstance();
-
-				MetricConfig metricConfig = new MetricConfig();
-				reporterConfig.addAllToProperties(metricConfig);
+				Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories);
+				metricReporterOptional.ifPresent(reporter -> {
+					MetricConfig metricConfig = new MetricConfig();
+					reporterConfig.addAllToProperties(metricConfig);
 
-				reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
+					reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
+				});
 			}
 			catch (Throwable t) {
 				LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
@@ -171,4 +172,64 @@ public final class ReporterSetup {
 		}
 		return reporterArguments;
 	}
+
+	private static Map<String, MetricReporterFactory> loadReporterFactories() {
+		final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class);
+
+		final Map<String, MetricReporterFactory> reporterFactories = new HashMap<>(2);
+		final Iterator<MetricReporterFactory> factoryIterator = serviceLoader.iterator();
+		// Do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors.
+		// Such an error may occur if a META-INF/services file contains an entry for a non-existent factory class.
+		while (factoryIterator.hasNext()) {
+			try {
+				MetricReporterFactory factory = factoryIterator.next();
+				reporterFactories.put(factory.getClass().getName(), factory);
+			} catch (Exception | ServiceConfigurationError e) {
+				LOG.warn("Error while loading reporter factory.", e);
+			}
+		}
+
+		return Collections.unmodifiableMap(reporterFactories);
+	}
+
+	private static Optional<MetricReporter> loadReporter(
+			final String reporterName,
+			final Configuration reporterConfig,
+			final Map<String, MetricReporterFactory> reporterFactories)
+			throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+
+		final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
+		final String factoryClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, null);
+
+		if (factoryClassName != null) {
+			return loadViaFactory(factoryClassName, reporterName, reporterConfig, reporterFactories);
+		}
+
+		if (reporterClassName != null) {
+			final Class<?> reporterClass = Class.forName(reporterClassName);
+			return Optional.of((MetricReporter) reporterClass.newInstance());
+		}
+
+		LOG.warn("No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.", reporterName);
+		return Optional.empty();
+	}
+
+	private static Optional<MetricReporter> loadViaFactory(
+			final String factoryClassName,
+			final String reporterName,
+			final Configuration reporterConfig,
+			final Map<String, MetricReporterFactory> reporterFactories) {
+
+		MetricReporterFactory factory = reporterFactories.get(factoryClassName);
+
+		if (factory == null) {
+			LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: ", factoryClassName, reporterName, reporterFactories.keySet());
+			return Optional.empty();
+		} else {
+			final MetricConfig metricConfig = new MetricConfig();
+			reporterConfig.addAllToProperties(metricConfig);
+
+			return Optional.of(factory.createMetricReporter(metricConfig));
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
index eedaa60..f3e86cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
 import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.apache.flink.util.TestLogger;
 
@@ -31,9 +32,11 @@ import org.junit.Test;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.Properties;
 
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the {@link ReporterSetup}.
@@ -214,4 +217,157 @@ public class ReporterSetupTest extends TestLogger {
 		Assert.assertEquals("value3", setup.getConfiguration().getString("arg3", null));
 		Assert.assertEquals(TestReporter2.class.getName(), setup.getConfiguration().getString("class", null));
 	}
+
+	/**
+	 * Verifies that a factory configuration is correctly parsed.
+	 */
+	@Test
+	public void testFactoryParsing() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, TestReporterFactory.class.getName());
+
+		final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+		assertEquals(1, reporterSetups.size());
+
+		final ReporterSetup reporterSetup = reporterSetups.get(0);
+
+		assertEquals(TestReporterFactory.REPORTER, reporterSetup.getReporter());
+	}
+
+	/**
+	 * Verifies that the factory approach is prioritized if both the factory and reflection approach are configured.
+	 */
+	@Test
+	public void testFactoryPrioritization() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, InstantiationTypeTrackingTestReporterFactory.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName());
+
+		final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+		assertEquals(1, reporterSetups.size());
+
+		final ReporterSetup reporterSetup = reporterSetups.get(0);
+		final InstantiationTypeTrackingTestReporter metricReporter = (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
+
+		assertTrue(metricReporter.createdByFactory);
+	}
+
+	/**
+	 * Verifies that an error thrown by a factory does not affect the setup of other reporters.
+	 */
+	@Test
+	public void testFactoryFailureIsolation() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, TestReporterFactory.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "fail." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, FailingFactory.class.getName());
+
+		final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+		assertEquals(1, reporterSetups.size());
+	}
+
+	/**
+	 * Verifies that factory/reflection approaches can be mixed freely.
+	 */
+	@Test
+	public void testMixedSetupsFactoryParsing() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, InstantiationTypeTrackingTestReporterFactory.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName());
+
+		final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+		assertEquals(2, reporterSetups.size());
+
+		final ReporterSetup reporterSetup1 = reporterSetups.get(0);
+		final ReporterSetup reporterSetup2 = reporterSetups.get(1);
+
+		final InstantiationTypeTrackingTestReporter metricReporter1 = (InstantiationTypeTrackingTestReporter) reporterSetup1.getReporter();
+		final InstantiationTypeTrackingTestReporter metricReporter2 = (InstantiationTypeTrackingTestReporter) reporterSetup2.getReporter();
+
+		assertTrue(metricReporter1.createdByFactory ^ metricReporter2.createdByFactory);
+	}
+
+	@Test
+	public void testFactoryArgumentForwarding() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, ConfigExposingReporterFactory.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg", "hello");
+
+		ReporterSetup.fromConfiguration(config);
+
+		Properties passedConfig = ConfigExposingReporterFactory.lastConfig;
+		assertEquals("hello", passedConfig.getProperty("arg"));
+	}
+
+	/**
+	 * Factory that exposes the last provided metric config.
+	 */
+	public static class ConfigExposingReporterFactory implements MetricReporterFactory {
+
+		static Properties lastConfig = null;
+
+		@Override
+		public MetricReporter createMetricReporter(Properties config) {
+			lastConfig = config;
+			return new TestReporter();
+		}
+	}
+
+	/**
+	 * Factory that returns a static reporter.
+	 */
+	public static class TestReporterFactory implements MetricReporterFactory {
+
+		static final MetricReporter REPORTER = new TestReporter();
+
+		@Override
+		public MetricReporter createMetricReporter(Properties config) {
+			return REPORTER;
+		}
+	}
+
+	/**
+	 * Factory that always throws an error.
+	 */
+	public static class FailingFactory implements MetricReporterFactory {
+
+		@Override
+		public MetricReporter createMetricReporter(Properties config) {
+			throw new RuntimeException();
+		}
+	}
+
+	/**
+	 * Factory for {@link InstantiationTypeTrackingTestReporter}.
+	 */
+	public static class InstantiationTypeTrackingTestReporterFactory implements MetricReporterFactory {
+
+		@Override
+		public MetricReporter createMetricReporter(Properties config) {
+			return new InstantiationTypeTrackingTestReporter(true);
+		}
+	}
+
+	/**
+	 * Reporter that exposes which constructor was called.
+	 */
+	protected static class InstantiationTypeTrackingTestReporter extends TestReporter {
+
+		private final boolean createdByFactory;
+
+		public InstantiationTypeTrackingTestReporter() {
+			this(false);
+		}
+
+		InstantiationTypeTrackingTestReporter(boolean createdByFactory) {
+			this.createdByFactory = createdByFactory;
+		}
+
+		public boolean isCreatedByFactory() {
+			return createdByFactory;
+		}
+	}
 }
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
new file mode 100644
index 0000000..61b00fd
--- /dev/null
+++ b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.metrics.ReporterSetupTest$TestReporterFactory
+org.apache.flink.runtime.metrics.ReporterSetupTest$FailingFactory
+org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory
+org.apache.flink.runtime.metrics.ReporterSetupTest$ConfigExposingReporterFactory


[flink] 01/04: [hotfix][metrics] Exit early if not reporters are configured

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7534b6c4bdfc3d14a6a50ddc96bc3f646a11bff1
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Apr 18 14:43:56 2019 +0200

    [hotfix][metrics] Exit early if not reporters are configured
---
 .../apache/flink/runtime/metrics/ReporterSetup.java  | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index 1d0c537..b6d3827 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -131,20 +131,18 @@ public final class ReporterSetup {
 			}
 		}
 
-		List<Tuple2<String, Configuration>> reporterConfigurations;
-
 		if (namedReporters.isEmpty()) {
-			reporterConfigurations = Collections.emptyList();
-		} else {
-			reporterConfigurations = new ArrayList<>(namedReporters.size());
+			return Collections.emptyList();
+		}
 
-			for (String namedReporter: namedReporters) {
-				DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
-					configuration,
-					ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');
+		List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size());
 
-				reporterConfigurations.add(Tuple2.of(namedReporter, (Configuration) delegatingConfiguration));
-			}
+		for (String namedReporter: namedReporters) {
+			DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
+				configuration,
+				ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');
+
+			reporterConfigurations.add(Tuple2.of(namedReporter, (Configuration) delegatingConfiguration));
 		}
 
 		List<ReporterSetup> reporterArguments = new ArrayList<>(reporterConfigurations.size());