You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:29:07 UTC

[flink] branch master updated: [FLINK-5552][runtime][jmx] Introduce JMXServer singleton

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b699d6  [FLINK-5552][runtime][jmx] Introduce JMXServer singleton
2b699d6 is described below

commit 2b699d6cb20823cd3a437b49bc96f015f4c1b72e
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon Aug 3 13:28:38 2020 -0700

    [FLINK-5552][runtime][jmx] Introduce JMXServer singleton
---
 .../expert_debugging_and_tuning_section.html       |  18 +++
 docs/ops/config.md                                 |   4 +
 docs/ops/config.zh.md                              |   4 +
 .../flink/annotation/docs/Documentation.java       |   1 +
 .../flink/configuration/JMXServerOptions.java      |  51 ++++++
 .../org/apache/flink/metrics/jmx/JMXReporter.java  | 179 +--------------------
 .../apache/flink/metrics/jmx/JMXReporterTest.java  |   3 +
 .../runtime/entrypoint/ClusterEntrypoint.java      |  10 ++
 .../apache/flink/runtime/management/JMXServer.java | 164 +++++++++++++++++++
 .../flink/runtime/management/JMXService.java       | 103 ++++++++++++
 .../runtime/taskexecutor/TaskManagerRunner.java    |  10 ++
 .../flink/runtime/management/JMXServerTest.java    | 103 ++++++++++++
 .../flink/runtime/management/JMXServiceTest.java   |  56 +++++++
 13 files changed, 534 insertions(+), 172 deletions(-)

diff --git a/docs/_includes/generated/expert_debugging_and_tuning_section.html b/docs/_includes/generated/expert_debugging_and_tuning_section.html
new file mode 100644
index 0000000..09f39595
--- /dev/null
+++ b/docs/_includes/generated/expert_debugging_and_tuning_section.html
@@ -0,0 +1,18 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>jmx.server.port</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The port range for the JMX server to start the registry. The port config can be a single port: "9123", a range of ports: "50100-50200", or a list of ranges and ports: "50100-50200,50300-50400,51234". <br />This option overrides metrics.reporter.*.port option.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 6be396b..d94e7db 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -297,6 +297,10 @@ Please refer to the [Debugging Classloading Docs]({{site.baseurl}}/monitoring/de
 
 {% include generated/expert_class_loading_section.html %}
 
+### Advanced Options for the debugging
+
+{% include generated/expert_debugging_and_tuning_section.html %}
+
 ### Advanced State Backends Options
 
 {% include generated/expert_state_backends_section.html %}
diff --git a/docs/ops/config.zh.md b/docs/ops/config.zh.md
index 1cfc84b..9bd596a 100644
--- a/docs/ops/config.zh.md
+++ b/docs/ops/config.zh.md
@@ -297,6 +297,10 @@ Please refer to the [Debugging Classloading Docs]({{site.baseurl}}/monitoring/de
 
 {% include generated/expert_class_loading_section.html %}
 
+### Advanced Options for the debugging
+
+{% include generated/expert_debugging_and_tuning_section.html %}
+
 ### Advanced State Backends Options
 
 {% include generated/expert_state_backends_section.html %}
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index 98eb312..8316050 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -82,6 +82,7 @@ public final class Documentation {
 		public static final String STATE_BACKEND_ROCKSDB = "state_backend_rocksdb";
 
 		public static final String EXPERT_CLASS_LOADING = "expert_class_loading";
+		public static final String EXPERT_DEBUGGING_AND_TUNING = "expert_debugging_and_tuning";
 		public static final String EXPERT_SCHEDULING = "expert_scheduling";
 		public static final String EXPERT_FAULT_TOLERANCE = "expert_fault_tolerance";
 		public static final String EXPERT_STATE_BACKENDS = "expert_state_backends";
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JMXServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JMXServerOptions.java
new file mode 100644
index 0000000..5cc26f0
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JMXServerOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.description.Description;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to JMX server.
+ */
+@PublicEvolving
+public class JMXServerOptions {
+
+	/** Port configured to enable JMX server for metrics and debugging. */
+	@Documentation.Section(Documentation.Sections.EXPERT_DEBUGGING_AND_TUNING)
+	public static final ConfigOption<String>JMX_SERVER_PORT =
+		key("jmx.server.port")
+			.noDefaultValue()
+			.withDescription(
+			new Description.DescriptionBuilder()
+				.text("The port range for the JMX server to start the registry. The " +
+					"port config can be a single port: \"9123\", a range of ports: \"50100-50200\", " +
+					"or a list of ranges and ports: \"50100-50200,50300-50400,51234\". ")
+				.linebreak()
+				.text("This option overrides metrics.reporter.*.port option.")
+				.build());
+
+	// ------------------------------------------------------------------------
+
+	/** Not intended to be instantiated. */
+	private JMXServerOptions() {}
+}
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 3fe5dc1..60e0f90 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.metrics.jmx;
 
+import org.apache.flink.configuration.JMXServerOptions;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -28,9 +29,9 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.InstantiateViaFactory;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.management.JMXService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
-import org.apache.flink.util.NetUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,26 +43,12 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
-import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXServiceURL;
-import javax.management.remote.rmi.RMIConnectorServer;
-import javax.management.remote.rmi.RMIJRMPServerImpl;
 
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.net.MalformedURLException;
-import java.rmi.NoSuchObjectException;
-import java.rmi.NotBoundException;
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
 import java.util.Hashtable;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
@@ -91,42 +78,15 @@ public class JMXReporter implements MetricReporter {
 	/** The names under which the registered metrics have been added to the MBeanServer. */
 	private final Map<Metric, ObjectName> registeredMetrics;
 
-	/** The server to which JMX clients connect to. Allows for better control over port usage. */
-	@Nullable
-	private final JMXServer jmxServer;
-
 	JMXReporter(@Nullable final String portsConfig) {
 		this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
 		this.registeredMetrics = new HashMap<>();
 
 		if (portsConfig != null) {
-			Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
-
-			JMXServer successfullyStartedServer = null;
-			while (ports.hasNext() && successfullyStartedServer == null) {
-				JMXServer server = new JMXServer();
-				int port = ports.next();
-				try {
-					server.start(port);
-					LOG.info("Started JMX server on port " + port + ".");
-					successfullyStartedServer = server;
-				} catch (IOException ioe) { //assume port conflict
-					LOG.debug("Could not start JMX server on port " + port + ".", ioe);
-					try {
-						server.stop();
-					} catch (Exception e) {
-						LOG.debug("Could not stop JMX server.", e);
-					}
-				}
-			}
-			if (successfullyStartedServer == null) {
-				throw new RuntimeException("Could not start JMX server on any configured port. Ports: " + portsConfig);
-			}
-			this.jmxServer = successfullyStartedServer;
-		} else {
-			this.jmxServer = null;
+			LOG.warn("JMXReporter port config is deprecated. " +
+				"Please use: {} instead!", JMXServerOptions.JMX_SERVER_PORT);
+			JMXService.startInstance(portsConfig);
 		}
-		LOG.info("Configured JMXReporter with {port:{}}", portsConfig);
 	}
 
 	// ------------------------------------------------------------------------
@@ -139,21 +99,11 @@ public class JMXReporter implements MetricReporter {
 
 	@Override
 	public void close() {
-		if (jmxServer != null) {
-			try {
-				jmxServer.stop();
-			} catch (IOException e) {
-				LOG.error("Failed to stop JMX server.", e);
-			}
-		}
+		// Nothing to close.
 	}
 
 	public Optional<Integer> getPort() {
-		if (jmxServer == null) {
-			return Optional.empty();
-		} else {
-			return Optional.of(jmxServer.port);
-		}
+		return JMXService.getPort();
 	}
 
 	// ------------------------------------------------------------------------
@@ -474,119 +424,4 @@ public class JMXReporter implements MetricReporter {
 			return meter.getCount();
 		}
 	}
-
-	/**
-	 * JMX Server implementation that JMX clients can connect to.
-	 *
-	 * <p>Originally based on j256 simplejmx project
-	 *
-	 * <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
-	 */
-	private static class JMXServer {
-		private Registry rmiRegistry;
-		private JMXConnectorServer connector;
-		private int port;
-		private final AtomicReference<Remote> rmiServerReference = new AtomicReference<>();
-
-		public void start(int port) throws IOException {
-			if (rmiRegistry != null && connector != null) {
-				LOG.debug("JMXServer is already running.");
-				return;
-			}
-			internalStart(port);
-			this.port = port;
-		}
-
-		private void internalStart(int port) throws IOException {
-			rmiServerReference.set(null);
-
-			// this allows clients to lookup the JMX service
-			rmiRegistry = new JmxRegistry(port, "jmxrmi", rmiServerReference);
-
-			String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
-			JMXServiceURL url;
-			try {
-				url = new JMXServiceURL(serviceUrl);
-			} catch (MalformedURLException e) {
-				throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
-			}
-
-			final RMIJRMPServerImpl rmiServer = new RMIJRMPServerImpl(port, null, null, null);
-
-			connector = new RMIConnectorServer(url, null, rmiServer, ManagementFactory.getPlatformMBeanServer());
-			connector.start();
-
-			// we can't pass the created stub directly to the registry since this would form a cyclic dependency:
-			// - you can only start the connector after the registry was started
-			// - you can only create the stub after the connector was started
-			// - you can only start the registry after the stub was created
-			rmiServerReference.set(rmiServer.toStub());
-		}
-
-		public void stop() throws IOException {
-			rmiServerReference.set(null);
-			if (connector != null) {
-				try {
-					connector.stop();
-				} finally {
-					connector = null;
-				}
-			}
-			if (rmiRegistry != null) {
-				try {
-					UnicastRemoteObject.unexportObject(rmiRegistry, true);
-				} catch (NoSuchObjectException e) {
-					throw new IOException("Could not un-export our RMI registry", e);
-				} finally {
-					rmiRegistry = null;
-				}
-			}
-		}
-
-		/**
-		 * A registry that only exposes a single remote object.
-		 */
-		@SuppressWarnings("restriction")
-		private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
-			private final String lookupName;
-			private final AtomicReference<Remote> remoteServerStub;
-
-			JmxRegistry(final int port, final String lookupName, final AtomicReference<Remote> remoteServerStub) throws RemoteException {
-				super(port);
-				this.lookupName = lookupName;
-				this.remoteServerStub = remoteServerStub;
-			}
-
-			@Override
-			public Remote lookup(String s) throws NotBoundException {
-				if (lookupName.equals(s)) {
-					final Remote remote = remoteServerStub.get();
-					if (remote != null) {
-						return remote;
-					}
-				}
-				throw new NotBoundException("Not bound.");
-			}
-
-			@Override
-			public void bind(String s, Remote remote) {
-				// this is called from RMIConnectorServer#start; don't throw a general AccessException
-			}
-
-			@Override
-			public void unbind(String s) {
-				// this is called from RMIConnectorServer#stop; don't throw a general AccessException
-			}
-
-			@Override
-			public void rebind(String s, Remote remote) {
-				// might as well not throw an exception here given that the others don't
-			}
-
-			@Override
-			public String[] list() {
-				return new String[]{lookupName};
-			}
-		}
-	}
 }
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index af02c6f..f589277 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -206,6 +206,9 @@ public class JMXReporterTest extends TestLogger {
 		assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
 		assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
 
+		// JMX Server URL should be identical since we made it static.
+		assertEquals(url1, url2);
+
 		rep1.notifyOfRemovedMetric(g1, "rep1", null);
 		rep1.notifyOfRemovedMetric(g2, "rep2", null);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 95c83c7..9b5b19b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JMXServerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.WebOptions;
@@ -44,6 +45,7 @@ import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.management.JMXService;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.ReporterSetup;
@@ -256,6 +258,8 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
 				configuration.getString(JobManagerOptions.BIND_HOST),
 				configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
 
+			JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
+
 			// update the configuration used to create the high availability services
 			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
 			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
@@ -376,6 +380,12 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
 				terminationFutures.add(commonRpcService.stopService());
 			}
 
+			try {
+				JMXService.stopInstance();
+			} catch (Throwable t) {
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+
 			if (exception != null) {
 				terminationFutures.add(FutureUtils.completedExceptionally(exception));
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXServer.java
new file mode 100644
index 0000000..f590954
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXServer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.management;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.rmi.NoSuchObjectException;
+import java.rmi.NotBoundException;
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * JMX Server implementation that JMX clients can connect to.
+ *
+ * <p>Heavily based on j256 simplejmx project
+ *
+ * <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
+ */
+class JMXServer {
+	private static final Logger LOG = LoggerFactory.getLogger(JMXServer.class);
+
+	private final AtomicReference<Remote> rmiServerReference = new AtomicReference<>();
+
+	private Registry rmiRegistry;
+	private JMXConnectorServer connector;
+	private int port;
+
+	JMXServer() {
+	}
+
+	void start(int port) throws IOException {
+		if (rmiRegistry != null && connector != null) {
+			LOG.debug("JMXServer is already running.");
+			return;
+		}
+		internalStart(port);
+		this.port = port;
+	}
+
+	void stop() throws IOException {
+		rmiServerReference.set(null);
+		if (connector != null) {
+			try {
+				connector.stop();
+			} finally {
+				connector = null;
+			}
+		}
+		if (rmiRegistry != null) {
+			try {
+				UnicastRemoteObject.unexportObject(rmiRegistry, true);
+			} catch (NoSuchObjectException e) {
+				throw new IOException("Could not un-export our RMI registry", e);
+			} finally {
+				rmiRegistry = null;
+			}
+		}
+	}
+
+	int getPort() {
+		return port;
+	}
+
+	private void internalStart(int port) throws IOException {
+		rmiServerReference.set(null);
+
+		// this allows clients to lookup the JMX service
+		rmiRegistry = new JmxRegistry(port, "jmxrmi", rmiServerReference);
+
+		String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
+		JMXServiceURL url;
+		try {
+			url = new JMXServiceURL(serviceUrl);
+		} catch (MalformedURLException e) {
+			throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
+		}
+
+		final RMIJRMPServerImpl rmiServer = new RMIJRMPServerImpl(port, null, null, null);
+
+		connector = new RMIConnectorServer(url, null, rmiServer, ManagementFactory.getPlatformMBeanServer());
+		connector.start();
+
+		// we can't pass the created stub directly to the registry since this would form a cyclic dependency:
+		// - you can only start the connector after the registry was started
+		// - you can only create the stub after the connector was started
+		// - you can only start the registry after the stub was created
+		rmiServerReference.set(rmiServer.toStub());
+	}
+
+	/**
+	 * A registry that only exposes a single remote object.
+	 */
+	@SuppressWarnings("restriction")
+	private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
+		private final String lookupName;
+		private final AtomicReference<Remote> remoteServerStub;
+
+		JmxRegistry(final int port, final String lookupName, final AtomicReference<Remote> remoteServerStub) throws RemoteException {
+			super(port);
+			this.lookupName = lookupName;
+			this.remoteServerStub = remoteServerStub;
+		}
+
+		@Override
+		public Remote lookup(String s) throws NotBoundException {
+			if (lookupName.equals(s)) {
+				final Remote remote = remoteServerStub.get();
+				if (remote != null) {
+					return remote;
+				}
+			}
+			throw new NotBoundException("Not bound.");
+		}
+
+		@Override
+		public void bind(String s, Remote remote) {
+			// this is called from RMIConnectorServer#start; don't throw a general AccessException
+		}
+
+		@Override
+		public void unbind(String s) {
+			// this is called from RMIConnectorServer#stop; don't throw a general AccessException
+		}
+
+		@Override
+		public void rebind(String s, Remote remote) {
+			// might as well not throw an exception here given that the others don't
+		}
+
+		@Override
+		public String[] list() {
+			return new String[]{lookupName};
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java
new file mode 100644
index 0000000..816e856
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.management;
+
+import org.apache.flink.util.NetUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+
+/**
+ * Provide a JVM-wide singleton JMX Service.
+ */
+public class JMXService {
+	private static final Logger LOG = LoggerFactory.getLogger(JMXService.class);
+	private static JMXServer jmxServer = null;
+
+	/**
+	 * Acquire the global singleton JMXServer instance.
+	 */
+	public static Optional<JMXServer> getInstance() {
+		return Optional.ofNullable(jmxServer);
+	}
+
+	/**
+	 * Start the JMV-wide singleton JMX server.
+	 *
+	 * <p>If JMXServer static instance is already started, it will not be
+	 * started again. Instead a warning will be logged indicating which port
+	 * the existing JMXServer static instance is exposing.
+	 *
+	 * @param portsConfig port configuration of the JMX server.
+	 */
+	public static synchronized void startInstance(String portsConfig) {
+		if (jmxServer == null) {
+			if (portsConfig != null) {
+				Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
+				if (ports.hasNext()) {
+					jmxServer = startJMXServerWithPortRanges(ports);
+				}
+				if (jmxServer == null) {
+					LOG.error("Could not start JMX server on any configured port(s) in: " + portsConfig);
+				}
+			}
+		} else {
+			LOG.warn("JVM-wide JMXServer already started at port: " + jmxServer.getPort());
+		}
+	}
+
+	/**
+	 * Stop the JMX server.
+	 */
+	public static synchronized void stopInstance() throws IOException {
+		if (jmxServer != null) {
+			jmxServer.stop();
+			jmxServer = null;
+		}
+	}
+
+	public static Optional<Integer> getPort() {
+		return Optional.ofNullable(jmxServer).map(JMXServer::getPort);
+	}
+
+	private static JMXServer startJMXServerWithPortRanges(Iterator<Integer> ports) {
+		JMXServer successfullyStartedServer = null;
+		while (ports.hasNext() && successfullyStartedServer == null) {
+			JMXServer server = new JMXServer();
+			int port = ports.next();
+			try {
+				server.start(port);
+				LOG.info("Started JMX server on port " + port + ".");
+				successfullyStartedServer = server;
+			} catch (IOException ioe) { //assume port conflict
+				LOG.debug("Could not start JMX server on port " + port + ".", ioe);
+				try {
+					server.stop();
+				} catch (Exception e) {
+					LOG.debug("Could not stop JMX server.", e);
+				}
+			}
+		}
+		return successfullyStartedServer;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index e44e5f0..52c17f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JMXServerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.management.JMXService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
@@ -137,6 +139,8 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			executor,
 			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
+		JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
+
 		rpcService = createRpcService(configuration, highAvailabilityServices);
 
 		this.resourceId = new ResourceID(getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort()));
@@ -217,6 +221,12 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			Exception exception = null;
 
 			try {
+				JMXService.stopInstance();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			try {
 				blobCacheService.close();
 			} catch (Exception e) {
 				exception = ExceptionUtils.firstOrSuppressed(e, exception);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServerTest.java
new file mode 100644
index 0000000..2f2528c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.management;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.lang.management.ManagementFactory;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link JMXServer} functionality.
+ */
+public class JMXServerTest {
+
+	@Before
+	public void setUp() throws Exception {
+		JMXService.startInstance("23456-23466");
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		JMXService.stopInstance();
+	}
+
+	/**
+	 * Verifies initialize, registered mBean and retrieval via attribute.
+	 */
+	@Test
+	public void testJMXServiceRegisterMBean() throws Exception {
+		TestObject testObject = new TestObject();
+		ObjectName testObjectName = new ObjectName("org.apache.flink.management", "key", "value");
+		MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+		try {
+			Optional<JMXServer> server = JMXService.getInstance();
+			assertTrue(server.isPresent());
+			mBeanServer.registerMBean(testObject, testObjectName);
+
+			JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://localhost:" + server.get().getPort() + "/jndi/rmi://localhost:" + server.get().getPort() + "/jmxrmi");
+			JMXConnector jmxConn = JMXConnectorFactory.connect(url);
+			MBeanServerConnection mbeanConnConn = jmxConn.getMBeanServerConnection();
+
+			assertEquals(1, mbeanConnConn.getAttribute(testObjectName, "Foo"));
+			mBeanServer.unregisterMBean(testObjectName);
+			try {
+				mbeanConnConn.getAttribute(testObjectName, "Foo");
+			} catch (Exception e) {
+				// expected for unregistered objects.
+				assertTrue(e instanceof InstanceNotFoundException);
+			}
+		} finally {
+			JMXService.stopInstance();
+		}
+	}
+
+	/**
+	 * Test MBean interface.
+	 */
+	public interface TestObjectMBean {
+		int getFoo();
+	}
+
+	/**
+	 * Test MBean Object.
+	 */
+	public static class TestObject implements TestObjectMBean {
+		private int foo = 1;
+
+		@Override
+		public int getFoo() {
+			return foo;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServiceTest.java
new file mode 100644
index 0000000..780c5cd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/management/JMXServiceTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.management;
+
+import org.junit.Test;
+
+import java.net.ServerSocket;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the singleton usage via {@link JMXService}.
+ */
+public class JMXServiceTest {
+
+	/**
+	 * Verifies initialize with port range.
+	 */
+	@Test
+	public void testJMXServiceInit() throws Exception {
+		try {
+			JMXService.startInstance("23456-23466");
+			assertTrue(JMXService.getPort().isPresent());
+		} finally {
+			JMXService.stopInstance();
+		}
+	}
+
+	/**
+	 * Verifies initialize failure with occupied port.
+	 */
+	@Test
+	public void testJMXServiceInitWithOccupiedPort() throws Exception {
+		try (ServerSocket socket = new ServerSocket(0)) {
+			JMXService.startInstance(String.valueOf(socket.getLocalPort()));
+			assertFalse(JMXService.getInstance().isPresent());
+		}
+	}
+}