You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/15 06:34:07 UTC

[flink] 04/04: [FLINK-11922][metrics] Add JMXReporterFactory

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c3c74e005ff3a6e10216742bd58a5145c867f665
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 3 13:14:39 2019 +0200

    [FLINK-11922][metrics] Add JMXReporterFactory
---
 docs/monitoring/metrics.md                         |  4 +-
 docs/monitoring/metrics.zh.md                      |  4 +-
 .../org/apache/flink/metrics/jmx/JMXReporter.java  | 49 +++++++++--------
 .../flink/metrics/jmx/JMXReporterFactory.java      | 36 ++++++++++++
 ...he.flink.metrics.reporter.MetricReporterFactory | 16 ++++++
 .../flink/metrics/jmx/JMXReporterFactoryTest.java  | 64 ++++++++++++++++++++++
 .../apache/flink/metrics/jmx/JMXReporterTest.java  | 23 +++-----
 7 files changed, 155 insertions(+), 41 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 37ce572..3adbf7f 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -574,7 +574,7 @@ Example reporter configuration that specifies multiple reporters:
 {% highlight yaml %}
 metrics.reporters: my_jmx_reporter,my_other_reporter
 
-metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
 metrics.reporter.my_jmx_reporter.port: 9020-9040
 
 metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
@@ -607,7 +607,7 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
 metrics.reporter.jmx.port: 8789
 
 {% endhighlight %}
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index e9b9424..1f14347 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -572,7 +572,7 @@ Example reporter configuration that specifies multiple reporters:
 {% highlight yaml %}
 metrics.reporters: my_jmx_reporter,my_other_reporter
 
-metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
 metrics.reporter.my_jmx_reporter.port: 9020-9040
 
 metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
@@ -605,7 +605,7 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
 metrics.reporter.jmx.port: 8789
 
 {% endhighlight %}
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 461f1dc..217783d 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.InstantiateViaFactory;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
@@ -34,6 +35,7 @@ import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanServer;
@@ -55,6 +57,7 @@ import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
@@ -62,12 +65,11 @@ import java.util.Map;
  * <p>Largely based on the JmxReporter class of the dropwizard metrics library
  * https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
  */
+@InstantiateViaFactory(factoryClassName = "org.apache.flink.metrics.jmx.JMXReporterFactory")
 public class JMXReporter implements MetricReporter {
 
 	static final String JMX_DOMAIN_PREFIX = "org.apache.flink.";
 
-	public static final String ARG_PORT = "port";
-
 	private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class);
 
 	private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
@@ -86,33 +88,24 @@ public class JMXReporter implements MetricReporter {
 	private final Map<Metric, ObjectName> registeredMetrics;
 
 	/** The server to which JMX clients connect to. Allows for better control over port usage. */
-	private JMXServer jmxServer;
+	@Nullable
+	private final JMXServer jmxServer;
 
-	public JMXReporter() {
+	JMXReporter(@Nullable final String portsConfig) {
 		this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
 		this.registeredMetrics = new HashMap<>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  life cycle
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void open(MetricConfig config) {
-		String portsConfig = config.getString(ARG_PORT, null);
 
 		if (portsConfig != null) {
 			Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
 
-			JMXServer server = new JMXServer();
-			while (ports.hasNext()) {
+			JMXServer successfullyStartedServer = null;
+			while (ports.hasNext() && successfullyStartedServer == null) {
+				JMXServer server = new JMXServer();
 				int port = ports.next();
 				try {
 					server.start(port);
 					LOG.info("Started JMX server on port " + port + ".");
-					// only set our field if the server was actually started
-					jmxServer = server;
-					break;
+					successfullyStartedServer = server;
 				} catch (IOException ioe) { //assume port conflict
 					LOG.debug("Could not start JMX server on port " + port + ".", ioe);
 					try {
@@ -122,13 +115,24 @@ public class JMXReporter implements MetricReporter {
 					}
 				}
 			}
-			if (jmxServer == null) {
+			if (successfullyStartedServer == null) {
 				throw new RuntimeException("Could not start JMX server on any configured port. Ports: " + portsConfig);
 			}
+			this.jmxServer = successfullyStartedServer;
+		} else {
+			this.jmxServer = null;
 		}
 		LOG.info("Configured JMXReporter with {port:{}}", portsConfig);
 	}
 
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void open(MetricConfig config) {
+	}
+
 	@Override
 	public void close() {
 		if (jmxServer != null) {
@@ -140,11 +144,12 @@ public class JMXReporter implements MetricReporter {
 		}
 	}
 
-	public int getPort() {
+	public Optional<Integer> getPort() {
 		if (jmxServer == null) {
-			throw new NullPointerException("No server was opened. Did you specify a port?");
+			return Optional.empty();
+		} else {
+			return Optional.of(jmxServer.port);
 		}
-		return jmxServer.port;
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java
new file mode 100644
index 0000000..128fb6b
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.jmx;
+
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
+
+import java.util.Properties;
+
+/**
+ * {@link MetricReporterFactory} for {@link JMXReporter}.
+ */
+public class JMXReporterFactory implements MetricReporterFactory {
+
+	static final String ARG_PORT = "port";
+
+	@Override
+	public JMXReporter createMetricReporter(Properties properties) {
+		String portsConfig = properties.getProperty(ARG_PORT);
+		return new JMXReporter(portsConfig);
+	}
+}
diff --git a/flink-metrics/flink-metrics-jmx/src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory b/flink-metrics/flink-metrics-jmx/src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
new file mode 100644
index 0000000..6b9b6bd
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.metrics.jmx.JMXReporterFactory
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterFactoryTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterFactoryTest.java
new file mode 100644
index 0000000..0f96435
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterFactoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.jmx;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo;
+
+/**
+ * Tests for the {@link JMXReporterFactory}.
+ */
+public class JMXReporterFactoryTest extends TestLogger {
+
+	@Test
+	public void testPortRangeArgument() {
+		Properties properties = new Properties();
+		properties.setProperty(JMXReporterFactory.ARG_PORT, "9000-9010");
+
+		JMXReporter metricReporter = new JMXReporterFactory()
+			.createMetricReporter(properties);
+		try {
+
+			Assert.assertThat(
+				metricReporter.getPort().get(),
+				allOf(greaterThanOrEqualTo(9000), lessThanOrEqualTo(9010)));
+		} finally {
+			metricReporter.close();
+		}
+	}
+
+	@Test
+	public void testWithoutArgument() {
+		JMXReporter metricReporter = new JMXReporterFactory()
+			.createMetricReporter(new Properties());
+
+		try {
+			Assert.assertFalse(metricReporter.getPort().isPresent());
+		} finally {
+			metricReporter.close();
+		}
+	}
+}
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 23c1d9d..883b918 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.metrics.jmx;
 
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestHistogram;
 import org.apache.flink.metrics.util.TestMeter;
@@ -99,11 +98,8 @@ public class JMXReporterTest extends TestLogger {
 	 */
 	@Test
 	public void testPortConflictHandling() throws Exception {
-		MetricConfig metricConfig = new MetricConfig();
-		metricConfig.setProperty("port", "9020-9035");
-
-		ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", metricConfig, new JMXReporter());
-		ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", metricConfig, new JMXReporter());
+		ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", new JMXReporter("9020-9035"));
+		ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", new JMXReporter("9020-9035"));
 
 		MetricRegistryImpl reg = new MetricRegistryImpl(
 			MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
@@ -156,11 +152,8 @@ public class JMXReporterTest extends TestLogger {
 	 */
 	@Test
 	public void testJMXAvailability() throws Exception {
-		MetricConfig metricConfig = new MetricConfig();
-		metricConfig.setProperty("port", "9040-9055");
-
-		ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", metricConfig, new JMXReporter());
-		ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", metricConfig, new JMXReporter());
+		ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", new JMXReporter("9040-9055"));
+		ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", new JMXReporter("9040-9055"));
 
 		MetricRegistryImpl reg = new MetricRegistryImpl(
 			MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
@@ -195,7 +188,7 @@ public class JMXReporterTest extends TestLogger {
 		ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
 		ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
 
-		JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jmxrmi");
+		JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep1).getPort().get() + "/jndi/rmi://localhost:" + ((JMXReporter) rep1).getPort().get() + "/jmxrmi");
 		JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
 		MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();
 
@@ -204,7 +197,7 @@ public class JMXReporterTest extends TestLogger {
 
 		jmxCon1.close();
 
-		JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jmxrmi");
+		JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep2).getPort().get() + "/jndi/rmi://localhost:" + ((JMXReporter) rep2).getPort().get() + "/jmxrmi");
 		JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
 		MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();
 
@@ -233,7 +226,7 @@ public class JMXReporterTest extends TestLogger {
 		try {
 			registry = new MetricRegistryImpl(
 				MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
-				Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter())));
+				Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter(null))));
 
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 
@@ -281,7 +274,7 @@ public class JMXReporterTest extends TestLogger {
 		try {
 			registry = new MetricRegistryImpl(
 				MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
-				Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter())));
+				Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter(null))));
 
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");