You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/13 08:03:22 UTC

[flink] branch release-1.8 updated: [FLINK-16697][metrics][jmx] Disable rebinding

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 0e8e806  [FLINK-16697][metrics][jmx] Disable rebinding
0e8e806 is described below

commit 0e8e8062bcc159e9ed2a0d4a0a61db4efcb01f2f
Author: Colm O hEigeartaigh <co...@apache.org>
AuthorDate: Fri Mar 20 10:13:12 2020 +0000

    [FLINK-16697][metrics][jmx] Disable rebinding
---
 .../org/apache/flink/metrics/jmx/JMXReporter.java  | 91 ++++++++++++++++------
 pom.xml                                            |  1 +
 2 files changed, 70 insertions(+), 22 deletions(-)

diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 461f1dc..b87e146 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -41,20 +41,24 @@ import javax.management.MalformedObjectNameException;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXConnectorServerFactory;
 import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.MalformedURLException;
 import java.rmi.NoSuchObjectException;
-import java.rmi.registry.LocateRegistry;
+import java.rmi.NotBoundException;
+import java.rmi.Remote;
+import java.rmi.RemoteException;
 import java.rmi.registry.Registry;
 import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
@@ -469,7 +473,7 @@ public class JMXReporter implements MetricReporter {
 	/**
 	 * JMX Server implementation that JMX clients can connect to.
 	 *
-	 * <p>Heavily based on j256 simplejmx project
+	 * <p>Originally based on j256 simplejmx project
 	 *
 	 * <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
 	 */
@@ -477,34 +481,23 @@ public class JMXReporter implements MetricReporter {
 		private Registry rmiRegistry;
 		private JMXConnectorServer connector;
 		private int port;
+		private final AtomicReference<Remote> rmiServerReference = new AtomicReference<>();
 
 		public void start(int port) throws IOException {
 			if (rmiRegistry != null && connector != null) {
 				LOG.debug("JMXServer is already running.");
 				return;
 			}
-			startRmiRegistry(port);
-			startJmxService(port);
+			internalStart(port);
 			this.port = port;
 		}
 
-		/**
-		 * Starts an RMI Registry that allows clients to lookup the JMX IP/port.
-		 *
-		 * @param port rmi port to use
-		 * @throws IOException
-		 */
-		private void startRmiRegistry(int port) throws IOException {
-			rmiRegistry = LocateRegistry.createRegistry(port);
-		}
+		private void internalStart(int port) throws IOException {
+			rmiServerReference.set(null);
+
+			// this allows clients to lookup the JMX service
+			rmiRegistry = new JmxRegistry(port, "jmxrmi", rmiServerReference);
 
-		/**
-		 * Starts a JMX connector that allows (un)registering MBeans with the MBean server and RMI invocations.
-		 *
-		 * @param port jmx port to use
-		 * @throws IOException
-		 */
-		private void startJmxService(int port) throws IOException {
 			String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
 			JMXServiceURL url;
 			try {
@@ -513,12 +506,20 @@ public class JMXReporter implements MetricReporter {
 				throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
 			}
 
-			connector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer());
+			final RMIJRMPServerImpl rmiServer = new RMIJRMPServerImpl(port, null, null, null);
 
+			connector = new RMIConnectorServer(url, null, rmiServer, ManagementFactory.getPlatformMBeanServer());
 			connector.start();
+
+			// we can't pass the created stub directly to the registry since this would form a cyclic dependency:
+			// - you can only start the connector after the registry was started
+			// - you can only create the stub after the connector was started
+			// - you can only start the registry after the stub was created
+			rmiServerReference.set(rmiServer.toStub());
 		}
 
 		public void stop() throws IOException {
+			rmiServerReference.set(null);
 			if (connector != null) {
 				try {
 					connector.stop();
@@ -536,5 +537,51 @@ public class JMXReporter implements MetricReporter {
 				}
 			}
 		}
+
+		/**
+		 * A registry that only exposes a single remote object.
+		 */
+		@SuppressWarnings("restriction")
+		private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
+			private final String lookupName;
+			private final AtomicReference<Remote> remoteServerStub;
+
+			JmxRegistry(final int port, final String lookupName, final AtomicReference<Remote> remoteServerStub) throws RemoteException {
+				super(port);
+				this.lookupName = lookupName;
+				this.remoteServerStub = remoteServerStub;
+			}
+
+			@Override
+			public Remote lookup(String s) throws NotBoundException {
+				if (lookupName.equals(s)) {
+					final Remote remote = remoteServerStub.get();
+					if (remote != null) {
+						return remote;
+					}
+				}
+				throw new NotBoundException("Not bound.");
+			}
+
+			@Override
+			public void bind(String s, Remote remote) {
+				// this is called from RMIConnectorServer#start; don't throw a general AccessException
+			}
+
+			@Override
+			public void unbind(String s) {
+				// this is called from RMIConnectorServer#stop; don't throw a general AccessException
+			}
+
+			@Override
+			public void rebind(String s, Remote remote) {
+				// might as well not throw an exception here given that the others don't
+			}
+
+			@Override
+			public String[] list() {
+				return new String[]{lookupName};
+			}
+		}
 	}
 }
diff --git a/pom.xml b/pom.xml
index ba0ad45..e68fd56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -772,6 +772,7 @@ under the License.
 							<compilerArgs combine.children="append">
 								<arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
 								<arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
+								<arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
 							</compilerArgs>
 						</configuration>
 					</plugin>