You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/14 03:00:16 UTC

[GitHub] [flink] dianfu commented on a change in pull request #12092: [FLINK-17597][python] Refactor the ProcessPythonEnvironmentManager to move the pyflink udf runner script to the pyflink python package.

dianfu commented on a change in pull request #12092:
URL: https://github.com/apache/flink/pull/12092#discussion_r424833367



##########
File path: flink-python/pyflink/table/tests/test_dependency.py
##########
@@ -130,9 +129,22 @@ def test_set_requirements_with_cached_directory(self):
         os.mkdir(requirements_dir_path)
         package_file_name = "python-package1-0.0.0.tar.gz"
         with open(os.path.join(requirements_dir_path, package_file_name), 'wb') as f:
-            from pyflink.fn_execution.tests.process_mode_test_data import file_data
             import base64
-            f.write(base64.b64decode(json.loads(file_data[package_file_name])["data"]))
+            # This base64 data is encoded from a python package file which includes a
+            # "python_package1" module. The module contains a "plus(a, b)" function.

Review comment:
       Could you add a short description of how to regenerate this base64 data?

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironmentManager.java
##########
@@ -221,7 +235,7 @@ public String createRetrievalToken() throws IOException {
 		// disable the launching of gateway server to prevent from this dead loop:
 		// launch UDF worker -> import udf -> import job code
 		//        ^                                    | (If the job code is not enclosed in a
-		//        									   |  if name == 'main' statement)
+		//        |								       |  if name == 'main' statement)

Review comment:
       Could you revert this change to the comment alignment?

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OperatingSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utils used to install the python requirements via pip.
+ */
+@Internal
+public class PythonEnvironmentManagerUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PythonEnvironmentManagerUtils.class);
+
+	private static final int MAX_RETRY_TIMES = 3;
+
+	private static final String PYFLINK_UDF_RUNNER_SH = "pyflink-udf-runner.sh";
+	private static final String PYFLINK_UDF_RUNNER_BAT = "pyflink-udf-runner.bat";
+
+	private static final String GET_SITE_PACKAGES_PATH_SCRIPT =
+		"import sys;" +
+		"from distutils.dist import Distribution;" +
+		"install_obj = Distribution().get_command_obj('install', create=True);" +
+		"install_obj.prefix = sys.argv[1];" +
+		"install_obj.finalize_options();" +
+		"installed_dir = [install_obj.install_purelib];" +
+		"install_obj.install_purelib != install_obj.install_platlib and " +
+			"installed_dir.append(install_obj.install_platlib);" +
+		"print(installed_dir[0]);" +
+		"len(installed_dir) > 1 and " +
+			"print(installed_dir[1])";
+
+	private static final String CHECK_PIP_VERSION_SCRIPT =
+		"import sys;" +
+		"from pkg_resources import get_distribution, parse_version;" +
+		"pip_version = get_distribution('pip').version;" +
+		"print(parse_version(pip_version) >= parse_version(sys.argv[1]))";
+
+	private static final String GET_RUNNER_DIR_SCRIPT =
+		"import pyflink;" +
+		"import os;" +
+		"print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))";
+
+	/**
+	 * Installs the 3rd party libraries listed in the user-provided requirements file. An optional
+	 * requirements cached directory can be provided to support offline installation. In order not
+	 * to populate the public environment, the libraries will be installed to the specified
+	 * directory, and added to the PYTHONPATH of the UDF workers.
+	 *
+	 * @param requirementsFilePath The path of the requirements file.
+	 * @param requirementsCacheDir The path of the requirements cached directory.
+	 * @param requirementsInstallDir The target directory of the installation.
+	 * @param pythonExecutable The python interpreter used to launch the pip program.
+	 * @param environmentVariables The environment variables used to launch the pip program.
+	 */
+	public static void pipInstallRequirements(
+			String requirementsFilePath,
+			@Nullable String requirementsCacheDir,
+			String requirementsInstallDir,
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String sitePackagesPath = getSitePackagesPath(requirementsInstallDir, pythonExecutable, environmentVariables);
+		String path = String.join(File.pathSeparator, requirementsInstallDir, "bin");
+		appendToEnvironmentVariable("PYTHONPATH", sitePackagesPath, environmentVariables);
+		appendToEnvironmentVariable("PATH", path, environmentVariables);
+
+		List<String> commands = new ArrayList<>(Arrays.asList(
+			pythonExecutable, "-m", "pip", "install", "--ignore-installed", "-r", requirementsFilePath));
+		if (isPipVersionGreaterEqual("8.0.0", pythonExecutable, environmentVariables)) {
+			commands.addAll(Arrays.asList("--prefix", requirementsInstallDir));
+		} else {
+			commands.addAll(Arrays.asList("--install-option", "--prefix=" + requirementsInstallDir));
+		}
+		if (requirementsCacheDir != null) {
+			commands.addAll(Arrays.asList("--find-links", requirementsCacheDir));
+		}
+
+		int retries = 0;
+		while (true) {
+			try {
+				execute(commands.toArray(new String[0]), environmentVariables, true);
+				break;
+			} catch (Throwable t) {
+				retries++;
+				if (retries < MAX_RETRY_TIMES) {
+					LOG.warn(String.format("Pip install failed, retrying... (%d/%d)", retries, MAX_RETRY_TIMES), t);
+				} else {
+					LOG.error(String.format("Pip install failed, already retried %d time...", retries));
+					throw t;
+				}
+			}
+		}
+	}
+
+	public static String getRunnerScriptPath(
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String[] commands = new String[] { pythonExecutable, "-c", GET_RUNNER_DIR_SCRIPT};
+		String out = execute(commands, environmentVariables, false);
+		String runnerScriptPath;
+		if (OperatingSystem.isWindows()) {
+			runnerScriptPath = String.join(File.separator, out.trim(), PYFLINK_UDF_RUNNER_BAT);
+		} else {
+			runnerScriptPath = String.join(File.separator, out.trim(), PYFLINK_UDF_RUNNER_SH);
+		}
+		if (!new File(runnerScriptPath).exists()) {
+			throw new FileNotFoundException(String.format(
+				"The runner script '%s' does not exist! " +
+				"Please reinstall the apache-flink Python package.", runnerScriptPath));
+		}
+		return runnerScriptPath;
+	}
+
+	private static String getSitePackagesPath(
+			String prefix,
+			String pythonExecutable,

Review comment:
       `InterruptedException` is no longer thrown here, so it can be removed from the `throws` clause. Please also check the other places.

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OperatingSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utils used to install the python requirements via pip.
+ */
+@Internal
+public class PythonEnvironmentManagerUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PythonEnvironmentManagerUtils.class);
+
+	private static final int MAX_RETRY_TIMES = 3;
+
+	private static final String PYFLINK_UDF_RUNNER_SH = "pyflink-udf-runner.sh";
+	private static final String PYFLINK_UDF_RUNNER_BAT = "pyflink-udf-runner.bat";
+
+	private static final String GET_SITE_PACKAGES_PATH_SCRIPT =
+		"import sys;" +
+		"from distutils.dist import Distribution;" +
+		"install_obj = Distribution().get_command_obj('install', create=True);" +
+		"install_obj.prefix = sys.argv[1];" +
+		"install_obj.finalize_options();" +
+		"installed_dir = [install_obj.install_purelib];" +
+		"install_obj.install_purelib != install_obj.install_platlib and " +
+			"installed_dir.append(install_obj.install_platlib);" +
+		"print(installed_dir[0]);" +
+		"len(installed_dir) > 1 and " +
+			"print(installed_dir[1])";
+
+	private static final String CHECK_PIP_VERSION_SCRIPT =
+		"import sys;" +
+		"from pkg_resources import get_distribution, parse_version;" +
+		"pip_version = get_distribution('pip').version;" +
+		"print(parse_version(pip_version) >= parse_version(sys.argv[1]))";
+
+	private static final String GET_RUNNER_DIR_SCRIPT =
+		"import pyflink;" +
+		"import os;" +
+		"print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))";
+
+	/**
+	 * Installs the 3rd party libraries listed in the user-provided requirements file. An optional
+	 * requirements cached directory can be provided to support offline installation. In order not
+	 * to populate the public environment, the libraries will be installed to the specified
+	 * directory, and added to the PYTHONPATH of the UDF workers.
+	 *
+	 * @param requirementsFilePath The path of the requirements file.
+	 * @param requirementsCacheDir The path of the requirements cached directory.
+	 * @param requirementsInstallDir The target directory of the installation.
+	 * @param pythonExecutable The python interpreter used to launch the pip program.
+	 * @param environmentVariables The environment variables used to launch the pip program.
+	 */
+	public static void pipInstallRequirements(
+			String requirementsFilePath,
+			@Nullable String requirementsCacheDir,
+			String requirementsInstallDir,
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String sitePackagesPath = getSitePackagesPath(requirementsInstallDir, pythonExecutable, environmentVariables);
+		String path = String.join(File.pathSeparator, requirementsInstallDir, "bin");
+		appendToEnvironmentVariable("PYTHONPATH", sitePackagesPath, environmentVariables);
+		appendToEnvironmentVariable("PATH", path, environmentVariables);
+
+		List<String> commands = new ArrayList<>(Arrays.asList(
+			pythonExecutable, "-m", "pip", "install", "--ignore-installed", "-r", requirementsFilePath));
+		if (isPipVersionGreaterEqual("8.0.0", pythonExecutable, environmentVariables)) {
+			commands.addAll(Arrays.asList("--prefix", requirementsInstallDir));
+		} else {
+			commands.addAll(Arrays.asList("--install-option", "--prefix=" + requirementsInstallDir));
+		}
+		if (requirementsCacheDir != null) {
+			commands.addAll(Arrays.asList("--find-links", requirementsCacheDir));
+		}
+
+		int retries = 0;
+		while (true) {
+			try {
+				execute(commands.toArray(new String[0]), environmentVariables, true);
+				break;
+			} catch (Throwable t) {
+				retries++;
+				if (retries < MAX_RETRY_TIMES) {
+					LOG.warn(String.format("Pip install failed, retrying... (%d/%d)", retries, MAX_RETRY_TIMES), t);
+				} else {
+					LOG.error(String.format("Pip install failed, already retried %d time...", retries));
+					throw t;
+				}
+			}
+		}
+	}
+
+	public static String getRunnerScriptPath(
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String[] commands = new String[] { pythonExecutable, "-c", GET_RUNNER_DIR_SCRIPT};
+		String out = execute(commands, environmentVariables, false);
+		String runnerScriptPath;
+		if (OperatingSystem.isWindows()) {
+			runnerScriptPath = String.join(File.separator, out.trim(), PYFLINK_UDF_RUNNER_BAT);
+		} else {
+			runnerScriptPath = String.join(File.separator, out.trim(), PYFLINK_UDF_RUNNER_SH);
+		}
+		if (!new File(runnerScriptPath).exists()) {
+			throw new FileNotFoundException(String.format(
+				"The runner script '%s' does not exist! " +
+				"Please reinstall the apache-flink Python package.", runnerScriptPath));
+		}
+		return runnerScriptPath;
+	}
+
+	private static String getSitePackagesPath(
+			String prefix,
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String[] commands = new String[] { pythonExecutable, "-c", GET_SITE_PACKAGES_PATH_SCRIPT, prefix };
+		String out = execute(commands, environmentVariables, false);
+		return String.join(File.pathSeparator, out.trim().split("\n"));
+	}
+
+	private static boolean isPipVersionGreaterEqual(
+			String pipVersion,
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String[] commands = new String[] { pythonExecutable, "-c", CHECK_PIP_VERSION_SCRIPT, pipVersion };
+		String out = execute(commands, environmentVariables, false);
+		return Boolean.parseBoolean(out.trim());
+	}
+
+	private static String execute(
+			String[] commands,
+			Map<String, String> environmentVariables,
+			boolean log) throws IOException {
+		ProcessBuilder pb = new ProcessBuilder(commands);
+		pb.environment().putAll(environmentVariables);
+		pb.redirectErrorStream(true);
+		Process p = pb.start();
+		InputStream in = new BufferedInputStream(p.getInputStream());
+		StringBuilder out = new StringBuilder();
+		String s;
+		if (log) {
+			LOG.info(String.format("Execute the commnad: %s", String.join(" ", commands)));

Review comment:
       Typo: "commnad" should be "command".

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OperatingSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utils used to install the python requirements via pip.
+ */
+@Internal
+public class PythonEnvironmentManagerUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PythonEnvironmentManagerUtils.class);
+
+	private static final int MAX_RETRY_TIMES = 3;
+
+	private static final String PYFLINK_UDF_RUNNER_SH = "pyflink-udf-runner.sh";
+	private static final String PYFLINK_UDF_RUNNER_BAT = "pyflink-udf-runner.bat";
+
+	private static final String GET_SITE_PACKAGES_PATH_SCRIPT =
+		"import sys;" +
+		"from distutils.dist import Distribution;" +
+		"install_obj = Distribution().get_command_obj('install', create=True);" +
+		"install_obj.prefix = sys.argv[1];" +
+		"install_obj.finalize_options();" +
+		"installed_dir = [install_obj.install_purelib];" +
+		"install_obj.install_purelib != install_obj.install_platlib and " +
+			"installed_dir.append(install_obj.install_platlib);" +
+		"print(installed_dir[0]);" +
+		"len(installed_dir) > 1 and " +
+			"print(installed_dir[1])";
+
+	private static final String CHECK_PIP_VERSION_SCRIPT =
+		"import sys;" +
+		"from pkg_resources import get_distribution, parse_version;" +
+		"pip_version = get_distribution('pip').version;" +
+		"print(parse_version(pip_version) >= parse_version(sys.argv[1]))";
+
+	private static final String GET_RUNNER_DIR_SCRIPT =
+		"import pyflink;" +
+		"import os;" +
+		"print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))";
+
+	/**
+	 * Installs the 3rd party libraries listed in the user-provided requirements file. An optional
+	 * requirements cached directory can be provided to support offline installation. In order not
+	 * to populate the public environment, the libraries will be installed to the specified
+	 * directory, and added to the PYTHONPATH of the UDF workers.
+	 *
+	 * @param requirementsFilePath The path of the requirements file.
+	 * @param requirementsCacheDir The path of the requirements cached directory.
+	 * @param requirementsInstallDir The target directory of the installation.
+	 * @param pythonExecutable The python interpreter used to launch the pip program.
+	 * @param environmentVariables The environment variables used to launch the pip program.
+	 */
+	public static void pipInstallRequirements(
+			String requirementsFilePath,
+			@Nullable String requirementsCacheDir,
+			String requirementsInstallDir,
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String sitePackagesPath = getSitePackagesPath(requirementsInstallDir, pythonExecutable, environmentVariables);
+		String path = String.join(File.pathSeparator, requirementsInstallDir, "bin");
+		appendToEnvironmentVariable("PYTHONPATH", sitePackagesPath, environmentVariables);
+		appendToEnvironmentVariable("PATH", path, environmentVariables);
+
+		List<String> commands = new ArrayList<>(Arrays.asList(
+			pythonExecutable, "-m", "pip", "install", "--ignore-installed", "-r", requirementsFilePath));
+		if (isPipVersionGreaterEqual("8.0.0", pythonExecutable, environmentVariables)) {
+			commands.addAll(Arrays.asList("--prefix", requirementsInstallDir));
+		} else {
+			commands.addAll(Arrays.asList("--install-option", "--prefix=" + requirementsInstallDir));
+		}
+		if (requirementsCacheDir != null) {
+			commands.addAll(Arrays.asList("--find-links", requirementsCacheDir));
+		}
+
+		int retries = 0;
+		while (true) {
+			try {
+				execute(commands.toArray(new String[0]), environmentVariables, true);
+				break;
+			} catch (Throwable t) {
+				retries++;
+				if (retries < MAX_RETRY_TIMES) {
+					LOG.warn(String.format("Pip install failed, retrying... (%d/%d)", retries, MAX_RETRY_TIMES), t);
+				} else {
+					LOG.error(String.format("Pip install failed, already retried %d time...", retries));
+					throw t;
+				}
+			}
+		}
+	}
+
+	public static String getRunnerScriptPath(
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String[] commands = new String[] { pythonExecutable, "-c", GET_RUNNER_DIR_SCRIPT};
+		String out = execute(commands, environmentVariables, false);
+		String runnerScriptPath;
+		if (OperatingSystem.isWindows()) {
+			runnerScriptPath = String.join(File.separator, out.trim(), PYFLINK_UDF_RUNNER_BAT);
+		} else {
+			runnerScriptPath = String.join(File.separator, out.trim(), PYFLINK_UDF_RUNNER_SH);
+		}
+		if (!new File(runnerScriptPath).exists()) {
+			throw new FileNotFoundException(String.format(
+				"The runner script '%s' does not exist! " +
+				"Please reinstall the apache-flink Python package.", runnerScriptPath));
+		}
+		return runnerScriptPath;
+	}
+
+	private static String getSitePackagesPath(
+			String prefix,
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String[] commands = new String[] { pythonExecutable, "-c", GET_SITE_PACKAGES_PATH_SCRIPT, prefix };
+		String out = execute(commands, environmentVariables, false);
+		return String.join(File.pathSeparator, out.trim().split("\n"));
+	}
+
+	private static boolean isPipVersionGreaterEqual(
+			String pipVersion,
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String[] commands = new String[] { pythonExecutable, "-c", CHECK_PIP_VERSION_SCRIPT, pipVersion };
+		String out = execute(commands, environmentVariables, false);
+		return Boolean.parseBoolean(out.trim());
+	}
+
+	private static String execute(
+			String[] commands,
+			Map<String, String> environmentVariables,
+			boolean log) throws IOException {
+		ProcessBuilder pb = new ProcessBuilder(commands);
+		pb.environment().putAll(environmentVariables);
+		pb.redirectErrorStream(true);
+		Process p = pb.start();
+		InputStream in = new BufferedInputStream(p.getInputStream());
+		StringBuilder out = new StringBuilder();
+		String s;
+		if (log) {
+			LOG.info(String.format("Execute the commnad: %s", String.join(" ", commands)));
+		}
+		try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
+			while ((s = br.readLine()) != null) {
+				out.append(s).append("\n");
+				if (log) {
+					LOG.info(s);
+				}
+			}
+		}
+		try {
+			if (p.waitFor() != 0) {
+				throw new IOException(String.format(
+					"Execute the command:\n%s\nfailed:\n%s", String.join(" ", commands), out));
+			}
+		} catch (InterruptedException e) {
+			// Ignored. The subprocess is dead after the "br.readLine()" returns null, so the "waitFor"
+			// method would not be blocked.
+		}
+		return out.toString();
+	}
+
+	private static void appendToEnvironmentVariable(String key, String path, Map<String, String> env) {
+		if (env.containsKey(key)) {
+			env.put(key, String.join(File.pathSeparator, path, env.get(key)));
+		} else {
+			env.put(key, path);
+		}
+	}
+
+	public static void main(String[] args) throws IOException, InterruptedException {

Review comment:
       Could you remove this `main` method?

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OperatingSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utils used to install the python requirements via pip.
+ */
+@Internal
+public class PythonEnvironmentManagerUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PythonEnvironmentManagerUtils.class);
+
+	private static final int MAX_RETRY_TIMES = 3;
+
+	private static final String PYFLINK_UDF_RUNNER_SH = "pyflink-udf-runner.sh";
+	private static final String PYFLINK_UDF_RUNNER_BAT = "pyflink-udf-runner.bat";
+
+	private static final String GET_SITE_PACKAGES_PATH_SCRIPT =
+		"import sys;" +
+		"from distutils.dist import Distribution;" +
+		"install_obj = Distribution().get_command_obj('install', create=True);" +
+		"install_obj.prefix = sys.argv[1];" +
+		"install_obj.finalize_options();" +
+		"installed_dir = [install_obj.install_purelib];" +
+		"install_obj.install_purelib != install_obj.install_platlib and " +
+			"installed_dir.append(install_obj.install_platlib);" +
+		"print(installed_dir[0]);" +
+		"len(installed_dir) > 1 and " +
+			"print(installed_dir[1])";
+
+	private static final String CHECK_PIP_VERSION_SCRIPT =
+		"import sys;" +
+		"from pkg_resources import get_distribution, parse_version;" +
+		"pip_version = get_distribution('pip').version;" +
+		"print(parse_version(pip_version) >= parse_version(sys.argv[1]))";
+
+	private static final String GET_RUNNER_DIR_SCRIPT =
+		"import pyflink;" +
+		"import os;" +
+		"print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))";
+
+	/**
+	 * Installs the 3rd party libraries listed in the user-provided requirements file. An optional
+	 * requirements cached directory can be provided to support offline installation. In order not
+	 * to populate the public environment, the libraries will be installed to the specified
+	 * directory, and added to the PYTHONPATH of the UDF workers.
+	 *
+	 * @param requirementsFilePath The path of the requirements file.
+	 * @param requirementsCacheDir The path of the requirements cached directory.
+	 * @param requirementsInstallDir The target directory of the installation.
+	 * @param pythonExecutable The python interpreter used to launch the pip program.
+	 * @param environmentVariables The environment variables used to launch the pip program.
+	 */
+	public static void pipInstallRequirements(
+			String requirementsFilePath,
+			@Nullable String requirementsCacheDir,
+			String requirementsInstallDir,
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String sitePackagesPath = getSitePackagesPath(requirementsInstallDir, pythonExecutable, environmentVariables);
+		String path = String.join(File.pathSeparator, requirementsInstallDir, "bin");
+		appendToEnvironmentVariable("PYTHONPATH", sitePackagesPath, environmentVariables);
+		appendToEnvironmentVariable("PATH", path, environmentVariables);
+
+		List<String> commands = new ArrayList<>(Arrays.asList(
+			pythonExecutable, "-m", "pip", "install", "--ignore-installed", "-r", requirementsFilePath));
+		if (isPipVersionGreaterEqual("8.0.0", pythonExecutable, environmentVariables)) {
+			commands.addAll(Arrays.asList("--prefix", requirementsInstallDir));
+		} else {
+			commands.addAll(Arrays.asList("--install-option", "--prefix=" + requirementsInstallDir));
+		}
+		if (requirementsCacheDir != null) {
+			commands.addAll(Arrays.asList("--find-links", requirementsCacheDir));
+		}
+
+		int retries = 0;
+		while (true) {
+			try {
+				execute(commands.toArray(new String[0]), environmentVariables, true);
+				break;
+			} catch (Throwable t) {
+				retries++;
+				if (retries < MAX_RETRY_TIMES) {
+					LOG.warn(String.format("Pip install failed, retrying... (%d/%d)", retries, MAX_RETRY_TIMES), t);
+				} else {
+					LOG.error(String.format("Pip install failed, already retried %d time...", retries));
+					throw t;
+				}
+			}
+		}
+	}
+
+	public static String getRunnerScriptPath(
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String[] commands = new String[] { pythonExecutable, "-c", GET_RUNNER_DIR_SCRIPT};
+		String out = execute(commands, environmentVariables, false);
+		String runnerScriptPath;
+		if (OperatingSystem.isWindows()) {
+			runnerScriptPath = String.join(File.separator, out.trim(), PYFLINK_UDF_RUNNER_BAT);
+		} else {
+			runnerScriptPath = String.join(File.separator, out.trim(), PYFLINK_UDF_RUNNER_SH);
+		}
+		if (!new File(runnerScriptPath).exists()) {
+			throw new FileNotFoundException(String.format(
+				"The runner script '%s' does not exist! " +
+				"Please reinstall the apache-flink Python package.", runnerScriptPath));
+		}
+		return runnerScriptPath;
+	}
+
+	private static String getSitePackagesPath(
+			String prefix,
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String[] commands = new String[] { pythonExecutable, "-c", GET_SITE_PACKAGES_PATH_SCRIPT, prefix };
+		String out = execute(commands, environmentVariables, false);
+		return String.join(File.pathSeparator, out.trim().split("\n"));
+	}
+
+	private static boolean isPipVersionGreaterEqual(
+			String pipVersion,
+			String pythonExecutable,
+			Map<String, String> environmentVariables) throws IOException, InterruptedException {
+		String[] commands = new String[] { pythonExecutable, "-c", CHECK_PIP_VERSION_SCRIPT, pipVersion };
+		String out = execute(commands, environmentVariables, false);
+		return Boolean.parseBoolean(out.trim());
+	}
+
+	private static String execute(
+			String[] commands,
+			Map<String, String> environmentVariables,
+			boolean log) throws IOException {
+		ProcessBuilder pb = new ProcessBuilder(commands);
+		pb.environment().putAll(environmentVariables);
+		pb.redirectErrorStream(true);
+		Process p = pb.start();
+		InputStream in = new BufferedInputStream(p.getInputStream());
+		StringBuilder out = new StringBuilder();
+		String s;
+		if (log) {
+			LOG.info(String.format("Execute the commnad: %s", String.join(" ", commands)));

Review comment:
       Suggested wording: `Executing command: %s`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org