You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/13 08:03:22 UTC
[flink] branch release-1.8 updated: [FLINK-16697][metrics][jmx] Disable rebinding
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 0e8e806 [FLINK-16697][metrics][jmx] Disable rebinding
0e8e806 is described below
commit 0e8e8062bcc159e9ed2a0d4a0a61db4efcb01f2f
Author: Colm O hEigeartaigh <co...@apache.org>
AuthorDate: Fri Mar 20 10:13:12 2020 +0000
[FLINK-16697][metrics][jmx] Disable rebinding
---
.../org/apache/flink/metrics/jmx/JMXReporter.java | 91 ++++++++++++++++------
pom.xml | 1 +
2 files changed, 70 insertions(+), 22 deletions(-)
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 461f1dc..b87e146 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -41,20 +41,24 @@ import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.rmi.NoSuchObjectException;
-import java.rmi.registry.LocateRegistry;
+import java.rmi.NotBoundException;
+import java.rmi.Remote;
+import java.rmi.RemoteException;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
/**
* {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
@@ -469,7 +473,7 @@ public class JMXReporter implements MetricReporter {
/**
* JMX Server implementation that JMX clients can connect to.
*
- * <p>Heavily based on j256 simplejmx project
+ * <p>Originally based on j256 simplejmx project
*
* <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
*/
@@ -477,34 +481,23 @@ public class JMXReporter implements MetricReporter {
private Registry rmiRegistry;
private JMXConnectorServer connector;
private int port;
+ private final AtomicReference<Remote> rmiServerReference = new AtomicReference<>();
public void start(int port) throws IOException {
if (rmiRegistry != null && connector != null) {
LOG.debug("JMXServer is already running.");
return;
}
- startRmiRegistry(port);
- startJmxService(port);
+ internalStart(port);
this.port = port;
}
- /**
- * Starts an RMI Registry that allows clients to lookup the JMX IP/port.
- *
- * @param port rmi port to use
- * @throws IOException
- */
- private void startRmiRegistry(int port) throws IOException {
- rmiRegistry = LocateRegistry.createRegistry(port);
- }
+ private void internalStart(int port) throws IOException {
+ rmiServerReference.set(null);
+
+ // this allows clients to lookup the JMX service
+ rmiRegistry = new JmxRegistry(port, "jmxrmi", rmiServerReference);
- /**
- * Starts a JMX connector that allows (un)registering MBeans with the MBean server and RMI invocations.
- *
- * @param port jmx port to use
- * @throws IOException
- */
- private void startJmxService(int port) throws IOException {
String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
JMXServiceURL url;
try {
@@ -513,12 +506,20 @@ public class JMXReporter implements MetricReporter {
throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
}
- connector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer());
+ final RMIJRMPServerImpl rmiServer = new RMIJRMPServerImpl(port, null, null, null);
+ connector = new RMIConnectorServer(url, null, rmiServer, ManagementFactory.getPlatformMBeanServer());
connector.start();
+
+ // we can't pass the created stub directly to the registry since this would form a cyclic dependency:
+ // - you can only start the connector after the registry was started
+ // - you can only create the stub after the connector was started
+ // - you can only start the registry after the stub was created
+ rmiServerReference.set(rmiServer.toStub());
}
public void stop() throws IOException {
+ rmiServerReference.set(null);
if (connector != null) {
try {
connector.stop();
@@ -536,5 +537,51 @@ public class JMXReporter implements MetricReporter {
}
}
}
+
+ /**
+ * A registry that only exposes a single remote object.
+ */
+ @SuppressWarnings("restriction")
+ private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
+ private final String lookupName;
+ private final AtomicReference<Remote> remoteServerStub;
+
+ JmxRegistry(final int port, final String lookupName, final AtomicReference<Remote> remoteServerStub) throws RemoteException {
+ super(port);
+ this.lookupName = lookupName;
+ this.remoteServerStub = remoteServerStub;
+ }
+
+ @Override
+ public Remote lookup(String s) throws NotBoundException {
+ if (lookupName.equals(s)) {
+ final Remote remote = remoteServerStub.get();
+ if (remote != null) {
+ return remote;
+ }
+ }
+ throw new NotBoundException("Not bound.");
+ }
+
+ @Override
+ public void bind(String s, Remote remote) {
+ // this is called from RMIConnectorServer#start; don't throw a general AccessException
+ }
+
+ @Override
+ public void unbind(String s) {
+ // this is called from RMIConnectorServer#stop; don't throw a general AccessException
+ }
+
+ @Override
+ public void rebind(String s, Remote remote) {
+ // might as well not throw an exception here given that the others don't
+ }
+
+ @Override
+ public String[] list() {
+ return new String[]{lookupName};
+ }
+ }
}
}
diff --git a/pom.xml b/pom.xml
index ba0ad45..e68fd56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -772,6 +772,7 @@ under the License.
<compilerArgs combine.children="append">
<arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
<arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
+ <arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>