You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:29:07 UTC
[flink] branch master updated: [FLINK-5552][runtime][jmx] Introduce
JMXServer singleton
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2b699d6 [FLINK-5552][runtime][jmx] Introduce JMXServer singleton
2b699d6 is described below
commit 2b699d6cb20823cd3a437b49bc96f015f4c1b72e
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon Aug 3 13:28:38 2020 -0700
[FLINK-5552][runtime][jmx] Introduce JMXServer singleton
---
.../expert_debugging_and_tuning_section.html | 18 +++
docs/ops/config.md | 4 +
docs/ops/config.zh.md | 4 +
.../flink/annotation/docs/Documentation.java | 1 +
.../flink/configuration/JMXServerOptions.java | 51 ++++++
.../org/apache/flink/metrics/jmx/JMXReporter.java | 179 +--------------------
.../apache/flink/metrics/jmx/JMXReporterTest.java | 3 +
.../runtime/entrypoint/ClusterEntrypoint.java | 10 ++
.../apache/flink/runtime/management/JMXServer.java | 164 +++++++++++++++++++
.../flink/runtime/management/JMXService.java | 103 ++++++++++++
.../runtime/taskexecutor/TaskManagerRunner.java | 10 ++
.../flink/runtime/management/JMXServerTest.java | 103 ++++++++++++
.../flink/runtime/management/JMXServiceTest.java | 56 +++++++
13 files changed, 534 insertions(+), 172 deletions(-)
diff --git a/docs/_includes/generated/expert_debugging_and_tuning_section.html b/docs/_includes/generated/expert_debugging_and_tuning_section.html
new file mode 100644
index 0000000..09f39595
--- /dev/null
+++ b/docs/_includes/generated/expert_debugging_and_tuning_section.html
@@ -0,0 +1,18 @@
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>jmx.server.port</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The port range for the JMX server to start the registry. The port config can be a single port: "9123", a range of ports: "50100-50200", or a list of ranges and ports: "50100-50200,50300-50400,51234". <br />This option overrides metrics.reporter.*.port option.</td>
+ </tr>
+ </tbody>
+</table>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 6be396b..d94e7db 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -297,6 +297,10 @@ Please refer to the [Debugging Classloading Docs]({{site.baseurl}}/monitoring/de
{% include generated/expert_class_loading_section.html %}
+### Advanced Options for the debugging
+
+{% include generated/expert_debugging_and_tuning_section.html %}
+
### Advanced State Backends Options
{% include generated/expert_state_backends_section.html %}
diff --git a/docs/ops/config.zh.md b/docs/ops/config.zh.md
index 1cfc84b..9bd596a 100644
--- a/docs/ops/config.zh.md
+++ b/docs/ops/config.zh.md
@@ -297,6 +297,10 @@ Please refer to the [Debugging Classloading Docs]({{site.baseurl}}/monitoring/de
{% include generated/expert_class_loading_section.html %}
+### Advanced Options for the debugging
+
+{% include generated/expert_debugging_and_tuning_section.html %}
+
### Advanced State Backends Options
{% include generated/expert_state_backends_section.html %}
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index 98eb312..8316050 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -82,6 +82,7 @@ public final class Documentation {
public static final String STATE_BACKEND_ROCKSDB = "state_backend_rocksdb";
public static final String EXPERT_CLASS_LOADING = "expert_class_loading";
+ public static final String EXPERT_DEBUGGING_AND_TUNING = "expert_debugging_and_tuning";
public static final String EXPERT_SCHEDULING = "expert_scheduling";
public static final String EXPERT_FAULT_TOLERANCE = "expert_fault_tolerance";
public static final String EXPERT_STATE_BACKENDS = "expert_state_backends";
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JMXServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JMXServerOptions.java
new file mode 100644
index 0000000..5cc26f0
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JMXServerOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.description.Description;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to JMX server.
+ */
+@PublicEvolving
+public class JMXServerOptions {
+
+ /** Port configured to enable JMX server for metrics and debugging. */
+ @Documentation.Section(Documentation.Sections.EXPERT_DEBUGGING_AND_TUNING)
+ public static final ConfigOption<String>JMX_SERVER_PORT =
+ key("jmx.server.port")
+ .noDefaultValue()
+ .withDescription(
+ new Description.DescriptionBuilder()
+ .text("The port range for the JMX server to start the registry. The " +
+ "port config can be a single port: \"9123\", a range of ports: \"50100-50200\", " +
+ "or a list of ranges and ports: \"50100-50200,50300-50400,51234\". ")
+ .linebreak()
+ .text("This option overrides metrics.reporter.*.port option.")
+ .build());
+
+ // ------------------------------------------------------------------------
+
+ /** Not intended to be instantiated. */
+ private JMXServerOptions() {}
+}
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 3fe5dc1..60e0f90 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -18,6 +18,7 @@
package org.apache.flink.metrics.jmx;
+import org.apache.flink.configuration.JMXServerOptions;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -28,9 +29,9 @@ import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.management.JMXService;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
-import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,26 +43,12 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
-import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXServiceURL;
-import javax.management.remote.rmi.RMIConnectorServer;
-import javax.management.remote.rmi.RMIJRMPServerImpl;
-import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.net.MalformedURLException;
-import java.rmi.NoSuchObjectException;
-import java.rmi.NotBoundException;
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Hashtable;
-import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
/**
* {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
@@ -91,42 +78,15 @@ public class JMXReporter implements MetricReporter {
/** The names under which the registered metrics have been added to the MBeanServer. */
private final Map<Metric, ObjectName> registeredMetrics;
- /** The server to which JMX clients connect to. Allows for better control over port usage. */
- @Nullable
- private final JMXServer jmxServer;
-
JMXReporter(@Nullable final String portsConfig) {
this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
this.registeredMetrics = new HashMap<>();
if (portsConfig != null) {
- Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
-
- JMXServer successfullyStartedServer = null;
- while (ports.hasNext() && successfullyStartedServer == null) {
- JMXServer server = new JMXServer();
- int port = ports.next();
- try {
- server.start(port);
- LOG.info("Started JMX server on port " + port + ".");
- successfullyStartedServer = server;
- } catch (IOException ioe) { //assume port conflict
- LOG.debug("Could not start JMX server on port " + port + ".", ioe);
- try {
- server.stop();
- } catch (Exception e) {
- LOG.debug("Could not stop JMX server.", e);
- }
- }
- }
- if (successfullyStartedServer == null) {
- throw new RuntimeException("Could not start JMX server on any configured port. Ports: " + portsConfig);
- }
- this.jmxServer = successfullyStartedServer;
- } else {
- this.jmxServer = null;
+ LOG.warn("JMXReporter port config is deprecated. " +
+ "Please use: {} instead!", JMXServerOptions.JMX_SERVER_PORT);
+ JMXService.startInstance(portsConfig);
}
- LOG.info("Configured JMXReporter with {port:{}}", portsConfig);
}
// ------------------------------------------------------------------------
@@ -139,21 +99,11 @@ public class JMXReporter implements MetricReporter {
@Override
public void close() {
- if (jmxServer != null) {
- try {
- jmxServer.stop();
- } catch (IOException e) {
- LOG.error("Failed to stop JMX server.", e);
- }
- }
+ // Nothing to close.
}
public Optional<Integer> getPort() {
- if (jmxServer == null) {
- return Optional.empty();
- } else {
- return Optional.of(jmxServer.port);
- }
+ return JMXService.getPort();
}
// ------------------------------------------------------------------------
@@ -474,119 +424,4 @@ public class JMXReporter implements MetricReporter {
return meter.getCount();
}
}
-
- /**
- * JMX Server implementation that JMX clients can connect to.
- *
- * <p>Originally based on j256 simplejmx project
- *
- * <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
- */
- private static class JMXServer {
- private Registry rmiRegistry;
- private JMXConnectorServer connector;
- private int port;
- private final AtomicReference<Remote> rmiServerReference = new AtomicReference<>();
-
- public void start(int port) throws IOException {
- if (rmiRegistry != null && connector != null) {
- LOG.debug("JMXServer is already running.");
- return;
- }
- internalStart(port);
- this.port = port;
- }
-
- private void internalStart(int port) throws IOException {
- rmiServerReference.set(null);
-
- // this allows clients to lookup the JMX service
- rmiRegistry = new JmxRegistry(port, "jmxrmi", rmiServerReference);
-
- String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
- JMXServiceURL url;
- try {
- url = new JMXServiceURL(serviceUrl);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
- }
-
- final RMIJRMPServerImpl rmiServer = new RMIJRMPServerImpl(port, null, null, null);
-
- connector = new RMIConnectorServer(url, null, rmiServer, ManagementFactory.getPlatformMBeanServer());
- connector.start();
-
- // we can't pass the created stub directly to the registry since this would form a cyclic dependency:
- // - you can only start the connector after the registry was started
- // - you can only create the stub after the connector was started
- // - you can only start the registry after the stub was created
- rmiServerReference.set(rmiServer.toStub());
- }
-
- public void stop() throws IOException {
- rmiServerReference.set(null);
- if (connector != null) {
- try {
- connector.stop();
- } finally {
- connector = null;
- }
- }
- if (rmiRegistry != null) {
- try {
- UnicastRemoteObject.unexportObject(rmiRegistry, true);
- } catch (NoSuchObjectException e) {
- throw new IOException("Could not un-export our RMI registry", e);
- } finally {
- rmiRegistry = null;
- }
- }
- }
-
- /**
- * A registry that only exposes a single remote object.
- */
- @SuppressWarnings("restriction")
- private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
- private final String lookupName;
- private final AtomicReference<Remote> remoteServerStub;
-
- JmxRegistry(final int port, final String lookupName, final AtomicReference<Remote> remoteServerStub) throws RemoteException {
- super(port);
- this.lookupName = lookupName;
- this.remoteServerStub = remoteServerStub;
- }
-
- @Override
- public Remote lookup(String s) throws NotBoundException {
- if (lookupName.equals(s)) {
- final Remote remote = remoteServerStub.get();
- if (remote != null) {
- return remote;
- }
- }
- throw new NotBoundException("Not bound.");
- }
-
- @Override
- public void bind(String s, Remote remote) {
- // this is called from RMIConnectorServer#start; don't throw a general AccessException
- }
-
- @Override
- public void unbind(String s) {
- // this is called from RMIConnectorServer#stop; don't throw a general AccessException
- }
-
- @Override
- public void rebind(String s, Remote remote) {
- // might as well not throw an exception here given that the others don't
- }
-
- @Override
- public String[] list() {
- return new String[]{lookupName};
- }
- }
- }
}
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index af02c6f..f589277 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -206,6 +206,9 @@ public class JMXReporterTest extends TestLogger {
assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
+ // JMX Server URL should be identical since we made it static.
+ assertEquals(url1, url2);
+
rep1.notifyOfRemovedMetric(g1, "rep1", null);
rep1.notifyOfRemovedMetric(g2, "rep2", null);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 95c83c7..9b5b19b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JMXServerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
@@ -44,6 +45,7 @@ import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.management.JMXService;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
@@ -256,6 +258,8 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
configuration.getString(JobManagerOptions.BIND_HOST),
configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
+ JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
+
// update the configuration used to create the high availability services
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
@@ -376,6 +380,12 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
terminationFutures.add(commonRpcService.stopService());
}
+ try {
+ JMXService.stopInstance();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+
if (exception != null) {
terminationFutures.add(FutureUtils.completedExceptionally(exception));
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXServer.java
new file mode 100644
index 0000000..f590954
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXServer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.management;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.rmi.NoSuchObjectException;
+import java.rmi.NotBoundException;
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * JMX Server implementation that JMX clients can connect to.
+ *
+ * <p>Heavily based on j256 simplejmx project
+ *
+ * <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
+ */
+class JMXServer {
+ private static final Logger LOG = LoggerFactory.getLogger(JMXServer.class);
+
+ private final AtomicReference<Remote> rmiServerReference = new AtomicReference<>();
+
+ private Registry rmiRegistry;
+ private JMXConnectorServer connector;
+ private int port;
+
+ JMXServer() {
+ }
+
+ void start(int port) throws IOException {
+ if (rmiRegistry != null && connector != null) {
+ LOG.debug("JMXServer is already running.");
+ return;
+ }
+ internalStart(port);
+ this.port = port;
+ }
+
+ void stop() throws IOException {
+ rmiServerReference.set(null);
+ if (connector != null) {
+ try {
+ connector.stop();
+ } finally {
+ connector = null;
+ }
+ }
+ if (rmiRegistry != null) {
+ try {
+ UnicastRemoteObject.unexportObject(rmiRegistry, true);
+ } catch (NoSuchObjectException e) {
+ throw new IOException("Could not un-export our RMI registry", e);
+ } finally {
+ rmiRegistry = null;
+ }
+ }
+ }
+
+ int getPort() {
+ return port;
+ }
+
+ private void internalStart(int port) throws IOException {
+ rmiServerReference.set(null);
+
+ // this allows clients to lookup the JMX service
+ rmiRegistry = new JmxRegistry(port, "jmxrmi", rmiServerReference);
+
+ String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
+ JMXServiceURL url;
+ try {
+ url = new JMXServiceURL(serviceUrl);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
+ }
+
+ final RMIJRMPServerImpl rmiServer = new RMIJRMPServerImpl(port, null, null, null);
+
+ connector = new RMIConnectorServer(url, null, rmiServer, ManagementFactory.getPlatformMBeanServer());
+ connector.start();
+
+ // we can't pass the created stub directly to the registry since this would form a cyclic dependency:
+ // - you can only start the connector after the registry was started
+ // - you can only create the stub after the connector was started
+ // - you can only start the registry after the stub was created
+ rmiServerReference.set(rmiServer.toStub());
+ }
+
+ /**
+ * A registry that only exposes a single remote object.
+ */
+ @SuppressWarnings("restriction")
+ private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
+ private final String lookupName;
+ private final AtomicReference<Remote> remoteServerStub;
+
+ JmxRegistry(final int port, final String lookupName, final AtomicReference<Remote> remoteServerStub) throws RemoteException {
+ super(port);
+ this.lookupName = lookupName;
+ this.remoteServerStub = remoteServerStub;
+ }
+
+ @Override
+ public Remote lookup(String s) throws NotBoundException {
+ if (lookupName.equals(s)) {
+ final Remote remote = remoteServerStub.get();
+ if (remote != null) {
+ return remote;
+ }
+ }
+ throw new NotBoundException("Not bound.");
+ }
+
+ @Override
+ public void bind(String s, Remote remote) {
+ // this is called from RMIConnectorServer#start; don't throw a general AccessException
+ }
+
+ @Override
+ public void unbind(String s) {
+ // this is called from RMIConnectorServer#stop; don't throw a general AccessException
+ }
+
+ @Override
+ public void rebind(String s, Remote remote) {
+ // might as well not throw an exception here given that the others don't
+ }
+
+ @Override
+ public String[] list() {
+ return new String[]{lookupName};
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java
new file mode 100644
index 0000000..816e856
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.management;
+
+import org.apache.flink.util.NetUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+
+/**
+ * Provide a JVM-wide singleton JMX Service.
+ */
+public class JMXService {
+ private static final Logger LOG = LoggerFactory.getLogger(JMXService.class);
+ private static JMXServer jmxServer = null;
+
+ /**
+ * Acquire the global singleton JMXServer instance.
+ */
+ public static Optional<JMXServer> getInstance() {
+ return Optional.ofNullable(jmxServer);
+ }
+
+ /**
+ * Start the JMV-wide singleton JMX server.
+ *
+ * <p>If JMXServer static instance is already started, it will not be
+ * started again. Instead a warning will be logged indicating which port
+ * the existing JMXServer static instance is exposing.
+ *
+ * @param portsConfig port configuration of the JMX server.
+ */
+ public static synchronized void startInstance(String portsConfig) {
+ if (jmxServer == null) {
+ if (portsConfig != null) {
+ Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
+ if (ports.hasNext()) {
+ jmxServer = startJMXServerWithPortRanges(ports);
+ }
+ if (jmxServer == null) {
+ LOG.error("Could not start JMX server on any configured port(s) in: " + portsConfig);
+ }
+ }
+ } else {
+ LOG.warn("JVM-wide JMXServer already started at port: " + jmxServer.getPort());
+ }
+ }
+
+ /**
+ * Stop the JMX server.
+ */
+ public static synchronized void stopInstance() throws IOException {
+ if (jmxServer != null) {
+ jmxServer.stop();
+ jmxServer = null;
+ }
+ }
+
+ public static Optional<Integer> getPort() {
+ return Optional.ofNullable(jmxServer).map(JMXServer::getPort);
+ }
+
+ private static JMXServer startJMXServerWithPortRanges(Iterator<Integer> ports) {
+ JMXServer successfullyStartedServer = null;
+ while (ports.hasNext() && successfullyStartedServer == null) {
+ JMXServer server = new JMXServer();
+ int port = ports.next();
+ try {
+ server.start(port);
+ LOG.info("Started JMX server on port " + port + ".");
+ successfullyStartedServer = server;
+ } catch (IOException ioe) { //assume port conflict
+ LOG.debug("Could not start JMX server on port " + port + ".", ioe);
+ try {
+ server.stop();
+ } catch (Exception e) {
+ LOG.debug("Could not stop JMX server.", e);
+ }
+ }
+ }
+ return successfullyStartedServer;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index e44e5f0..52c17f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JMXServerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.management.JMXService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
@@ -137,6 +139,8 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
executor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+ JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
+
rpcService = createRpcService(configuration, highAvailabilityServices);
this.resourceId = new ResourceID(getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort()));
@@ -217,6 +221,12 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
Exception exception = null;
try {
+ JMXService.stopInstance();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ try {
blobCacheService.close();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServerTest.java
new file mode 100644
index 0000000..2f2528c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.management;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.lang.management.ManagementFactory;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link JMXServer} functionality.
+ */
+public class JMXServerTest {
+
+ @Before
+ public void setUp() throws Exception {
+ JMXService.startInstance("23456-23466");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JMXService.stopInstance();
+ }
+
+ /**
+ * Verifies initialize, registered mBean and retrieval via attribute.
+ */
+ @Test
+ public void testJMXServiceRegisterMBean() throws Exception {
+ TestObject testObject = new TestObject();
+ ObjectName testObjectName = new ObjectName("org.apache.flink.management", "key", "value");
+ MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ try {
+ Optional<JMXServer> server = JMXService.getInstance();
+ assertTrue(server.isPresent());
+ mBeanServer.registerMBean(testObject, testObjectName);
+
+ JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://localhost:" + server.get().getPort() + "/jndi/rmi://localhost:" + server.get().getPort() + "/jmxrmi");
+ JMXConnector jmxConn = JMXConnectorFactory.connect(url);
+ MBeanServerConnection mbeanConnConn = jmxConn.getMBeanServerConnection();
+
+ assertEquals(1, mbeanConnConn.getAttribute(testObjectName, "Foo"));
+ mBeanServer.unregisterMBean(testObjectName);
+ try {
+ mbeanConnConn.getAttribute(testObjectName, "Foo");
+ } catch (Exception e) {
+ // expected for unregistered objects.
+ assertTrue(e instanceof InstanceNotFoundException);
+ }
+ } finally {
+ JMXService.stopInstance();
+ }
+ }
+
+ /**
+ * Test MBean interface.
+ */
+ public interface TestObjectMBean {
+ int getFoo();
+ }
+
+ /**
+ * Test MBean Object.
+ */
+ public static class TestObject implements TestObjectMBean {
+ private int foo = 1;
+
+ @Override
+ public int getFoo() {
+ return foo;
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServiceTest.java
new file mode 100644
index 0000000..780c5cd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServiceTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.management;
+
+import org.junit.Test;
+
+import java.net.ServerSocket;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the singleton usage via {@link JMXService}.
+ */
+public class JMXServiceTest {
+
+ /**
+ * Verifies initialize with port range.
+ */
+ @Test
+ public void testJMXServiceInit() throws Exception {
+ try {
+ JMXService.startInstance("23456-23466");
+ assertTrue(JMXService.getPort().isPresent());
+ } finally {
+ JMXService.stopInstance();
+ }
+ }
+
+ /**
+ * Verifies initialize failure with occupied port.
+ */
+ @Test
+ public void testJMXServiceInitWithOccupiedPort() throws Exception {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ JMXService.startInstance(String.valueOf(socket.getLocalPort()));
+ assertFalse(JMXService.getInstance().isPresent());
+ }
+ }
+}