You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/13 08:00:45 UTC
[flink] branch release-1.1 updated: [FLINK-16697][metrics][jmx]
Disable rebinding
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.1 by this push:
new a61b5d2 [FLINK-16697][metrics][jmx] Disable rebinding
a61b5d2 is described below
commit a61b5d2b362d11e7b9deeb2334d275325574bd7b
Author: Colm O hEigeartaigh <co...@apache.org>
AuthorDate: Fri Mar 20 10:13:12 2020 +0000
[FLINK-16697][metrics][jmx] Disable rebinding
---
.../org/apache/flink/metrics/jmx/JMXReporter.java | 92 ++++++++++++++++------
1 file changed, 70 insertions(+), 22 deletions(-)
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 1a283d9..b9e1b22 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -37,18 +37,23 @@ import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
+
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.rmi.NoSuchObjectException;
-import java.rmi.registry.LocateRegistry;
+import java.rmi.NotBoundException;
+import java.rmi.Remote;
+import java.rmi.RemoteException;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
/**
* {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
@@ -420,7 +425,7 @@ public class JMXReporter implements MetricReporter {
/**
* JMX Server implementation that JMX clients can connect to.
*
- * Heavily based on j256 simplejmx project
+ * Originally based on j256 simplejmx project
*
* https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
*/
@@ -428,34 +433,23 @@ public class JMXReporter implements MetricReporter {
private Registry rmiRegistry;
private JMXConnectorServer connector;
private int port;
+ private final AtomicReference<Remote> rmiServerReference = new AtomicReference<>();
public void start(int port) throws IOException {
if (rmiRegistry != null && connector != null) {
LOG.debug("JMXServer is already running.");
return;
}
- startRmiRegistry(port);
- startJmxService(port);
+ internalStart(port);
this.port = port;
}
- /**
- * Starts an RMI Registry that allows clients to lookup the JMX IP/port.
- *
- * @param port rmi port to use
- * @throws IOException
- */
- private void startRmiRegistry(int port) throws IOException {
- rmiRegistry = LocateRegistry.createRegistry(port);
- }
+ private void internalStart(int port) throws IOException {
+ rmiServerReference.set(null);
+
+ // this allows clients to lookup the JMX service
+ rmiRegistry = new JmxRegistry(port, "jmxrmi", rmiServerReference);
- /**
- * Starts a JMX connector that allows (un)registering MBeans with the MBean server and RMI invocations.
- *
- * @param port jmx port to use
- * @throws IOException
- */
- private void startJmxService(int port) throws IOException {
String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
JMXServiceURL url;
try {
@@ -464,12 +458,20 @@ public class JMXReporter implements MetricReporter {
throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
}
- connector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer());
+ final RMIJRMPServerImpl rmiServer = new RMIJRMPServerImpl(port, null, null, null);
+ connector = new RMIConnectorServer(url, null, rmiServer, ManagementFactory.getPlatformMBeanServer());
connector.start();
+
+ // we can't pass the created stub directly to the registry since this would form a cyclic dependency:
+ // - you can only start the connector after the registry was started
+ // - you can only create the stub after the connector was started
+ // - you can only start the registry after the stub was created
+ rmiServerReference.set(rmiServer.toStub());
}
public void stop() throws IOException {
+ rmiServerReference.set(null);
if (connector != null) {
try {
connector.stop();
@@ -487,5 +489,51 @@ public class JMXReporter implements MetricReporter {
}
}
}
+
+ /**
+ * A registry that only exposes a single remote object.
+ */
+ @SuppressWarnings("restriction")
+ private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
+ private final String lookupName;
+ private final AtomicReference<Remote> remoteServerStub;
+
+ JmxRegistry(final int port, final String lookupName, final AtomicReference<Remote> remoteServerStub) throws RemoteException {
+ super(port);
+ this.lookupName = lookupName;
+ this.remoteServerStub = remoteServerStub;
+ }
+
+ @Override
+ public Remote lookup(String s) throws NotBoundException {
+ if (lookupName.equals(s)) {
+ final Remote remote = remoteServerStub.get();
+ if (remote != null) {
+ return remote;
+ }
+ }
+ throw new NotBoundException("Not bound.");
+ }
+
+ @Override
+ public void bind(String s, Remote remote) {
+ // this is called from RMIConnectorServer#start; don't throw a general AccessException
+ }
+
+ @Override
+ public void unbind(String s) {
+ // this is called from RMIConnectorServer#stop; don't throw a general AccessException
+ }
+
+ @Override
+ public void rebind(String s, Remote remote) {
+ // might as well not throw an exception here given that the others don't
+ }
+
+ @Override
+ public String[] list() {
+ return new String[]{lookupName};
+ }
+ }
}
}