Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/13 07:13:24 UTC

[GitHub] [flink] shuiqiangchen opened a new pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

shuiqiangchen opened a new pull request #13136:
URL: https://github.com/apache/flink/pull/13136


   
   ## What is the purpose of the change
   
   Add dependency management to the Python StreamExecutionEnvironment so that users can specify third-party dependencies in their DataStream jobs. We propose to add the following interfaces to the Python StreamExecutionEnvironment:
   - add_python_file()
   - set_python_requirements()
   - add_python_archive()
   - set_python_executable()
   
   Users might call these interfaces in any order, so we must correctly configure the dependencies on the operators that depend on them before executing the job.
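
A minimal sketch of how calls in arbitrary order could accumulate into one environment configuration. `ToyStreamEnv`, the dict-based config, the key names and the two delimiter values are assumptions made for illustration; the real values come from `PythonOptions` and `PythonDependencyUtils` on the Java side.

```python
# Assumed delimiter values; the real ones are defined in PythonDependencyUtils.
FILE_DELIMITER = ","
PARAM_DELIMITER = "#"


class ToyStreamEnv:
    """Hypothetical stand-in for the Python StreamExecutionEnvironment."""

    def __init__(self):
        # A plain dict replaces the Java Configuration object.
        self.config = {}

    def _append(self, key, value):
        # Accumulate values under one key instead of overwriting earlier calls.
        existing = self.config.get(key)
        if existing is None:
            self.config[key] = value
        else:
            self.config[key] = FILE_DELIMITER.join([existing, value])

    def add_python_file(self, file_path):
        self._append("python.files", file_path)

    def set_python_requirements(self, requirements_file_path, requirements_cache_dir=None):
        value = requirements_file_path
        if requirements_cache_dir is not None:
            value = PARAM_DELIMITER.join([value, requirements_cache_dir])
        # "set" semantics: a later call replaces the earlier value.
        self.config["python.requirements"] = value

    def add_python_archive(self, archive_path, target_dir=None):
        if target_dir is not None:
            archive_path = PARAM_DELIMITER.join([archive_path, target_dir])
        self._append("python.archives", archive_path)

    def set_python_executable(self, python_exec):
        self.config["python.executable"] = python_exec


env = ToyStreamEnv()
env.add_python_file("udfs.py")
env.add_python_archive("py_env.zip", "myenv")
env.add_python_file("helpers/")
env.set_python_requirements("requirements.txt", "cached_dir")
env.set_python_executable("myenv/py_env/bin/python")
```

The `add_*` methods append to the existing value while the `set_*` methods replace it, so the resulting config does not depend on how the calls are interleaved.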
   
   Furthermore, some operations consist of multiple operators. Take key_by() for instance:
   
   ```
   >>> source_stream = env.from_collection(...)
   >>> keyed_stream = source_stream.key_by(...)
   >>> mapped_stream = keyed_stream.map(...)
   ```
   
   The implementation of key_by() consists of a _stream_key_by_map operator and a _keyed_stream_values_map operator.
   We need to make sure that the _stream_key_by_map operator is chained with the source_stream operator and that the _keyed_stream_values_map operator is chained with the mapped_stream operator. For that, each pair must have the same parallelism, slot sharing group and co-location group, and the ship_strategy should be FORWARD.
   
   All of these tasks should be accomplished by PythonConfigUtil before submitting the job.
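
The alignment described above can be sketched with a toy graph model. `Node`, `Edge`, `align` and the partitioner strings are hypothetical simplifications and not Flink classes; the sketch only shows which settings are copied from which neighbour.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    """Hypothetical, simplified stand-in for a stream graph node."""
    name: str
    parallelism: int
    slot_sharing_group: str = "default"
    co_location_group: Optional[str] = None


@dataclass
class Edge:
    """Hypothetical edge; the partitioner is modelled as a plain string."""
    source: Node
    target: Node
    partitioner: str = "REBALANCE"


def align(node: Node, neighbour: Node, edge: Edge) -> None:
    """Copy the neighbour's placement settings so that both nodes can be chained."""
    node.parallelism = neighbour.parallelism
    node.slot_sharing_group = neighbour.slot_sharing_group
    node.co_location_group = neighbour.co_location_group
    edge.partitioner = "FORWARD"


source = Node("source_stream", parallelism=2, slot_sharing_group="ingest")
key_by_map = Node("_stream_key_by_map", parallelism=4)
values_map = Node("_keyed_stream_values_map", parallelism=4)
mapped = Node("mapped_stream", parallelism=8, slot_sharing_group="compute")

in_edge = Edge(source, key_by_map)
keyed_edge = Edge(key_by_map, values_map, partitioner="HASH")
out_edge = Edge(values_map, mapped)

# The key-extracting map follows its upstream node ...
align(key_by_map, source, in_edge)
# ... and the value-restoring map follows its downstream node.
align(values_map, mapped, out_edge)
```

Only the edge between the two internal operators keeps its hash partitioning in this model; the two outer edges become FORWARD, which is what allows chaining.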
   
   
   ## Brief change log
   
   Add the following interfaces to the Python StreamExecutionEnvironment:
   - add_python_file()
   - set_python_requirements()
   - add_python_archive()
   - set_python_executable()
   
   Add a util method getEnvConfigWithDependencies() to get the StreamExecutionEnvironment configuration with dependencies.
   Add a util method generateStreamGraphWithDependencies() to generate a StreamGraph in which all operators have their dependencies configured.
   
   ## Verifying this change
   
   This change is covered by the following test cases:
   - test_add_python_file()
   - test_set_requirements_without_cached_directory()
   - test_set_requirements_with_cached_directory()
   - test_add_python_archive()
   - test_set_stream_env()
   - test_generate_stream_graph_with_dependencies()
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] hequn8128 commented on a change in pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #13136:
URL: https://github.com/apache/flink/pull/13136#discussion_r469776437



##########
File path: flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonStatelessFunctionOperator.java
##########
@@ -75,6 +75,8 @@
 
 	protected transient StreamRecordCollector streamRecordCollector;
 
+	private Configuration mergedEnvConfig;

Review comment:
       We don't need this config. Use the `PythonConfig` in `DataStreamPythonStatelessFunctionOperator` directly. 

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -389,6 +389,174 @@ def get_stream_time_characteristic(self):
         j_characteristic = self._j_stream_execution_environment.getStreamTimeCharacteristic()
         return TimeCharacteristic._from_j_time_characteristic(j_characteristic)
 
+    def add_python_file(self, file_path: str):
+        """
+        Adds a python dependency which could be python files, python packages or
+        local directories. They will be added to the PYTHONPATH of the python UDF worker.
+        Please make sure that these dependencies can be imported.
+
+        :param file_path: The path of the python dependency.
+        :type file_path: str
+        """
+        jvm = get_gateway().jvm
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil\
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        python_files = env_config.getString(jvm.PythonOptions.PYTHON_FILES.key(), None)
+        if python_files is not None:
+            python_files = jvm.PythonDependencyUtils.FILE_DELIMITER.join([python_files, file_path])
+        else:
+            python_files = file_path
+        env_config.setString(jvm.PythonOptions.PYTHON_FILES.key(), python_files)
+
+    def set_python_requirements(self, requirements_file_path: str,
+                                requirements_cache_dir: str = None):
+        """
+        Specifies a requirements.txt file which defines the third-party dependencies.
+        These dependencies will be installed to a temporary directory and added to the
+        PYTHONPATH of the python UDF worker.
+
+        For the dependencies which could not be accessed in the cluster, a directory which contains
+        the installation packages of these dependencies could be specified using the parameter
+        "requirements_cached_dir". It will be uploaded to the cluster to support offline
+        installation.
+
+        Example:
+        ::
+
+            # commands executed in shell
+            $ echo numpy==1.16.5 > requirements.txt
+            $ pip download -d cached_dir -r requirements.txt --no-binary :all:
+
+            # python code
+            >>> stream_env.set_python_requirements("requirements.txt", "cached_dir")
+
+        .. note::
+
+            Please make sure the installation packages matches the platform of the cluster
+            and the python version used. These packages will be installed using pip,
+            so also make sure the version of Pip (version >= 7.1.0) and the version of
+            SetupTools (version >= 37.0.0).
+
+        :param requirements_file_path: The path of "requirements.txt" file.
+        :type requirements_file_path: str
+        :param requirements_cache_dir: The path of the local directory which contains the
+                                       installation packages.
+        :type requirements_cache_dir: str
+        """
+        jvm = get_gateway().jvm
+        python_requirements = requirements_file_path
+        if requirements_cache_dir is not None:
+            python_requirements = jvm.PythonDependencyUtils.PARAM_DELIMITER.join(
+                [python_requirements, requirements_cache_dir])
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        env_config.setString(jvm.PythonOptions.PYTHON_REQUIREMENTS.key(), python_requirements)
+
+    def add_python_archive(self, archive_path: str, target_dir: str = None):
+        """
+        Adds a python archive file. The file will be extracted to the working directory of
+        python UDF worker.
+
+        If the parameter "target_dir" is specified, the archive file will be extracted to a
+        directory named ${target_dir}. Otherwise, the archive file will be extracted to a
+        directory with the same name of the archive file.
+
+        If python UDF depends on a specific python version which does not exist in the cluster,
+        this method can be used to upload the virtual environment.
+        Note that the path of the python interpreter contained in the uploaded environment
+        should be specified via the method :func:`pyflink.table.TableConfig.set_python_executable`.
+
+        The files uploaded via this method are also accessible in UDFs via relative path.
+
+        Example:
+        ::
+
+            # command executed in shell
+            # assert the relative path of python interpreter is py_env/bin/python
+            $ zip -r py_env.zip py_env
+
+            # python code
+            >>> stream_env.add_python_archive("py_env.zip")
+            >>> stream_env.set_python_executable("py_env.zip/py_env/bin/python")
+
+            # or
+            >>> stream_env.add_python_archive("py_env.zip", "myenv")
+            >>> stream_env.set_python_executable("myenv/py_env/bin/python")
+
+            # the files contained in the archive file can be accessed in UDF
+            >>> def my_udf():
+            ...     with open("myenv/py_env/data/data.txt") as f:
+            ...         ...
+
+        .. note::
+
+            Please make sure the uploaded python environment matches the platform that the cluster
+            is running on and that the python version must be 3.5 or higher.
+
+        .. note::
+
+            Currently only zip-format is supported. i.e. zip, jar, whl, egg, etc.
+            The other archive formats such as tar, tar.gz, 7z, rar, etc are not supported.
+
+        :param archive_path: The archive file path.
+        :type archive_path: str
+        :param target_dir: Optional, the target dir name that the archive file extracted to.
+        :type target_dir: str

Review comment:
       Duplicate type hint.
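
One reading of this comment, as a sketch: the signature already carries the annotation, so the `:type` field in the docstring repeats it. The standalone function below is a hypothetical stand-in for the method under review, with the `:type` line dropped.

```python
import inspect


def add_python_archive(archive_path: str, target_dir: str = None) -> None:
    """
    Adds a python archive file.

    :param archive_path: The archive file path.
    :param target_dir: Optional, the target dir name that the archive file extracted to.
    """
    # Body omitted; only the signature and the docstring matter for this sketch.


# The type information is still available from the annotations alone.
params = inspect.signature(add_python_archive).parameters
```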

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -64,4 +82,103 @@ public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env)
 		Configuration envConfiguration = (Configuration) getConfigurationMethod.invoke(env);
 		return envConfiguration;
 	}
+
+	/**
+	 * Configure the {@link DataStreamPythonStatelessFunctionOperator} to be chained with the upstream/downstream
+	 * operator by setting their parallelism, slot sharing group, co-location group to be the same, and applying a
+	 * {@link ForwardPartitioner}.
+	 * 1. operator with name "_keyed_stream_values_operator" should align with its downstream operator.
+	 * 2. operator with name "_stream_key_by_map_operator" should align with its upstream operator.
+	 */
+	private static void alignStreamNode(StreamNode streamNode, StreamGraph streamGraph) {
+		if (streamNode.getOperatorName().equals(KEYED_STREAM_VALUE_OPERATOR_NAME)) {
+			StreamEdge downStreamEdge = streamNode.getOutEdges().get(0);
+			StreamNode downStreamNode = streamGraph.getStreamNode(downStreamEdge.getTargetId());
+			downStreamEdge.setPartitioner(new ForwardPartitioner());
+			streamNode.setParallelism(downStreamNode.getParallelism());
+			streamNode.setCoLocationGroup(downStreamNode.getCoLocationGroup());
+			streamNode.setSlotSharingGroup(downStreamNode.getSlotSharingGroup());
+		}
+
+		if (streamNode.getOperatorName().equals(STREAM_KEY_BY_MAP_OPERATOR_NAME)) {
+			StreamEdge upStreamEdge = streamNode.getInEdges().get(0);
+			StreamNode upStreamNode = streamGraph.getStreamNode(upStreamEdge.getSourceId());
+			upStreamEdge.setPartitioner(new ForwardPartitioner<>());
+			streamNode.setParallelism(upStreamNode.getParallelism());
+			streamNode.setSlotSharingGroup(upStreamNode.getSlotSharingGroup());
+			streamNode.setCoLocationGroup(upStreamNode.getCoLocationGroup());

Review comment:
       Another method can be extracted to avoid code duplication.

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -389,6 +389,174 @@ def get_stream_time_characteristic(self):
         j_characteristic = self._j_stream_execution_environment.getStreamTimeCharacteristic()
         return TimeCharacteristic._from_j_time_characteristic(j_characteristic)
 
+    def add_python_file(self, file_path: str):
+        """
+        Adds a python dependency which could be python files, python packages or
+        local directories. They will be added to the PYTHONPATH of the python UDF worker.
+        Please make sure that these dependencies can be imported.
+
+        :param file_path: The path of the python dependency.
+        :type file_path: str
+        """
+        jvm = get_gateway().jvm
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil\
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        python_files = env_config.getString(jvm.PythonOptions.PYTHON_FILES.key(), None)
+        if python_files is not None:
+            python_files = jvm.PythonDependencyUtils.FILE_DELIMITER.join([python_files, file_path])
+        else:
+            python_files = file_path
+        env_config.setString(jvm.PythonOptions.PYTHON_FILES.key(), python_files)
+
+    def set_python_requirements(self, requirements_file_path: str,
+                                requirements_cache_dir: str = None):
+        """
+        Specifies a requirements.txt file which defines the third-party dependencies.
+        These dependencies will be installed to a temporary directory and added to the
+        PYTHONPATH of the python UDF worker.
+
+        For the dependencies which could not be accessed in the cluster, a directory which contains
+        the installation packages of these dependencies could be specified using the parameter
+        "requirements_cached_dir". It will be uploaded to the cluster to support offline
+        installation.
+
+        Example:
+        ::
+
+            # commands executed in shell
+            $ echo numpy==1.16.5 > requirements.txt
+            $ pip download -d cached_dir -r requirements.txt --no-binary :all:
+
+            # python code
+            >>> stream_env.set_python_requirements("requirements.txt", "cached_dir")
+
+        .. note::
+
+            Please make sure the installation packages matches the platform of the cluster
+            and the python version used. These packages will be installed using pip,
+            so also make sure the version of Pip (version >= 7.1.0) and the version of
+            SetupTools (version >= 37.0.0).
+
+        :param requirements_file_path: The path of "requirements.txt" file.
+        :type requirements_file_path: str
+        :param requirements_cache_dir: The path of the local directory which contains the
+                                       installation packages.
+        :type requirements_cache_dir: str

Review comment:
       Duplicate type hint.

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -389,6 +389,174 @@ def get_stream_time_characteristic(self):
         j_characteristic = self._j_stream_execution_environment.getStreamTimeCharacteristic()
         return TimeCharacteristic._from_j_time_characteristic(j_characteristic)
 
+    def add_python_file(self, file_path: str):
+        """
+        Adds a python dependency which could be python files, python packages or
+        local directories. They will be added to the PYTHONPATH of the python UDF worker.
+        Please make sure that these dependencies can be imported.
+
+        :param file_path: The path of the python dependency.
+        :type file_path: str

Review comment:
       Duplicate type hint.

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -64,4 +82,103 @@ public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env)
 		Configuration envConfiguration = (Configuration) getConfigurationMethod.invoke(env);
 		return envConfiguration;
 	}
+
+	/**
+	 * Configure the {@link DataStreamPythonStatelessFunctionOperator} to be chained with the upstream/downstream
+	 * operator by setting their parallelism, slot sharing group, co-location group to be the same, and applying a
+	 * {@link ForwardPartitioner}.
+	 * 1. operator with name "_keyed_stream_values_operator" should align with its downstream operator.
+	 * 2. operator with name "_stream_key_by_map_operator" should align with its upstream operator.
+	 */
+	private static void alignStreamNode(StreamNode streamNode, StreamGraph streamGraph) {
+		if (streamNode.getOperatorName().equals(KEYED_STREAM_VALUE_OPERATOR_NAME)) {
+			StreamEdge downStreamEdge = streamNode.getOutEdges().get(0);
+			StreamNode downStreamNode = streamGraph.getStreamNode(downStreamEdge.getTargetId());
+			downStreamEdge.setPartitioner(new ForwardPartitioner());
+			streamNode.setParallelism(downStreamNode.getParallelism());
+			streamNode.setCoLocationGroup(downStreamNode.getCoLocationGroup());
+			streamNode.setSlotSharingGroup(downStreamNode.getSlotSharingGroup());
+		}
+
+		if (streamNode.getOperatorName().equals(STREAM_KEY_BY_MAP_OPERATOR_NAME)) {
+			StreamEdge upStreamEdge = streamNode.getInEdges().get(0);
+			StreamNode upStreamNode = streamGraph.getStreamNode(upStreamEdge.getSourceId());
+			upStreamEdge.setPartitioner(new ForwardPartitioner<>());
+			streamNode.setParallelism(upStreamNode.getParallelism());
+			streamNode.setSlotSharingGroup(upStreamNode.getSlotSharingGroup());
+			streamNode.setCoLocationGroup(upStreamNode.getCoLocationGroup());
+		}
+	}
+
+	/**
+	 * Generate a {@link StreamGraph} for transformations maintained by current {@link StreamExecutionEnvironment}, and
+	 * reset the merged env configurations with dependencies to every {@link DataStreamPythonStatelessFunctionOperator}.
+	 * It is an idempotent operation that can be call multiple times. Remember that only when need to execute the
+	 * StreamGraph can we set the clearTransformations to be True.
+	 */
+	public static StreamGraph generateStreamGraphWithDependencies(
+		StreamExecutionEnvironment env, boolean clearTransformations) throws IllegalAccessException,
+		NoSuchMethodException, InvocationTargetException {
+
+		Configuration mergedConfig = getEnvConfigWithDependencies(env);
+		StreamGraph streamGraph = env.getStreamGraph(StreamExecutionEnvironment.DEFAULT_JOB_NAME, clearTransformations);
+		Collection<StreamNode> streamNodes = streamGraph.getStreamNodes();
+		for (StreamNode streamNode : streamNodes) {
+
+			alignStreamNode(streamNode, streamGraph);
+
+			StreamOperatorFactory streamOperatorFactory = streamNode.getOperatorFactory();
+			if (streamOperatorFactory instanceof SimpleOperatorFactory) {
+				StreamOperator streamOperator = ((SimpleOperatorFactory) streamOperatorFactory).getOperator();
+				if (streamOperator instanceof DataStreamPythonStatelessFunctionOperator) {
+					DataStreamPythonStatelessFunctionOperator dataStreamPythonStatelessFunctionOperator =
+						(DataStreamPythonStatelessFunctionOperator) streamOperator;
+					Configuration oldConfig = dataStreamPythonStatelessFunctionOperator.getMergedEnvConfig();
+					dataStreamPythonStatelessFunctionOperator.setPythonConfig(generateNewPythonConfig(oldConfig,
+						mergedConfig));
+				}
+			}
+		}
+		return streamGraph;
+	}
+
+	/**
+	 * Generator a new {@link  PythonConfig} with the combined config which is derived from oldConfig.
+	 */
+	private static PythonConfig generateNewPythonConfig(Configuration oldConfig, Configuration newConfig) {
+		setIfNotExist(PythonOptions.MAX_BUNDLE_SIZE, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.MAX_BUNDLE_TIME_MILLS, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.MAX_BUNDLE_TIME_MILLS, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.PYTHON_FRAMEWORK_MEMORY_SIZE, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.PYTHON_DATA_BUFFER_MEMORY_SIZE, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.PYTHON_EXECUTABLE, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.PYTHON_METRIC_ENABLED, oldConfig, newConfig);
+		setIfNotExist(PythonOptions.USE_MANAGED_MEMORY, oldConfig, newConfig);
+
+		combineConfigValue(PythonDependencyUtils.PYTHON_FILES, oldConfig, newConfig);
+		combineConfigValue(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE, oldConfig, newConfig);
+		combineConfigValue(PythonDependencyUtils.PYTHON_ARCHIVES, oldConfig, newConfig);
+
+		return new PythonConfig(oldConfig);
+	}
+
+	/**
+	 * Make sure new configuration not overriding the previously configured value. For example, the MAX_BUNDLE_SIZE of
+	 * {@link org.apache.flink.datastream.runtime.operators.python.DataStreamPythonReduceFunctionOperator} is
+	 * pre-configured to be 1, we must not to change it.
+	 */
+	private static void setIfNotExist(ConfigOption configOption, Configuration oldConfig, Configuration newConfig) {
+		if (!oldConfig.containsKey(configOption.key())) {
+			oldConfig.set(configOption, newConfig.get(configOption));
+		}
+	}
+
+	/**
+	 * Dependency file information maintained by a Map in old config can be combined with new config.
+	 */
+	private static void combineConfigValue(ConfigOption<Map<String, String>> configOption, Configuration oldConfig, Configuration newConfig) {
+		Map<String, String> oldConfigValue = oldConfig.getOptional(configOption).orElse(new HashMap<>());
+		oldConfigValue.putAll(newConfig.getOptional(configOption).orElse(new HashMap<>()));
+		oldConfig.set(configOption, oldConfigValue);
+	}

Review comment:
       These methods can be replaced by:
   ```
   	private static PythonConfig generateNewPythonConfig(Configuration oldConfig, Configuration newConfig) {
   		Configuration mergedConfig = newConfig.clone();
   		mergedConfig.addAll(oldConfig);
   		return new PythonConfig(mergedConfig);
   	}
   ```
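
A rough sketch of the merge semantics involved, using plain dicts in place of `Configuration`. The key names and both helper functions are hypothetical. It compares "new config as base, old values win" with the per-key map combination done by `combineConfigValue` in the quoted diff, under the assumption that the dependency options are map-valued as they appear there.

```python
def merge_old_over_new(old_config: dict, new_config: dict) -> dict:
    """Start from a copy of the new config and let operator-level (old) values win."""
    merged = dict(new_config)   # copy, so new_config itself is not mutated
    merged.update(old_config)   # old values override, like setIfNotExist
    return merged


def merge_with_combined_maps(old_config: dict, new_config: dict, map_keys: list) -> dict:
    """Same as above, but map-valued options are combined entry by entry."""
    merged = merge_old_over_new(old_config, new_config)
    for key in map_keys:
        combined = dict(old_config.get(key, {}))
        combined.update(new_config.get(key, {}))  # mirrors oldValue.putAll(newValue)
        merged[key] = combined
    return merged


# Illustrative keys only, not the real option names.
old = {"max_bundle_size": 1, "python_files": {"a": "a.py"}}
new = {"max_bundle_size": 1000, "python_executable": "python3", "python_files": {"b": "b.py"}}

simple = merge_old_over_new(old, new)
combined = merge_with_combined_maps(old, new, ["python_files"])
```

In this model the simple merge keeps the pre-configured bundle size of 1 but drops the `b.py` entry, because the whole map is overridden. Whether that matters depends on whether operators can already carry dependency entries at this point.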

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -416,10 +584,14 @@ def execute(self, job_name=None):
         :param job_name: Desired name of the job, optional.
         :return: The result of the job execution, containing elapsed time and accumulators.
         """
-        if job_name is None:
-            return JobExecutionResult(self._j_stream_execution_environment.execute())
-        else:
-            return JobExecutionResult(self._j_stream_execution_environment.execute(job_name))
+        j_stream_graph = get_gateway().jvm \
+            .org.apache.flink.python.util.PythonConfigUtil.generateStreamGraphWithDependencies(
+            self._j_stream_execution_environment, True)

Review comment:
       We can extract a method and reuse it in different methods.

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -389,6 +389,174 @@ def get_stream_time_characteristic(self):
         j_characteristic = self._j_stream_execution_environment.getStreamTimeCharacteristic()
         return TimeCharacteristic._from_j_time_characteristic(j_characteristic)
 
+    def add_python_file(self, file_path: str):
+        """
+        Adds a python dependency which could be python files, python packages or
+        local directories. They will be added to the PYTHONPATH of the python UDF worker.
+        Please make sure that these dependencies can be imported.
+
+        :param file_path: The path of the python dependency.
+        :type file_path: str
+        """
+        jvm = get_gateway().jvm
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil\
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        python_files = env_config.getString(jvm.PythonOptions.PYTHON_FILES.key(), None)
+        if python_files is not None:
+            python_files = jvm.PythonDependencyUtils.FILE_DELIMITER.join([python_files, file_path])
+        else:
+            python_files = file_path
+        env_config.setString(jvm.PythonOptions.PYTHON_FILES.key(), python_files)
+
+    def set_python_requirements(self, requirements_file_path: str,
+                                requirements_cache_dir: str = None):
+        """
+        Specifies a requirements.txt file which defines the third-party dependencies.
+        These dependencies will be installed to a temporary directory and added to the
+        PYTHONPATH of the python UDF worker.
+
+        For the dependencies which could not be accessed in the cluster, a directory which contains
+        the installation packages of these dependencies could be specified using the parameter
+        "requirements_cached_dir". It will be uploaded to the cluster to support offline
+        installation.
+
+        Example:
+        ::
+
+            # commands executed in shell
+            $ echo numpy==1.16.5 > requirements.txt
+            $ pip download -d cached_dir -r requirements.txt --no-binary :all:
+
+            # python code
+            >>> stream_env.set_python_requirements("requirements.txt", "cached_dir")
+
+        .. note::
+
+            Please make sure the installation packages matches the platform of the cluster
+            and the python version used. These packages will be installed using pip,
+            so also make sure the version of Pip (version >= 7.1.0) and the version of
+            SetupTools (version >= 37.0.0).
+
+        :param requirements_file_path: The path of "requirements.txt" file.
+        :type requirements_file_path: str
+        :param requirements_cache_dir: The path of the local directory which contains the
+                                       installation packages.
+        :type requirements_cache_dir: str
+        """
+        jvm = get_gateway().jvm
+        python_requirements = requirements_file_path
+        if requirements_cache_dir is not None:
+            python_requirements = jvm.PythonDependencyUtils.PARAM_DELIMITER.join(
+                [python_requirements, requirements_cache_dir])
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        env_config.setString(jvm.PythonOptions.PYTHON_REQUIREMENTS.key(), python_requirements)
+
+    def add_python_archive(self, archive_path: str, target_dir: str = None):
+        """
+        Adds a python archive file. The file will be extracted to the working directory of
+        python UDF worker.
+
+        If the parameter "target_dir" is specified, the archive file will be extracted to a
+        directory named ${target_dir}. Otherwise, the archive file will be extracted to a
+        directory with the same name of the archive file.
+
+        If python UDF depends on a specific python version which does not exist in the cluster,
+        this method can be used to upload the virtual environment.
+        Note that the path of the python interpreter contained in the uploaded environment
+        should be specified via the method :func:`pyflink.table.TableConfig.set_python_executable`.
+
+        The files uploaded via this method are also accessible in UDFs via relative path.
+
+        Example:
+        ::
+
+            # command executed in shell
+            # assert the relative path of python interpreter is py_env/bin/python
+            $ zip -r py_env.zip py_env
+
+            # python code
+            >>> stream_env.add_python_archive("py_env.zip")
+            >>> stream_env.set_python_executable("py_env.zip/py_env/bin/python")
+
+            # or
+            >>> stream_env.add_python_archive("py_env.zip", "myenv")
+            >>> stream_env.set_python_executable("myenv/py_env/bin/python")
+
+            # the files contained in the archive file can be accessed in UDF
+            >>> def my_udf():
+            ...     with open("myenv/py_env/data/data.txt") as f:
+            ...         ...
+
+        .. note::
+
+            Please make sure the uploaded python environment matches the platform that the cluster
+            is running on and that the python version must be 3.5 or higher.
+
+        .. note::
+
+            Currently only zip-format is supported. i.e. zip, jar, whl, egg, etc.
+            The other archive formats such as tar, tar.gz, 7z, rar, etc are not supported.
+
+        :param archive_path: The archive file path.
+        :type archive_path: str
+        :param target_dir: Optional, the target dir name that the archive file extracted to.
+        :type target_dir: str
+        """
+        jvm = get_gateway().jvm
+        if target_dir is not None:
+            archive_path = jvm.PythonDependencyUtils.PARAM_DELIMITER.join(
+                [archive_path, target_dir])
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        python_archives = env_config.getString(jvm.PythonOptions.PYTHON_ARCHIVES.key(), None)
+        if python_archives is not None:
+            python_files = jvm.PythonDependencyUtils.FILE_DELIMITER.join(
+                [python_archives, archive_path])
+        else:
+            python_files = archive_path
+        env_config.setString(jvm.PythonOptions.PYTHON_ARCHIVES.key(), python_files)
+
+    def set_python_executable(self, python_exec: str):
+        """
+        Sets the path of the python interpreter which is used to execute the python udf workers.
+
+        e.g. "/usr/local/bin/python3".
+
+        If python UDF depends on a specific python version which does not exist in the cluster,
+        the method :func:`pyflink.datastream.StreamExecutionEnvironment.add_python_archive` can be
+        used to upload a virtual environment. The path of the python interpreter contained in the
+        uploaded environment can be specified via this method.
+
+        Example:
+        ::
+
+            # command executed in shell
+            # assume that the relative path of python interpreter is py_env/bin/python
+            $ zip -r py_env.zip py_env
+
+            # python code
+            >>> stream_env.add_python_archive("py_env.zip")
+            >>> stream_env.set_python_executable("py_env.zip/py_env/bin/python")
+
+        .. note::
+
+            Please make sure the uploaded python environment matches the platform that the cluster
+            is running on and that the python version is 3.5 or higher.
+
+        .. note::
+
+            The python udf worker depends on Apache Beam (version == 2.19.0).
+            Please ensure that the specified environment meets the above requirements.
+
+        :param python_exec: The path of python interpreter.
+        :type python_exec: str

Review comment:
       Duplicate type hint.
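
       A minimal sketch of one way to read this comment, assuming the annotation in the signature is kept and the `:type` field in the docstring is dropped. `EnvSketch` and its `_python_exec` attribute are hypothetical stand-ins for illustration, not PyFlink code:

```python
from typing import get_type_hints


class EnvSketch(object):
    """Hypothetical stand-in for StreamExecutionEnvironment (not PyFlink code)."""

    def __init__(self):
        self._python_exec = None

    def set_python_executable(self, python_exec: str):
        """
        Sets the path of the python interpreter used to execute the python udf workers.

        :param python_exec: The path of python interpreter.
        """
        # The annotation in the signature already states the type,
        # so the docstring carries no separate ":type" field.
        self._python_exec = python_exec


# The type stays discoverable from the signature alone.
hints = get_type_hints(EnvSketch.set_python_executable)
```

       With the annotation as the single source of the type, tools that read signatures still see `python_exec` as `str`, and the docstring only has to describe the parameter.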

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -438,7 +440,7 @@ def _get_java_python_function_operator(self, func: Union[Function, FunctionWrapp
 
         j_env = self._j_data_stream.getExecutionEnvironment()
         PythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
-        j_conf = PythonConfigUtil.getMergedConfig(j_env)
+        j_conf = PythonConfigUtil.getEnvConfigWithDependencies(j_env)

Review comment:
       We need to return an empty config here. 

##########
File path: flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
##########
@@ -280,5 +283,222 @@ def test_execute_async(self):
         execution_result = job_client.get_job_execution_result().result()
         self.assertEqual(str(job_id), str(execution_result.get_job_id()))
 
+    def test_add_python_file(self):
+        import uuid
+        python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
+        os.mkdir(python_file_dir)
+        python_file_path = os.path.join(python_file_dir, "test_stream_dependency_manage_lib.py")
+        with open(python_file_path, 'w') as f:
+            f.write("def add_two(a):\n    return a + 2")
+
+        def plus_two_map(value):
+            from test_stream_dependency_manage_lib import add_two
+            return add_two(value)
+
+        self.env.add_python_file(python_file_path)
+        ds = self.env.from_collection([1, 2, 3, 4, 5])
+        ds.map(plus_two_map).add_sink(self.test_sink)
+        self.env.execute("test add python file")
+        result = self.test_sink.get_results(True)
+        expected = ['3', '4', '5', '6', '7']
+        result.sort()
+        expected.sort()
+        self.assertEqual(expected, result)
+
+    def test_set_requirements_without_cached_directory(self):
+        import uuid
+        requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4()))
+        with open(requirements_txt_path, 'w') as f:
+            f.write("cloudpickle==1.2.2")
+        self.env.set_python_requirements(requirements_txt_path)
+
+        def check_requirements(i):
+            import cloudpickle
+            assert os.path.abspath(cloudpickle.__file__).startswith(
+                os.environ['_PYTHON_REQUIREMENTS_INSTALL_DIR'])
+            return i
+
+        ds = self.env.from_collection([1, 2, 3, 4, 5])
+        ds.map(check_requirements).add_sink(self.test_sink)
+        self.env.execute("test set requirements without cache dir")
+        result = self.test_sink.get_results(True)
+        expected = ['1', '2', '3', '4', '5']
+        result.sort()
+        expected.sort()
+        self.assertEqual(expected, result)
+
+    def test_set_requirements_with_cached_directory(self):
+        import uuid
+        tmp_dir = self.tempdir
+        requirements_txt_path = os.path.join(tmp_dir, "requirements_txt_" + str(uuid.uuid4()))
+        with open(requirements_txt_path, 'w') as f:
+            f.write("python-package1==0.0.0")
+
+        requirements_dir_path = os.path.join(tmp_dir, "requirements_dir_" + str(uuid.uuid4()))
+        os.mkdir(requirements_dir_path)
+        package_file_name = "python-package1-0.0.0.tar.gz"
+        with open(os.path.join(requirements_dir_path, package_file_name), 'wb') as f:
+            import base64
+            # This base64 data is encoded from a python package file which includes a
+            # "python_package1" module. The module contains a "plus(a, b)" function.
+            # The base64 can be recomputed by following code:
+            # base64.b64encode(open("python-package1-0.0.0.tar.gz", "rb").read()).decode("utf-8")
+            f.write(base64.b64decode(
+                "H4sICNefrV0C/2Rpc3QvcHl0aG9uLXBhY2thZ2UxLTAuMC4wLnRhcgDtmVtv2jAYhnPtX2H1CrRCY+ckI"
+                "XEx7axuUA11u5imyICTRc1JiVnHfv1MKKWjYxwKEdPehws7xkmUfH5f+3PyqfqWpa1cjG5EKFnLbOvfhX"
+                "FQTI3nOPPSdavS5Pa8nGMwy3Esi3ke9wyTObbnGNQxamBSKlFQavzUryG8ldG6frpbEGx4yNmDLMp/hPy"
+                "P8b+6fNN613vdP1z8XdteG3+ug/17/F3Hcw1qIv5H54NUYiyUaH2SRRllaYeytkl6IpEdujI2yH2XapCQ"
+                "wSRJRDHt0OveZa//uUfeZonUvUO5bHo+0ZcoVo9bMhFRvGx9H41kWj447aUsR0WUq+pui8arWKggK5Jli"
+                "wGOo/95q79ovXi6/nfyf246Dof/n078fT9KI+X77Xx6BP83bX4Xf5NxT7dz7toO/L8OxjKgeTwpG+KcDp"
+                "sdQjWFVJMipYI+o0MCk4X/t2UYtqI0yPabCHb3f861XcD/Ty/+Y5nLdCzT0dSPo/SmbKsf6un+b7KV+Ls"
+                "W4/D/OoC9w/930P9eGwM75//csrD+Q/6P/P/k9D/oX3988Wqw1bS/tf6tR+s/m3EG/ddBqXO9XKf15C8p"
+                "P9k4HZBtBgzZaVW5vrfKcj+W32W82ygEB9D/Xu9+4/qfP9L/rBv0X1v87yONKRX61/qfzwqjIDzIPTbv/"
+                "7or3/88i0H/tfBFW7s/s/avRInQH06ieEy7tDrQeYHUdRN7wP+n/vf62LOH/pld7f9xz7a5Pfufedy0oP"
+                "86iJI8KxStAq6yLC4JWdbbVbWRikR2z1ZGytk5vauW3QdnBFE6XqwmykazCesAAAAAAAAAAAAAAAAAAAA"
+                "AAAAAAAAAAAAAAOBw/AJw5CHBAFAAAA=="))
+        self.env.set_python_requirements(requirements_txt_path, requirements_dir_path)
+
+        def add_one(i):
+            from python_package1 import plus
+            return plus(i, 1)
+
+        ds = self.env.from_collection([1, 2, 3, 4, 5])
+        ds.map(add_one).add_sink(self.test_sink)
+        self.env.execute("test set requirements with cached dir")
+        result = self.test_sink.get_results(True)
+        expected = ['2', '3', '4', '5', '6']
+        result.sort()
+        expected.sort()
+        self.assertEqual(expected, result)
+
+    def test_add_python_archive(self):
+        import uuid
+        import shutil
+        tmp_dir = self.tempdir
+        archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
+        os.mkdir(archive_dir_path)
+        with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
+            f.write("2")
+        archive_file_path = \
+            shutil.make_archive(os.path.dirname(archive_dir_path), 'zip', archive_dir_path)
+        self.env.add_python_archive(archive_file_path, "data")
+
+        def add_from_file(i):
+            with open("data/data.txt", 'r') as f:
+                return i + int(f.read())
+
+        ds = self.env.from_collection([1, 2, 3, 4, 5])
+        ds.map(add_from_file).add_sink(self.test_sink)
+        self.env.execute("test set python archive")
+        result = self.test_sink.get_results(True)
+        expected = ['3', '4', '5', '6', '7']
+        result.sort()
+        expected.sort()
+        self.assertEqual(expected, result)
+
+    def test_set_stream_env(self):
+        import sys
+        python_exec = sys.executable
+        tmp_dir = self.tempdir
+        python_exec_link_path = os.path.join(tmp_dir, "py_exec")
+        os.symlink(python_exec, python_exec_link_path)
+        self.env.set_python_executable(python_exec_link_path)
+
+        def check_python_exec(i):
+            import os
+            assert os.environ["python"] == python_exec_link_path
+            return i
+
+        def check_pyflink_gateway_disabled(i):
+            try:
+                from pyflink.java_gateway import get_gateway
+                get_gateway()
+            except Exception as e:
+                assert str(e).startswith("It's launching the PythonGatewayServer during Python UDF"
+                                         " execution which is unexpected.")
+            else:
+                raise Exception("The gateway server is not disabled!")
+            return i

Review comment:
       Duplicated check since it has been tested by other tests. 

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -240,6 +240,14 @@ public void processWatermark(Watermark mark) throws Exception {
 		}
 	}
 
+	/**
+	 * Reset the {@link PythonConfig} if needed.
+	 * */
+	@Internal
+	public void setPythonConfig(PythonConfig pythonConfig) {

Review comment:
       Put the set method close to the corresponding get method. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13136:
URL: https://github.com/apache/flink/pull/13136#issuecomment-673313436


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6098b6b0563eded5c97787f55094467b82a88593",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5485",
       "triggerID" : "6098b6b0563eded5c97787f55094467b82a88593",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ccb177399810fc93aaa9b393e092245deaf6789e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5499",
       "triggerID" : "ccb177399810fc93aaa9b393e092245deaf6789e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6098b6b0563eded5c97787f55094467b82a88593 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5485) 
   * ccb177399810fc93aaa9b393e092245deaf6789e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5499) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] hequn8128 merged pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

Posted by GitBox <gi...@apache.org>.
hequn8128 merged pull request #13136:
URL: https://github.com/apache/flink/pull/13136


   





[GitHub] [flink] flinkbot edited a comment on pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13136:
URL: https://github.com/apache/flink/pull/13136#issuecomment-673313436


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6098b6b0563eded5c97787f55094467b82a88593",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5485",
       "triggerID" : "6098b6b0563eded5c97787f55094467b82a88593",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ccb177399810fc93aaa9b393e092245deaf6789e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ccb177399810fc93aaa9b393e092245deaf6789e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6098b6b0563eded5c97787f55094467b82a88593 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5485) 
   * ccb177399810fc93aaa9b393e092245deaf6789e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13136:
URL: https://github.com/apache/flink/pull/13136#issuecomment-673313436


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6098b6b0563eded5c97787f55094467b82a88593",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6098b6b0563eded5c97787f55094467b82a88593",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6098b6b0563eded5c97787f55094467b82a88593 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13136:
URL: https://github.com/apache/flink/pull/13136#issuecomment-673313436


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6098b6b0563eded5c97787f55094467b82a88593",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5485",
       "triggerID" : "6098b6b0563eded5c97787f55094467b82a88593",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6098b6b0563eded5c97787f55094467b82a88593 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5485) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13136:
URL: https://github.com/apache/flink/pull/13136#issuecomment-673313436


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6098b6b0563eded5c97787f55094467b82a88593",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5485",
       "triggerID" : "6098b6b0563eded5c97787f55094467b82a88593",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ccb177399810fc93aaa9b393e092245deaf6789e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5499",
       "triggerID" : "ccb177399810fc93aaa9b393e092245deaf6789e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ccb177399810fc93aaa9b393e092245deaf6789e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5499) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] shuiqiangchen commented on pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on pull request #13136:
URL: https://github.com/apache/flink/pull/13136#issuecomment-673388443


   @hequn8128 Thank you for your comments, I have updated the PR according to your suggestions, please have a look at it.





[GitHub] [flink] flinkbot edited a comment on pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13136:
URL: https://github.com/apache/flink/pull/13136#issuecomment-673313436


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6098b6b0563eded5c97787f55094467b82a88593",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5485",
       "triggerID" : "6098b6b0563eded5c97787f55094467b82a88593",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ccb177399810fc93aaa9b393e092245deaf6789e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5499",
       "triggerID" : "ccb177399810fc93aaa9b393e092245deaf6789e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6098b6b0563eded5c97787f55094467b82a88593 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5485) 
   * ccb177399810fc93aaa9b393e092245deaf6789e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5499) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #13136: [FLINK-18878][python] Support dependency management for Python Stream…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13136:
URL: https://github.com/apache/flink/pull/13136#issuecomment-673306635


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 6098b6b0563eded5c97787f55094467b82a88593 (Thu Aug 13 07:16:22 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-18878).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>

