You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2020/05/11 08:32:29 UTC
[flink] branch master updated: [FLINK-17454][python] Specify a port number for gateway callback server from python gateway.
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5f744d3 [FLINK-17454][python] Specify a port number for gateway callback server from python gateway.
5f744d3 is described below
commit 5f744d3f81bcfb8f77164a5ec9caa4594851d4bf
Author: acqua.csq <ac...@alibaba-inc.com>
AuthorDate: Fri May 8 23:29:12 2020 +0800
[FLINK-17454][python] Specify a port number for gateway callback server from python gateway.
This closes #12061
---
flink-python/pyflink/java_gateway.py | 11 +++--
.../apache/flink/client/python/PythonEnvUtils.java | 55 +++++++++++++++++-----
2 files changed, 49 insertions(+), 17 deletions(-)
diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index d8e061b..33ab2ae 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -49,15 +49,19 @@ def get_gateway():
# if Java Gateway is already running
if 'PYFLINK_GATEWAY_PORT' in os.environ:
gateway_port = int(os.environ['PYFLINK_GATEWAY_PORT'])
- callback_port = int(os.environ['PYFLINK_CALLBACK_PORT'])
gateway_param = GatewayParameters(port=gateway_port, auto_convert=True)
_gateway = JavaGateway(
gateway_parameters=gateway_param,
callback_server_parameters=CallbackServerParameters(
- port=callback_port, daemonize=True, daemonize_connections=True))
+ port=0, daemonize=True, daemonize_connections=True))
else:
_gateway = launch_gateway()
+ callback_server = _gateway.get_callback_server()
+ callback_server_listening_address = callback_server.get_listening_address()
+ callback_server_listening_port = callback_server.get_listening_port()
+ _gateway.jvm.org.apache.flink.client.python.PythonEnvUtils.resetCallbackClient(
+ callback_server_listening_address, callback_server_listening_port)
# import the flink view
import_flink_view(_gateway)
install_exception_handler()
@@ -102,7 +106,6 @@ def launch_gateway():
with open(conn_info_file, "rb") as info:
gateway_port = struct.unpack("!I", info.read(4))[0]
- callback_port = struct.unpack("!I", info.read(4))[0]
finally:
shutil.rmtree(conn_info_dir)
@@ -110,7 +113,7 @@ def launch_gateway():
gateway = JavaGateway(
gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True),
callback_server_parameters=CallbackServerParameters(
- port=callback_port, daemonize=True, daemonize_connections=True))
+ port=0, daemonize=True, daemonize_connections=True))
return gateway
diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index cf15b7b..76370dc 100644
--- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -35,7 +35,10 @@ import py4j.GatewayServer;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
@@ -277,16 +280,7 @@ final class PythonEnvUtils {
.gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
.javaPort(0)
.build();
- CallbackClient callbackClient = (CallbackClient) server.getCallbackClient();
- // The Java API of py4j does not provide approach to set "daemonize_connections" parameter.
- // Use reflect to daemonize the connection thread.
- Field executor = CallbackClient.class.getDeclaredField("executor");
- executor.setAccessible(true);
- ((ScheduledExecutorService) executor.get(callbackClient)).shutdown();
- executor.set(callbackClient, Executors.newScheduledThreadPool(1, Thread::new));
- Method setupCleaner = CallbackClient.class.getDeclaredMethod("setupCleaner");
- setupCleaner.setAccessible(true);
- setupCleaner.invoke(callbackClient);
+ resetCallbackClientExecutorService(server);
gatewayServerFuture.complete(server);
server.start(true);
} catch (Throwable e) {
@@ -301,6 +295,43 @@ final class PythonEnvUtils {
}
/**
+ * Reset a daemon thread to the callback client thread pool so that the callback server can be terminated when the
+ * gateway server is shutting down. We need to shut down the non-daemon thread first, then set a new thread created
+ * in a daemon thread to the ExecutorService.
+ *
+ * @param gatewayServer the gateway which creates the callback server.
+ * */
+ private static void resetCallbackClientExecutorService(GatewayServer gatewayServer) throws NoSuchFieldException,
+ IllegalAccessException, NoSuchMethodException, InvocationTargetException {
+ CallbackClient callbackClient = (CallbackClient) gatewayServer.getCallbackClient();
+ // The Java API of py4j does not provide an approach to set the "daemonize_connections" parameter.
+ // Use reflection to daemonize the connection thread.
+ Field executor = CallbackClient.class.getDeclaredField("executor");
+ executor.setAccessible(true);
+ ((ScheduledExecutorService) executor.get(callbackClient)).shutdown();
+ executor.set(callbackClient, Executors.newScheduledThreadPool(1, Thread::new));
+ Method setupCleaner = CallbackClient.class.getDeclaredMethod("setupCleaner");
+ setupCleaner.setAccessible(true);
+ setupCleaner.invoke(callbackClient);
+ }
+
+ /**
+ * Reset the callback client of gatewayServer with the given callbackServerListeningAddress and
+ * callbackServerListeningPort after the callback server has started.
+ *
+ * @param callbackServerListeningAddress the listening address of the callback server.
+ * @param callbackServerListeningPort the listening port of the callback server.
+ * */
+ public static void resetCallbackClient(String callbackServerListeningAddress, int callbackServerListeningPort) throws
+ UnknownHostException, InvocationTargetException, NoSuchMethodException, IllegalAccessException,
+ NoSuchFieldException {
+
+ gatewayServer = getGatewayServer();
+ gatewayServer.resetCallbackClient(InetAddress.getByName(callbackServerListeningAddress), callbackServerListeningPort);
+ resetCallbackClientExecutorService(gatewayServer);
+ }
+
+ /**
* Py4J both supports Java to Python RPC and Python to Java RPC. The GatewayServer object is
* the entry point of Java to Python RPC. Since the Py4j Python client will only be launched
* only once, the GatewayServer object needs to be reused.
@@ -313,7 +344,7 @@ final class PythonEnvUtils {
static void setGatewayServer(GatewayServer gatewayServer) {
Preconditions.checkArgument(gatewayServer == null || PythonEnvUtils.gatewayServer == null);
- PythonEnvUtils.gatewayServer = null;
+ PythonEnvUtils.gatewayServer = gatewayServer;
}
static Process launchPy4jPythonClient(
@@ -326,8 +357,6 @@ final class PythonEnvUtils {
config, entryPointScript, tmpDir);
// set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python process.
pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
- // set env variable PYFLINK_CALLBACK_PORT for creating callback server in python process.
- pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", String.valueOf(gatewayServer.getCallbackClient().getPort()));
// start the python process.
return PythonEnvUtils.startPythonProcess(pythonEnv, commands);
}