You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2020/05/11 08:32:29 UTC

[flink] branch master updated: [FLINK-17454][python] Specify a port number for gateway callback server from python gateway.

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f744d3  [FLINK-17454][python] Specify a port number for gateway callback server from python gateway.
5f744d3 is described below

commit 5f744d3f81bcfb8f77164a5ec9caa4594851d4bf
Author: acqua.csq <ac...@alibaba-inc.com>
AuthorDate: Fri May 8 23:29:12 2020 +0800

    [FLINK-17454][python] Specify a port number for gateway callback server from python gateway.
    
    This closes #12061
---
 flink-python/pyflink/java_gateway.py               | 11 +++--
 .../apache/flink/client/python/PythonEnvUtils.java | 55 +++++++++++++++++-----
 2 files changed, 49 insertions(+), 17 deletions(-)

diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index d8e061b..33ab2ae 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -49,15 +49,19 @@ def get_gateway():
             # if Java Gateway is already running
             if 'PYFLINK_GATEWAY_PORT' in os.environ:
                 gateway_port = int(os.environ['PYFLINK_GATEWAY_PORT'])
-                callback_port = int(os.environ['PYFLINK_CALLBACK_PORT'])
                 gateway_param = GatewayParameters(port=gateway_port, auto_convert=True)
                 _gateway = JavaGateway(
                     gateway_parameters=gateway_param,
                     callback_server_parameters=CallbackServerParameters(
-                        port=callback_port, daemonize=True, daemonize_connections=True))
+                        port=0, daemonize=True, daemonize_connections=True))
             else:
                 _gateway = launch_gateway()
 
+            callback_server = _gateway.get_callback_server()
+            callback_server_listening_address = callback_server.get_listening_address()
+            callback_server_listening_port = callback_server.get_listening_port()
+            _gateway.jvm.org.apache.flink.client.python.PythonEnvUtils.resetCallbackClient(
+                callback_server_listening_address, callback_server_listening_port)
             # import the flink view
             import_flink_view(_gateway)
             install_exception_handler()
@@ -102,7 +106,6 @@ def launch_gateway():
 
         with open(conn_info_file, "rb") as info:
             gateway_port = struct.unpack("!I", info.read(4))[0]
-            callback_port = struct.unpack("!I", info.read(4))[0]
     finally:
         shutil.rmtree(conn_info_dir)
 
@@ -110,7 +113,7 @@ def launch_gateway():
     gateway = JavaGateway(
         gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True),
         callback_server_parameters=CallbackServerParameters(
-            port=callback_port, daemonize=True, daemonize_connections=True))
+            port=0, daemonize=True, daemonize_connections=True))
 
     return gateway
 
diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index cf15b7b..76370dc 100644
--- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -35,7 +35,10 @@ import py4j.GatewayServer;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.file.FileSystems;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
@@ -277,16 +280,7 @@ final class PythonEnvUtils {
 					.gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
 					.javaPort(0)
 					.build();
-				CallbackClient callbackClient = (CallbackClient) server.getCallbackClient();
-				// The Java API of py4j does not provide approach to set "daemonize_connections" parameter.
-				// Use reflect to daemonize the connection thread.
-				Field executor = CallbackClient.class.getDeclaredField("executor");
-				executor.setAccessible(true);
-				((ScheduledExecutorService) executor.get(callbackClient)).shutdown();
-				executor.set(callbackClient, Executors.newScheduledThreadPool(1, Thread::new));
-				Method setupCleaner = CallbackClient.class.getDeclaredMethod("setupCleaner");
-				setupCleaner.setAccessible(true);
-				setupCleaner.invoke(callbackClient);
+				resetCallbackClientExecutorService(server);
 				gatewayServerFuture.complete(server);
 				server.start(true);
 			} catch (Throwable e) {
@@ -301,6 +295,43 @@ final class PythonEnvUtils {
 	}
 
 	/**
+	 * Replace the executor of the callback client so that the callback server can be terminated when the gateway
+	 * server is shutting down. The existing non-daemon executor is shut down first; the replacement creates its
+	 * worker with Thread::new, so the worker inherits the daemon status of the thread that creates it.
+	 *
+	 * @param gatewayServer the gateway server whose callback client gets the new executor.
+	 * */
+	private static void resetCallbackClientExecutorService(GatewayServer gatewayServer) throws NoSuchFieldException,
+		IllegalAccessException, NoSuchMethodException, InvocationTargetException {
+		CallbackClient callbackClient = (CallbackClient) gatewayServer.getCallbackClient();
+		// The Java API of py4j does not provide a way to set the "daemonize_connections" parameter,
+		// so reflection is used to swap the private "executor" field and re-run the private "setupCleaner" method.
+		Field executor = CallbackClient.class.getDeclaredField("executor");
+		executor.setAccessible(true);
+		((ScheduledExecutorService) executor.get(callbackClient)).shutdown();
+		executor.set(callbackClient, Executors.newScheduledThreadPool(1, Thread::new));
+		Method setupCleaner = CallbackClient.class.getDeclaredMethod("setupCleaner");
+		setupCleaner.setAccessible(true);
+		setupCleaner.invoke(callbackClient);
+	}
+
+	/**
+	 * Reset the callback client of the gateway server to the given callbackServerListeningAddress and
+	 * callbackServerListeningPort, then replace its executor. Meant to be called after the callback server started.
+	 *
+	 * @param callbackServerListeningAddress the listening address of the callback server.
+	 * @param callbackServerListeningPort the listening port of the callback server.
+	 * */
+	public static void resetCallbackClient(String callbackServerListeningAddress, int callbackServerListeningPort) throws
+		UnknownHostException, InvocationTargetException, NoSuchMethodException, IllegalAccessException,
+		NoSuchFieldException {
+
+		gatewayServer = getGatewayServer();
+		gatewayServer.resetCallbackClient(InetAddress.getByName(callbackServerListeningAddress), callbackServerListeningPort);
+		resetCallbackClientExecutorService(gatewayServer);
+	}
+
+	/**
 	 * Py4J both supports Java to Python RPC and Python to Java RPC. The GatewayServer object is
 	 * the entry point of Java to Python RPC. Since the Py4j Python client will only be launched
 	 * only once, the GatewayServer object needs to be reused.
@@ -313,7 +344,7 @@ final class PythonEnvUtils {
 
 	static void setGatewayServer(GatewayServer gatewayServer) {
 		Preconditions.checkArgument(gatewayServer == null || PythonEnvUtils.gatewayServer == null);
-		PythonEnvUtils.gatewayServer = null;
+		PythonEnvUtils.gatewayServer = gatewayServer;
 	}
 
 	static Process launchPy4jPythonClient(
@@ -326,8 +357,6 @@ final class PythonEnvUtils {
 			config, entryPointScript, tmpDir);
 		// set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python process.
 		pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
-		// set env variable PYFLINK_CALLBACK_PORT for creating callback server in python process.
-		pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", String.valueOf(gatewayServer.getCallbackClient().getPort()));
 		// start the python process.
 		return PythonEnvUtils.startPythonProcess(pythonEnv, commands);
 	}