Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/13 09:04:35 UTC

[GitHub] [flink] hequn8128 commented on a change in pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

hequn8128 commented on a change in pull request #13136:
URL: https://github.com/apache/flink/pull/13136#discussion_r469776437



##########
File path: flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonStatelessFunctionOperator.java
##########
@@ -75,6 +75,8 @@
 
 	protected transient StreamRecordCollector streamRecordCollector;
 
+	private Configuration mergedEnvConfig;

Review comment:
       We don't need this config. Use the `PythonConfig` in `DataStreamPythonStatelessFunctionOperator` directly. 
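
       For illustration, a rough sketch of that direction (the wrapper class and field names are hypothetical; `PythonConfig` and its `Configuration`-based constructor are the ones already used in this PR):
   ```
   import org.apache.flink.annotation.Internal;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.python.PythonConfig;
   
   /** Illustrative only: the operator keeps a single PythonConfig instead of an extra merged Configuration field. */
   abstract class PythonStatelessFunctionOperatorSketch {
   
   	private PythonConfig pythonConfig;
   
   	PythonStatelessFunctionOperatorSketch(Configuration config) {
   		this.pythonConfig = new PythonConfig(config);
   	}
   
   	public PythonConfig getPythonConfig() {
   		return pythonConfig;
   	}
   
   	/** Reset the PythonConfig, e.g. after the environment-level dependencies have been merged in. */
   	@Internal
   	public void setPythonConfig(PythonConfig pythonConfig) {
   		this.pythonConfig = pythonConfig;
   	}
   }
   ```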

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -389,6 +389,174 @@ def get_stream_time_characteristic(self):
         j_characteristic = self._j_stream_execution_environment.getStreamTimeCharacteristic()
         return TimeCharacteristic._from_j_time_characteristic(j_characteristic)
 
+    def add_python_file(self, file_path: str):
+        """
+        Adds a python dependency which could be python files, python packages or
+        local directories. They will be added to the PYTHONPATH of the python UDF worker.
+        Please make sure that these dependencies can be imported.
+
+        :param file_path: The path of the python dependency.
+        :type file_path: str
+        """
+        jvm = get_gateway().jvm
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil\
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        python_files = env_config.getString(jvm.PythonOptions.PYTHON_FILES.key(), None)
+        if python_files is not None:
+            python_files = jvm.PythonDependencyUtils.FILE_DELIMITER.join([python_files, file_path])
+        else:
+            python_files = file_path
+        env_config.setString(jvm.PythonOptions.PYTHON_FILES.key(), python_files)
+
+    def set_python_requirements(self, requirements_file_path: str,
+                                requirements_cache_dir: str = None):
+        """
+        Specifies a requirements.txt file which defines the third-party dependencies.
+        These dependencies will be installed to a temporary directory and added to the
+        PYTHONPATH of the python UDF worker.
+
+        For the dependencies which could not be accessed in the cluster, a directory which contains
+        the installation packages of these dependencies could be specified using the parameter
+        "requirements_cached_dir". It will be uploaded to the cluster to support offline
+        installation.
+
+        Example:
+        ::
+
+            # commands executed in shell
+            $ echo numpy==1.16.5 > requirements.txt
+            $ pip download -d cached_dir -r requirements.txt --no-binary :all:
+
+            # python code
+            >>> stream_env.set_python_requirements("requirements.txt", "cached_dir")
+
+        .. note::
+
+            Please make sure the installation packages matches the platform of the cluster
+            and the python version used. These packages will be installed using pip,
+            so also make sure the version of Pip (version >= 7.1.0) and the version of
+            SetupTools (version >= 37.0.0).
+
+        :param requirements_file_path: The path of "requirements.txt" file.
+        :type requirements_file_path: str
+        :param requirements_cache_dir: The path of the local directory which contains the
+                                       installation packages.
+        :type requirements_cache_dir: str
+        """
+        jvm = get_gateway().jvm
+        python_requirements = requirements_file_path
+        if requirements_cache_dir is not None:
+            python_requirements = jvm.PythonDependencyUtils.PARAM_DELIMITER.join(
+                [python_requirements, requirements_cache_dir])
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        env_config.setString(jvm.PythonOptions.PYTHON_REQUIREMENTS.key(), python_requirements)
+
+    def add_python_archive(self, archive_path: str, target_dir: str = None):
+        """
+        Adds a python archive file. The file will be extracted to the working directory of
+        python UDF worker.
+
+        If the parameter "target_dir" is specified, the archive file will be extracted to a
+        directory named ${target_dir}. Otherwise, the archive file will be extracted to a
+        directory with the same name of the archive file.
+
+        If python UDF depends on a specific python version which does not exist in the cluster,
+        this method can be used to upload the virtual environment.
+        Note that the path of the python interpreter contained in the uploaded environment
+        should be specified via the method :func:`pyflink.table.TableConfig.set_python_executable`.
+
+        The files uploaded via this method are also accessible in UDFs via relative path.
+
+        Example:
+        ::
+
+            # command executed in shell
+            # assert the relative path of python interpreter is py_env/bin/python
+            $ zip -r py_env.zip py_env
+
+            # python code
+            >>> stream_env.add_python_archive("py_env.zip")
+            >>> stream_env.set_python_executable("py_env.zip/py_env/bin/python")
+
+            # or
+            >>> stream_env.add_python_archive("py_env.zip", "myenv")
+            >>> stream_env.set_python_executable("myenv/py_env/bin/python")
+
+            # the files contained in the archive file can be accessed in UDF
+            >>> def my_udf():
+            ...     with open("myenv/py_env/data/data.txt") as f:
+            ...         ...
+
+        .. note::
+
+            Please make sure the uploaded python environment matches the platform that the cluster
+            is running on and that the python version must be 3.5 or higher.
+
+        .. note::
+
+            Currently only zip-format is supported. i.e. zip, jar, whl, egg, etc.
+            The other archive formats such as tar, tar.gz, 7z, rar, etc are not supported.
+
+        :param archive_path: The archive file path.
+        :type archive_path: str
+        :param target_dir: Optional, the target dir name that the archive file extracted to.
+        :type target_dir: str

Review comment:
       Duplicate type hint.
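
       For reference, a sketch of how the tail of such a docstring could look once the redundant `:type:` fields are dropped, since the parameters are already annotated in the signature (body shortened for illustration):
   ```
   def add_python_archive(self, archive_path: str, target_dir: str = None):
       """
       Adds a python archive file. The file will be extracted to the working directory
       of the python UDF worker.
   
       :param archive_path: The archive file path.
       :param target_dir: Optional, the name of the target directory that the archive file
                          is extracted to.
       """
   ```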

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -64,4 +82,103 @@ public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env)
 		Configuration envConfiguration = (Configuration) getConfigurationMethod.invoke(env);
 		return envConfiguration;
 	}
+
+	/**
+	 * Configure the {@link DataStreamPythonStatelessFunctionOperator} to be chained with the upstream/downstream
+	 * operator by setting their parallelism, slot sharing group, co-location group to be the same, and applying a
+	 * {@link ForwardPartitioner}.
+	 * 1. operator with name "_keyed_stream_values_operator" should align with its downstream operator.
+	 * 2. operator with name "_stream_key_by_map_operator" should align with its upstream operator.
+	 */
+	private static void alignStreamNode(StreamNode streamNode, StreamGraph streamGraph) {
+		if (streamNode.getOperatorName().equals(KEYED_STREAM_VALUE_OPERATOR_NAME)) {
+			StreamEdge downStreamEdge = streamNode.getOutEdges().get(0);
+			StreamNode downStreamNode = streamGraph.getStreamNode(downStreamEdge.getTargetId());
+			downStreamEdge.setPartitioner(new ForwardPartitioner());
+			streamNode.setParallelism(downStreamNode.getParallelism());
+			streamNode.setCoLocationGroup(downStreamNode.getCoLocationGroup());
+			streamNode.setSlotSharingGroup(downStreamNode.getSlotSharingGroup());
+		}
+
+		if (streamNode.getOperatorName().equals(STREAM_KEY_BY_MAP_OPERATOR_NAME)) {
+			StreamEdge upStreamEdge = streamNode.getInEdges().get(0);
+			StreamNode upStreamNode = streamGraph.getStreamNode(upStreamEdge.getSourceId());
+			upStreamEdge.setPartitioner(new ForwardPartitioner<>());
+			streamNode.setParallelism(upStreamNode.getParallelism());
+			streamNode.setSlotSharingGroup(upStreamNode.getSlotSharingGroup());
+			streamNode.setCoLocationGroup(upStreamNode.getCoLocationGroup());

Review comment:
       A helper method can be extracted here to avoid duplicating the same alignment logic in both branches.
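
       For example, the shared part of both branches could move into a small helper along these lines (the helper and class names are made up; the types and calls are the ones already used in this diff):
   ```
   import org.apache.flink.streaming.api.graph.StreamEdge;
   import org.apache.flink.streaming.api.graph.StreamGraph;
   import org.apache.flink.streaming.api.graph.StreamNode;
   import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
   
   final class StreamNodeAlignmentSketch {
   
   	/** Copies parallelism, slot sharing group and co-location group from alignedNode
   	 *  and forces a forward partitioner on the connecting edge. */
   	private static void chainStreamNode(StreamEdge edge, StreamNode streamNode, StreamNode alignedNode) {
   		edge.setPartitioner(new ForwardPartitioner<>());
   		streamNode.setParallelism(alignedNode.getParallelism());
   		streamNode.setSlotSharingGroup(alignedNode.getSlotSharingGroup());
   		streamNode.setCoLocationGroup(alignedNode.getCoLocationGroup());
   	}
   
   	/** How alignStreamNode could then look (operator names are passed in to keep the sketch self-contained). */
   	static void alignStreamNode(StreamNode streamNode, StreamGraph streamGraph,
   			String keyedStreamValueOperatorName, String streamKeyByMapOperatorName) {
   		if (streamNode.getOperatorName().equals(keyedStreamValueOperatorName)) {
   			StreamEdge downStreamEdge = streamNode.getOutEdges().get(0);
   			chainStreamNode(downStreamEdge, streamNode, streamGraph.getStreamNode(downStreamEdge.getTargetId()));
   		}
   		if (streamNode.getOperatorName().equals(streamKeyByMapOperatorName)) {
   			StreamEdge upStreamEdge = streamNode.getInEdges().get(0);
   			chainStreamNode(upStreamEdge, streamNode, streamGraph.getStreamNode(upStreamEdge.getSourceId()));
   		}
   	}
   }
   ```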

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -389,6 +389,174 @@ def get_stream_time_characteristic(self):
         j_characteristic = self._j_stream_execution_environment.getStreamTimeCharacteristic()
         return TimeCharacteristic._from_j_time_characteristic(j_characteristic)
 
+    def add_python_file(self, file_path: str):
+        """
+        Adds a python dependency which could be python files, python packages or
+        local directories. They will be added to the PYTHONPATH of the python UDF worker.
+        Please make sure that these dependencies can be imported.
+
+        :param file_path: The path of the python dependency.
+        :type file_path: str
+        """
+        jvm = get_gateway().jvm
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil\
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        python_files = env_config.getString(jvm.PythonOptions.PYTHON_FILES.key(), None)
+        if python_files is not None:
+            python_files = jvm.PythonDependencyUtils.FILE_DELIMITER.join([python_files, file_path])
+        else:
+            python_files = file_path
+        env_config.setString(jvm.PythonOptions.PYTHON_FILES.key(), python_files)
+
+    def set_python_requirements(self, requirements_file_path: str,
+                                requirements_cache_dir: str = None):
+        """
+        Specifies a requirements.txt file which defines the third-party dependencies.
+        These dependencies will be installed to a temporary directory and added to the
+        PYTHONPATH of the python UDF worker.
+
+        For the dependencies which could not be accessed in the cluster, a directory which contains
+        the installation packages of these dependencies could be specified using the parameter
+        "requirements_cached_dir". It will be uploaded to the cluster to support offline
+        installation.
+
+        Example:
+        ::
+
+            # commands executed in shell
+            $ echo numpy==1.16.5 > requirements.txt
+            $ pip download -d cached_dir -r requirements.txt --no-binary :all:
+
+            # python code
+            >>> stream_env.set_python_requirements("requirements.txt", "cached_dir")
+
+        .. note::
+
+            Please make sure the installation packages matches the platform of the cluster
+            and the python version used. These packages will be installed using pip,
+            so also make sure the version of Pip (version >= 7.1.0) and the version of
+            SetupTools (version >= 37.0.0).
+
+        :param requirements_file_path: The path of "requirements.txt" file.
+        :type requirements_file_path: str
+        :param requirements_cache_dir: The path of the local directory which contains the
+                                       installation packages.
+        :type requirements_cache_dir: str

Review comment:
       Duplicate type hint.

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -389,6 +389,174 @@ def get_stream_time_characteristic(self):
         j_characteristic = self._j_stream_execution_environment.getStreamTimeCharacteristic()
         return TimeCharacteristic._from_j_time_characteristic(j_characteristic)
 
+    def add_python_file(self, file_path: str):
+        """
+        Adds a python dependency which could be python files, python packages or
+        local directories. They will be added to the PYTHONPATH of the python UDF worker.
+        Please make sure that these dependencies can be imported.
+
+        :param file_path: The path of the python dependency.
+        :type file_path: str

Review comment:
       Duplicate type hint.

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -64,4 +82,103 @@ public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env)
 		Configuration envConfiguration = (Configuration) getConfigurationMethod.invoke(env);
 		return envConfiguration;
 	}
+
+	/**
+	 * Configure the {@link DataStreamPythonStatelessFunctionOperator} to be chained with the upstream/downstream
+	 * operator by setting their parallelism, slot sharing group, co-location group to be the same, and applying a
+	 * {@link ForwardPartitioner}.
+	 * 1. operator with name "_keyed_stream_values_operator" should align with its downstream operator.
+	 * 2. operator with name "_stream_key_by_map_operator" should align with its upstream operator.
+	 */
+	private static void alignStreamNode(StreamNode streamNode, StreamGraph streamGraph) {
+		if (streamNode.getOperatorName().equals(KEYED_STREAM_VALUE_OPERATOR_NAME)) {
+			StreamEdge downStreamEdge = streamNode.getOutEdges().get(0);
+			StreamNode downStreamNode = streamGraph.getStreamNode(downStreamEdge.getTargetId());
+			downStreamEdge.setPartitioner(new ForwardPartitioner());
+			streamNode.setParallelism(downStreamNode.getParallelism());
+			streamNode.setCoLocationGroup(downStreamNode.getCoLocationGroup());
+			streamNode.setSlotSharingGroup(downStreamNode.getSlotSharingGroup());
+		}
+
+		if (streamNode.getOperatorName().equals(STREAM_KEY_BY_MAP_OPERATOR_NAME)) {
+			StreamEdge upStreamEdge = streamNode.getInEdges().get(0);
+			StreamNode upStreamNode = streamGraph.getStreamNode(upStreamEdge.getSourceId());
+			upStreamEdge.setPartitioner(new ForwardPartitioner<>());
+			streamNode.setParallelism(upStreamNode.getParallelism());
+			streamNode.setSlotSharingGroup(upStreamNode.getSlotSharingGroup());
+			streamNode.setCoLocationGroup(upStreamNode.getCoLocationGroup());
+		}
+	}
+
+	/**
+	 * Generate a {@link StreamGraph} for transformations maintained by current {@link StreamExecutionEnvironment}, and
+	 * reset the merged env configurations with dependencies to every {@link DataStreamPythonStatelessFunctionOperator}.
+	 * It is an idempotent operation that can be call multiple times. Remember that only when need to execute the
+	 * StreamGraph can we set the clearTransformations to be True.
+	 */
+	public static StreamGraph generateStreamGraphWithDependencies(
+		StreamExecutionEnvironment env, boolean clearTransformations) throws IllegalAccessException,
+		NoSuchMethodException, InvocationTargetException {
+
+		Configuration mergedConfig = getEnvConfigWithDependencies(env);
+		StreamGraph streamGraph = env.getStreamGraph(StreamExecutionEnvironment.DEFAULT_JOB_NAME, clearTransformations);
+		Collection<StreamNode> streamNodes = streamGraph.getStreamNodes();
+		for (StreamNode streamNode : streamNodes) {
+
+			alignStreamNode(streamNode, streamGraph);
+
+			StreamOperatorFactory streamOperatorFactory = streamNode.getOperatorFactory();
+			if (streamOperatorFactory instanceof SimpleOperatorFactory) {
+				StreamOperator streamOperator = ((SimpleOperatorFactory) streamOperatorFactory).getOperator();
+				if (streamOperator instanceof DataStreamPythonStatelessFunctionOperator) {
+					DataStreamPythonStatelessFunctionOperator dataStreamPythonStatelessFunctionOperator =
+						(DataStreamPythonStatelessFunctionOperator) streamOperator;
+					Configuration oldConfig = dataStreamPythonStatelessFunctionOperator.getMergedEnvConfig();
+					dataStreamPythonStatelessFunctionOperator.setPythonConfig(generateNewPythonConfig(oldConfig,
+						mergedConfig));
+				}
+			}
+		}
+		return streamGraph;
+	}
+
+	/**
+	 * Generator a new {@link  PythonConfig} with the combined config which is derived from oldConfig.
+	 */
+	private static PythonConfig generateNewPythonConfig(Configuration oldConfig, Configuration newConfig) {
+		setIfNotExist(PythonOptions.MAX_BUNDLE_SIZE, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.MAX_BUNDLE_TIME_MILLS, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.MAX_BUNDLE_TIME_MILLS, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.PYTHON_FRAMEWORK_MEMORY_SIZE, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.PYTHON_DATA_BUFFER_MEMORY_SIZE, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.PYTHON_EXECUTABLE, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.PYTHON_METRIC_ENABLED, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.USE_MANAGED_MEMORY, oldConfig, newConfig);
+
+		combineConfigValue(PythonDependencyUtils.PYTHON_FILES, oldConfig, newConfig);
+		combineConfigValue(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE, oldConfig, newConfig);
+		combineConfigValue(PythonDependencyUtils.PYTHON_ARCHIVES, oldConfig, newConfig);
+
+		return new PythonConfig(oldConfig);
+	}
+
+	/**
+	 * Make sure new configuration not overriding the previously configured value. For example, the MAX_BUNDLE_SIZE of
+	 * {@link org.apache.flink.datastream.runtime.operators.python.DataStreamPythonReduceFunctionOperator} is
+	 * pre-configured to be 1, we must not to change it.
+	 */
+	private static void setIfNotExist(ConfigOption configOption, Configuration oldConfig, Configuration newConfig) {
+		if (!oldConfig.containsKey(configOption.key())) {
+			oldConfig.set(configOption, newConfig.get(configOption));
+		}
+	}
+
+	/**
+	 * Dependency file information maintained by a Map in old config can be combined with new config.
+	 */
+	private static void combineConfigValue(ConfigOption<Map<String, String>> configOption, Configuration oldConfig, Configuration newConfig) {
+		Map<String, String> oldConfigValue = oldConfig.getOptional(configOption).orElse(new HashMap<>());
+		oldConfigValue.putAll(newConfig.getOptional(configOption).orElse(new HashMap<>()));
+		oldConfig.set(configOption, oldConfigValue);
+	}

Review comment:
       These methods can be replaced by:
   ```
   	private static PythonConfig generateNewPythonConfig(Configuration oldConfig, Configuration newConfig) {
   		Configuration mergedConfig = newConfig.clone();
   		mergedConfig.addAll(oldConfig);
   		return new PythonConfig(mergedConfig);
   	}
   ```

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -416,10 +584,14 @@ def execute(self, job_name=None):
         :param job_name: Desired name of the job, optional.
         :return: The result of the job execution, containing elapsed time and accumulators.
         """
-        if job_name is None:
-            return JobExecutionResult(self._j_stream_execution_environment.execute())
-        else:
-            return JobExecutionResult(self._j_stream_execution_environment.execute(job_name))
+        j_stream_graph = get_gateway().jvm \
+            .org.apache.flink.python.util.PythonConfigUtil.generateStreamGraphWithDependencies(
+            self._j_stream_execution_environment, True)

Review comment:
       We can extract a helper method for building the stream graph and reuse it in the other methods that need it.
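
       For instance, a shared helper could look roughly like this (the name `_generate_stream_graph` and reusing it from the other execution-related methods are assumptions on my side):
   ```
   def _generate_stream_graph(self, clear_transformations: bool = False, job_name: str = None):
       # build the stream graph with the Python dependencies merged into the operators
       gateway = get_gateway()
       JPythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
       j_stream_graph = JPythonConfigUtil.generateStreamGraphWithDependencies(
           self._j_stream_execution_environment, clear_transformations)
       if job_name is not None:
           j_stream_graph.setJobName(job_name)
       return j_stream_graph
   ```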

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -389,6 +389,174 @@ def get_stream_time_characteristic(self):
         j_characteristic = self._j_stream_execution_environment.getStreamTimeCharacteristic()
         return TimeCharacteristic._from_j_time_characteristic(j_characteristic)
 
+    def add_python_file(self, file_path: str):
+        """
+        Adds a python dependency which could be python files, python packages or
+        local directories. They will be added to the PYTHONPATH of the python UDF worker.
+        Please make sure that these dependencies can be imported.
+
+        :param file_path: The path of the python dependency.
+        :type file_path: str
+        """
+        jvm = get_gateway().jvm
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil\
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        python_files = env_config.getString(jvm.PythonOptions.PYTHON_FILES.key(), None)
+        if python_files is not None:
+            python_files = jvm.PythonDependencyUtils.FILE_DELIMITER.join([python_files, file_path])
+        else:
+            python_files = file_path
+        env_config.setString(jvm.PythonOptions.PYTHON_FILES.key(), python_files)
+
+    def set_python_requirements(self, requirements_file_path: str,
+                                requirements_cache_dir: str = None):
+        """
+        Specifies a requirements.txt file which defines the third-party dependencies.
+        These dependencies will be installed to a temporary directory and added to the
+        PYTHONPATH of the python UDF worker.
+
+        For the dependencies which could not be accessed in the cluster, a directory which contains
+        the installation packages of these dependencies could be specified using the parameter
+        "requirements_cached_dir". It will be uploaded to the cluster to support offline
+        installation.
+
+        Example:
+        ::
+
+            # commands executed in shell
+            $ echo numpy==1.16.5 > requirements.txt
+            $ pip download -d cached_dir -r requirements.txt --no-binary :all:
+
+            # python code
+            >>> stream_env.set_python_requirements("requirements.txt", "cached_dir")
+
+        .. note::
+
+            Please make sure the installation packages matches the platform of the cluster
+            and the python version used. These packages will be installed using pip,
+            so also make sure the version of Pip (version >= 7.1.0) and the version of
+            SetupTools (version >= 37.0.0).
+
+        :param requirements_file_path: The path of "requirements.txt" file.
+        :type requirements_file_path: str
+        :param requirements_cache_dir: The path of the local directory which contains the
+                                       installation packages.
+        :type requirements_cache_dir: str
+        """
+        jvm = get_gateway().jvm
+        python_requirements = requirements_file_path
+        if requirements_cache_dir is not None:
+            python_requirements = jvm.PythonDependencyUtils.PARAM_DELIMITER.join(
+                [python_requirements, requirements_cache_dir])
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        env_config.setString(jvm.PythonOptions.PYTHON_REQUIREMENTS.key(), python_requirements)
+
+    def add_python_archive(self, archive_path: str, target_dir: str = None):
+        """
+        Adds a python archive file. The file will be extracted to the working directory of
+        python UDF worker.
+
+        If the parameter "target_dir" is specified, the archive file will be extracted to a
+        directory named ${target_dir}. Otherwise, the archive file will be extracted to a
+        directory with the same name of the archive file.
+
+        If python UDF depends on a specific python version which does not exist in the cluster,
+        this method can be used to upload the virtual environment.
+        Note that the path of the python interpreter contained in the uploaded environment
+        should be specified via the method :func:`pyflink.table.TableConfig.set_python_executable`.
+
+        The files uploaded via this method are also accessible in UDFs via relative path.
+
+        Example:
+        ::
+
+            # command executed in shell
+            # assert the relative path of python interpreter is py_env/bin/python
+            $ zip -r py_env.zip py_env
+
+            # python code
+            >>> stream_env.add_python_archive("py_env.zip")
+            >>> stream_env.set_python_executable("py_env.zip/py_env/bin/python")
+
+            # or
+            >>> stream_env.add_python_archive("py_env.zip", "myenv")
+            >>> stream_env.set_python_executable("myenv/py_env/bin/python")
+
+            # the files contained in the archive file can be accessed in UDF
+            >>> def my_udf():
+            ...     with open("myenv/py_env/data/data.txt") as f:
+            ...         ...
+
+        .. note::
+
+            Please make sure the uploaded python environment matches the platform that the cluster
+            is running on and that the python version must be 3.5 or higher.
+
+        .. note::
+
+            Currently only zip-format is supported. i.e. zip, jar, whl, egg, etc.
+            The other archive formats such as tar, tar.gz, 7z, rar, etc are not supported.
+
+        :param archive_path: The archive file path.
+        :type archive_path: str
+        :param target_dir: Optional, the target dir name that the archive file extracted to.
+        :type target_dir: str
+        """
+        jvm = get_gateway().jvm
+        if target_dir is not None:
+            archive_path = jvm.PythonDependencyUtils.PARAM_DELIMITER.join(
+                [archive_path, target_dir])
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        python_archives = env_config.getString(jvm.PythonOptions.PYTHON_ARCHIVES.key(), None)
+        if python_archives is not None:
+            python_files = jvm.PythonDependencyUtils.FILE_DELIMITER.join(
+                [python_archives, archive_path])
+        else:
+            python_files = archive_path
+        env_config.setString(jvm.PythonOptions.PYTHON_ARCHIVES.key(), python_files)
+
+    def set_python_executable(self, python_exec: str):
+        """
+        Sets the path of the python interpreter which is used to execute the python udf workers.
+
+        e.g. "/usr/local/bin/python3".
+
+        If python UDF depends on a specific python version which does not exist in the cluster,
+        the method :func:`pyflink.datastream.StreamExecutionEnvironment.add_python_archive` can be
+        used to upload a virtual environment. The path of the python interpreter contained in the
+        uploaded environment can be specified via this method.
+
+        Example:
+        ::
+
+            # command executed in shell
+            # assume that the relative path of python interpreter is py_env/bin/python
+            $ zip -r py_env.zip py_env
+
+            # python code
+            >>> stream_env.add_python_archive("py_env.zip")
+            >>> stream_env.set_python_executable("py_env.zip/py_env/bin/python")
+
+        .. note::
+
+            Please make sure the uploaded python environment matches the platform that the cluster
+            is running on and that the python version must be 3.5 or higher.
+
+        .. note::
+
+            The python udf worker depends on Apache Beam (version == 2.19.0).
+            Please ensure that the specified environment meets the above requirements.
+
+        :param python_exec: The path of python interpreter.
+        :type python_exec: str

Review comment:
       Duplicate type hint.

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -438,7 +440,7 @@ def _get_java_python_function_operator(self, func: Union[Function, FunctionWrapp
 
         j_env = self._j_data_stream.getExecutionEnvironment()
         PythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
-        j_conf = PythonConfigUtil.getMergedConfig(j_env)
+        j_conf = PythonConfigUtil.getEnvConfigWithDependencies(j_env)

Review comment:
       We need to return an empty config here. 
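
       A minimal sketch of that (assuming the plain `org.apache.flink.configuration.Configuration` default constructor is what is wanted here):
   ```
   # hand the operator an empty config; the dependencies are merged in later
   # when the stream graph is generated
   j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
   ```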

##########
File path: flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
##########
@@ -280,5 +283,222 @@ def test_execute_async(self):
         execution_result = job_client.get_job_execution_result().result()
         self.assertEqual(str(job_id), str(execution_result.get_job_id()))
 
+    def test_add_python_file(self):
+        import uuid
+        python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
+        os.mkdir(python_file_dir)
+        python_file_path = os.path.join(python_file_dir, "test_stream_dependency_manage_lib.py")
+        with open(python_file_path, 'w') as f:
+            f.write("def add_two(a):\n    return a + 2")
+
+        def plus_two_map(value):
+            from test_stream_dependency_manage_lib import add_two
+            return add_two(value)
+
+        self.env.add_python_file(python_file_path)
+        ds = self.env.from_collection([1, 2, 3, 4, 5])
+        ds.map(plus_two_map).add_sink(self.test_sink)
+        self.env.execute("test add python file")
+        result = self.test_sink.get_results(True)
+        expected = ['3', '4', '5', '6', '7']
+        result.sort()
+        expected.sort()
+        self.assertEqual(expected, result)
+
+    def test_set_requirements_without_cached_directory(self):
+        import uuid
+        requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4()))
+        with open(requirements_txt_path, 'w') as f:
+            f.write("cloudpickle==1.2.2")
+        self.env.set_python_requirements(requirements_txt_path)
+
+        def check_requirements(i):
+            import cloudpickle
+            assert os.path.abspath(cloudpickle.__file__).startswith(
+                os.environ['_PYTHON_REQUIREMENTS_INSTALL_DIR'])
+            return i
+
+        ds = self.env.from_collection([1, 2, 3, 4, 5])
+        ds.map(check_requirements).add_sink(self.test_sink)
+        self.env.execute("test set requirements without cache dir")
+        result = self.test_sink.get_results(True)
+        expected = ['1', '2', '3', '4', '5']
+        result.sort()
+        expected.sort()
+        self.assertEqual(expected, result)
+
+    def test_set_requirements_with_cached_directory(self):
+        import uuid
+        tmp_dir = self.tempdir
+        requirements_txt_path = os.path.join(tmp_dir, "requirements_txt_" + str(uuid.uuid4()))
+        with open(requirements_txt_path, 'w') as f:
+            f.write("python-package1==0.0.0")
+
+        requirements_dir_path = os.path.join(tmp_dir, "requirements_dir_" + str(uuid.uuid4()))
+        os.mkdir(requirements_dir_path)
+        package_file_name = "python-package1-0.0.0.tar.gz"
+        with open(os.path.join(requirements_dir_path, package_file_name), 'wb') as f:
+            import base64
+            # This base64 data is encoded from a python package file which includes a
+            # "python_package1" module. The module contains a "plus(a, b)" function.
+            # The base64 can be recomputed by following code:
+            # base64.b64encode(open("python-package1-0.0.0.tar.gz", "rb").read()).decode("utf-8")
+            f.write(base64.b64decode(
+                "H4sICNefrV0C/2Rpc3QvcHl0aG9uLXBhY2thZ2UxLTAuMC4wLnRhcgDtmVtv2jAYhnPtX2H1CrRCY+ckI"
+                "XEx7axuUA11u5imyICTRc1JiVnHfv1MKKWjYxwKEdPehws7xkmUfH5f+3PyqfqWpa1cjG5EKFnLbOvfhX"
+                "FQTI3nOPPSdavS5Pa8nGMwy3Esi3ke9wyTObbnGNQxamBSKlFQavzUryG8ldG6frpbEGx4yNmDLMp/hPy"
+                "P8b+6fNN613vdP1z8XdteG3+ug/17/F3Hcw1qIv5H54NUYiyUaH2SRRllaYeytkl6IpEdujI2yH2XapCQ"
+                "wSRJRDHt0OveZa//uUfeZonUvUO5bHo+0ZcoVo9bMhFRvGx9H41kWj447aUsR0WUq+pui8arWKggK5Jli"
+                "wGOo/95q79ovXi6/nfyf246Dof/n078fT9KI+X77Xx6BP83bX4Xf5NxT7dz7toO/L8OxjKgeTwpG+KcDp"
+                "sdQjWFVJMipYI+o0MCk4X/t2UYtqI0yPabCHb3f861XcD/Ty/+Y5nLdCzT0dSPo/SmbKsf6un+b7KV+Ls"
+                "W4/D/OoC9w/930P9eGwM75//csrD+Q/6P/P/k9D/oX3988Wqw1bS/tf6tR+s/m3EG/ddBqXO9XKf15C8p"
+                "P9k4HZBtBgzZaVW5vrfKcj+W32W82ygEB9D/Xu9+4/qfP9L/rBv0X1v87yONKRX61/qfzwqjIDzIPTbv/"
+                "7or3/88i0H/tfBFW7s/s/avRInQH06ieEy7tDrQeYHUdRN7wP+n/vf62LOH/pld7f9xz7a5Pfufedy0oP"
+                "86iJI8KxStAq6yLC4JWdbbVbWRikR2z1ZGytk5vauW3QdnBFE6XqwmykazCesAAAAAAAAAAAAAAAAAAAA"
+                "AAAAAAAAAAAAAAOBw/AJw5CHBAFAAAA=="))
+        self.env.set_python_requirements(requirements_txt_path, requirements_dir_path)
+
+        def add_one(i):
+            from python_package1 import plus
+            return plus(i, 1)
+
+        ds = self.env.from_collection([1, 2, 3, 4, 5])
+        ds.map(add_one).add_sink(self.test_sink)
+        self.env.execute("test set requirements with cachd dir")
+        result = self.test_sink.get_results(True)
+        expected = ['2', '3', '4', '5', '6']
+        result.sort()
+        expected.sort()
+        self.assertEqual(expected, result)
+
+    def test_add_python_archive(self):
+        import uuid
+        import shutil
+        tmp_dir = self.tempdir
+        archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
+        os.mkdir(archive_dir_path)
+        with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
+            f.write("2")
+        archive_file_path = \
+            shutil.make_archive(os.path.dirname(archive_dir_path), 'zip', archive_dir_path)
+        self.env.add_python_archive(archive_file_path, "data")
+
+        def add_from_file(i):
+            with open("data/data.txt", 'r') as f:
+                return i + int(f.read())
+
+        ds = self.env.from_collection([1, 2, 3, 4, 5])
+        ds.map(add_from_file).add_sink(self.test_sink)
+        self.env.execute("test set python archive")
+        result = self.test_sink.get_results(True)
+        expected = ['3', '4', '5', '6', '7']
+        result.sort()
+        expected.sort()
+        self.assertEqual(expected, result)
+
+    def test_set_stream_env(self):
+        import sys
+        python_exec = sys.executable
+        tmp_dir = self.tempdir
+        python_exec_link_path = os.path.join(tmp_dir, "py_exec")
+        os.symlink(python_exec, python_exec_link_path)
+        self.env.set_python_executable(python_exec_link_path)
+
+        def check_python_exec(i):
+            import os
+            assert os.environ["python"] == python_exec_link_path
+            return i
+
+        def check_pyflink_gateway_disabled(i):
+            try:
+                from pyflink.java_gateway import get_gateway
+                get_gateway()
+            except Exception as e:
+                assert str(e).startswith("It's launching the PythonGatewayServer during Python UDF"
+                                         " execution which is unexpected.")
+            else:
+                raise Exception("The gateway server is not disabled!")
+            return i

Review comment:
       Duplicated check: this behavior is already covered by other tests.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -240,6 +240,14 @@ public void processWatermark(Watermark mark) throws Exception {
 		}
 	}
 
+	/**
+	 * Reset the {@link PythonConfig} if needed.
+	 * */
+	@Internal
+	public void setPythonConfig(PythonConfig pythonConfig) {

Review comment:
       Put the set method close to the corresponding get method. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org