You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/15 06:34:07 UTC
[flink] 04/04: [FLINK-11922][metrics] Add JMXReporterFactory
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c3c74e005ff3a6e10216742bd58a5145c867f665
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 3 13:14:39 2019 +0200
[FLINK-11922][metrics] Add JMXReporterFactory
---
docs/monitoring/metrics.md | 4 +-
docs/monitoring/metrics.zh.md | 4 +-
.../org/apache/flink/metrics/jmx/JMXReporter.java | 49 +++++++++--------
.../flink/metrics/jmx/JMXReporterFactory.java | 36 ++++++++++++
...he.flink.metrics.reporter.MetricReporterFactory | 16 ++++++
.../flink/metrics/jmx/JMXReporterFactoryTest.java | 64 ++++++++++++++++++++++
.../apache/flink/metrics/jmx/JMXReporterTest.java | 23 +++-----
7 files changed, 155 insertions(+), 41 deletions(-)
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 37ce572..3adbf7f 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -574,7 +574,7 @@ Example reporter configuration that specifies multiple reporters:
{% highlight yaml %}
metrics.reporters: my_jmx_reporter,my_other_reporter
-metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
@@ -607,7 +607,7 @@ Example configuration:
{% highlight yaml %}
-metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.jmx.port: 8789
{% endhighlight %}
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index e9b9424..1f14347 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -572,7 +572,7 @@ Example reporter configuration that specifies multiple reporters:
{% highlight yaml %}
metrics.reporters: my_jmx_reporter,my_other_reporter
-metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
@@ -605,7 +605,7 @@ Example configuration:
{% highlight yaml %}
-metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.jmx.port: 8789
{% endhighlight %}
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 461f1dc..217783d 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
@@ -34,6 +35,7 @@ import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
@@ -55,6 +57,7 @@ import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
+import java.util.Optional;
/**
* {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
@@ -62,12 +65,11 @@ import java.util.Map;
* <p>Largely based on the JmxReporter class of the dropwizard metrics library
* https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
*/
+@InstantiateViaFactory(factoryClassName = "org.apache.flink.metrics.jmx.JMXReporterFactory")
public class JMXReporter implements MetricReporter {
static final String JMX_DOMAIN_PREFIX = "org.apache.flink.";
- public static final String ARG_PORT = "port";
-
private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class);
private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
@@ -86,33 +88,24 @@ public class JMXReporter implements MetricReporter {
private final Map<Metric, ObjectName> registeredMetrics;
/** The server to which JMX clients connect to. Allows for better control over port usage. */
- private JMXServer jmxServer;
+ @Nullable
+ private final JMXServer jmxServer;
- public JMXReporter() {
+ JMXReporter(@Nullable final String portsConfig) {
this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
this.registeredMetrics = new HashMap<>();
- }
-
- // ------------------------------------------------------------------------
- // life cycle
- // ------------------------------------------------------------------------
-
- @Override
- public void open(MetricConfig config) {
- String portsConfig = config.getString(ARG_PORT, null);
if (portsConfig != null) {
Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
- JMXServer server = new JMXServer();
- while (ports.hasNext()) {
+ JMXServer successfullyStartedServer = null;
+ while (ports.hasNext() && successfullyStartedServer == null) {
+ JMXServer server = new JMXServer();
int port = ports.next();
try {
server.start(port);
LOG.info("Started JMX server on port " + port + ".");
- // only set our field if the server was actually started
- jmxServer = server;
- break;
+ successfullyStartedServer = server;
} catch (IOException ioe) { //assume port conflict
LOG.debug("Could not start JMX server on port " + port + ".", ioe);
try {
@@ -122,13 +115,24 @@ public class JMXReporter implements MetricReporter {
}
}
}
- if (jmxServer == null) {
+ if (successfullyStartedServer == null) {
throw new RuntimeException("Could not start JMX server on any configured port. Ports: " + portsConfig);
}
+ this.jmxServer = successfullyStartedServer;
+ } else {
+ this.jmxServer = null;
}
LOG.info("Configured JMXReporter with {port:{}}", portsConfig);
}
+ // ------------------------------------------------------------------------
+ // life cycle
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void open(MetricConfig config) {
+ }
+
@Override
public void close() {
if (jmxServer != null) {
@@ -140,11 +144,12 @@ public class JMXReporter implements MetricReporter {
}
}
- public int getPort() {
+ public Optional<Integer> getPort() {
if (jmxServer == null) {
- throw new NullPointerException("No server was opened. Did you specify a port?");
+ return Optional.empty();
+ } else {
+ return Optional.of(jmxServer.port);
}
- return jmxServer.port;
}
// ------------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java
new file mode 100644
index 0000000..128fb6b
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.jmx;
+
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
+
+import java.util.Properties;
+
+/**
+ * {@link MetricReporterFactory} for {@link JMXReporter}.
+ */
+public class JMXReporterFactory implements MetricReporterFactory {
+
+ static final String ARG_PORT = "port";
+
+ @Override
+ public JMXReporter createMetricReporter(Properties properties) {
+ String portsConfig = properties.getProperty(ARG_PORT);
+ return new JMXReporter(portsConfig);
+ }
+}
diff --git a/flink-metrics/flink-metrics-jmx/src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory b/flink-metrics/flink-metrics-jmx/src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
new file mode 100644
index 0000000..6b9b6bd
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.metrics.jmx.JMXReporterFactory
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterFactoryTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterFactoryTest.java
new file mode 100644
index 0000000..0f96435
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterFactoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.jmx;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo;
+
+/**
+ * Tests for the {@link JMXReporterFactory}.
+ */
+public class JMXReporterFactoryTest extends TestLogger {
+
+ @Test
+ public void testPortRangeArgument() {
+ Properties properties = new Properties();
+ properties.setProperty(JMXReporterFactory.ARG_PORT, "9000-9010");
+
+ JMXReporter metricReporter = new JMXReporterFactory()
+ .createMetricReporter(properties);
+ try {
+
+ Assert.assertThat(
+ metricReporter.getPort().get(),
+ allOf(greaterThanOrEqualTo(9000), lessThanOrEqualTo(9010)));
+ } finally {
+ metricReporter.close();
+ }
+ }
+
+ @Test
+ public void testWithoutArgument() {
+ JMXReporter metricReporter = new JMXReporterFactory()
+ .createMetricReporter(new Properties());
+
+ try {
+ Assert.assertFalse(metricReporter.getPort().isPresent());
+ } finally {
+ metricReporter.close();
+ }
+ }
+}
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 23c1d9d..883b918 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.metrics.jmx;
import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.util.TestHistogram;
import org.apache.flink.metrics.util.TestMeter;
@@ -99,11 +98,8 @@ public class JMXReporterTest extends TestLogger {
*/
@Test
public void testPortConflictHandling() throws Exception {
- MetricConfig metricConfig = new MetricConfig();
- metricConfig.setProperty("port", "9020-9035");
-
- ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", metricConfig, new JMXReporter());
- ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", metricConfig, new JMXReporter());
+ ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", new JMXReporter("9020-9035"));
+ ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", new JMXReporter("9020-9035"));
MetricRegistryImpl reg = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
@@ -156,11 +152,8 @@ public class JMXReporterTest extends TestLogger {
*/
@Test
public void testJMXAvailability() throws Exception {
- MetricConfig metricConfig = new MetricConfig();
- metricConfig.setProperty("port", "9040-9055");
-
- ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", metricConfig, new JMXReporter());
- ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", metricConfig, new JMXReporter());
+ ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", new JMXReporter("9040-9055"));
+ ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", new JMXReporter("9040-9055"));
MetricRegistryImpl reg = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
@@ -195,7 +188,7 @@ public class JMXReporterTest extends TestLogger {
ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
- JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jmxrmi");
+ JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep1).getPort().get() + "/jndi/rmi://localhost:" + ((JMXReporter) rep1).getPort().get() + "/jmxrmi");
JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();
@@ -204,7 +197,7 @@ public class JMXReporterTest extends TestLogger {
jmxCon1.close();
- JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jmxrmi");
+ JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep2).getPort().get() + "/jndi/rmi://localhost:" + ((JMXReporter) rep2).getPort().get() + "/jmxrmi");
JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();
@@ -233,7 +226,7 @@ public class JMXReporterTest extends TestLogger {
try {
registry = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
- Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter())));
+ Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter(null))));
TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
@@ -281,7 +274,7 @@ public class JMXReporterTest extends TestLogger {
try {
registry = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
- Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter())));
+ Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter(null))));
TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");