You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/10 04:43:00 UTC

[GitHub] [flink] shuiqiangchen opened a new pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

shuiqiangchen opened a new pull request #13098:
URL: https://github.com/apache/flink/pull/13098


   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   Support filter() operation for Python DataStream API.
   
   ## Brief change log
   
   - Add filter() interface for Python DataStream API
   - Add a new Function class named FilterFunction and a wrapper class for user defined filter function.
   
   ## Verifying this change
   
   This change has test case covered by test_filter_with_data_types and test_filter_without_data_types in test_data_stream.py
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): ( no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2577a9c0cca713f583f38d593c8803bc6d5506c8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357) 
   * af3d92942065464e267e9715696ec3154ed32ee9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382",
       "triggerID" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5393",
       "triggerID" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9eccf085ce92a91039325b98bb5de50ccd258f5e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9eccf085ce92a91039325b98bb5de50ccd258f5e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 19ff831a80a0ab5a1fbaabfb6d19e91f25d32314 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382) 
   * 42988fb8a3ef91b8b25104772c702c154a822ac2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5393) 
   * 9eccf085ce92a91039325b98bb5de50ccd258f5e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8987176f205b330cf101f81aa126637c0846b298 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332) 
   * 8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8987176f205b330cf101f81aa126637c0846b298 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332) 
   * 8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336) 
   * f3d582238a0afb694a4712c247a66cd31fac072b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] hequn8128 commented on a change in pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #13098:
URL: https://github.com/apache/flink/pull/13098#discussion_r467701714



##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -149,6 +149,33 @@ def flat_map(value):
         expected.sort()
         self.assertEqual(expected, results)
 
+    def test_filter_without_data_types(self):
+        ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')])
+        filtered_stream = ds.filter(MyFilterFunction())
+        collect_util = DataStreamCollectUtil()
+        collect_util.collect(filtered_stream)
+        self.env.execute("test filter")
+        results = collect_util.results()
+        expected = ["(2, 'Hello', 'Hi')"]
+        results.sort()
+        expected.sort()
+        self.assertEqual(expected, results)
+
+    def test_filter_with_data_types(self):
+        ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')],
+                                      type_info=Types.ROW(
+                                          [Types.INT(), Types.STRING(), Types.STRING()])
+                                      )
+        filtered_stream = ds.filter(MyFilterFunction())
+        collect_util = DataStreamCollectUtil()
+        collect_util.collect(filtered_stream)
+        self.env.execute("test filter")
+        results = collect_util.results()
+        expected = ['2,Hello,Hi']
+        results.sort()
+        expected.sort()
+        self.assertEqual(expected, results)

Review comment:
       Change either one to Callable filter function to cover the Callable scenario. 

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -83,6 +83,29 @@ def flat_map(self, value):
         pass
 
 
+class FilterFunction(Function):
+    """
+    A filter function is a predicate applied individually to each record. The predicate decides
+    whether to keep the element, or to discard it.
+    The basic syntax for using a FilterFunction is as follows:
+    :
+         >>> ds = ...;
+         >>> result = ds.filter(new MyFilterFunction())
+    Note that the system assumes that the function does not modify the elemetns on which the

Review comment:
       elemetns => elements

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -83,6 +83,29 @@ def flat_map(self, value):
         pass
 
 
+class FilterFunction(Function):
+    """
+    A filter function is a predicate applied individually to each record. The predicate decides
+    whether to keep the element, or to discard it.
+    The basic syntax for using a FilterFunction is as follows:
+    :
+         >>> ds = ...;
+         >>> result = ds.filter(new MyFilterFunction())
+    Note that the system assumes that the function does not modify the elemetns on which the
+    predicate is applied. Violating this assumption can lead to incoorect results.
+    """
+
+    @abc.abstractmethod
+    def filter(self, value):
+        """
+        The filter function that evaluates the predicate.
+
+        :param value: The value to be filtered.
+        :return: Tre for values that should be retained, false for values to be filtered out.

Review comment:
       Tre => True

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -83,6 +83,29 @@ def flat_map(self, value):
         pass
 
 
+class FilterFunction(Function):
+    """
+    A filter function is a predicate applied individually to each record. The predicate decides
+    whether to keep the element, or to discard it.
+    The basic syntax for using a FilterFunction is as follows:
+    :
+         >>> ds = ...;

Review comment:
       Remove `;`

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -83,6 +83,29 @@ def flat_map(self, value):
         pass
 
 
+class FilterFunction(Function):
+    """
+    A filter function is a predicate applied individually to each record. The predicate decides
+    whether to keep the element, or to discard it.
+    The basic syntax for using a FilterFunction is as follows:
+    :
+         >>> ds = ...;
+         >>> result = ds.filter(new MyFilterFunction())
+    Note that the system assumes that the function does not modify the elemetns on which the
+    predicate is applied. Violating this assumption can lead to incoorect results.

Review comment:
       incoorect => incorrect

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -233,6 +233,32 @@ def flat_map(self, func: Union[Callable, FlatMapFunction], type_info: TypeInform
             j_python_data_stream_scalar_function_operator
         ))
 
+    def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
+        """
+        Applies a Filter transformation on a DataStream. The transformation calls a FilterFunction
+        for each element of the DataStream and retains only those element for which the function
+        returns true. Elements for which the function returns false are filtered. The user can also
+        extend RichFilterFunction to gain access to other features provided by the RichFunction
+        interface.
+
+        :param func: The FilterFunction that is called for each element of the DataStream.
+        :return: The filtered DataStream.
+        """
+        class FilterFlatMap(FlatMapFunction):
+            def __init__(self, filter_func):
+                self._func = filter_func
+
+            def flat_map(self, value):
+                if self._func.filter(value):
+                    yield value
+
+        j_input_type = self._j_data_stream.getTransformation().getOutputType()
+        type_info = typeinfo._from_java_type(j_input_type)
+        j_data_stream = self.flat_map(FilterFlatMap(func), type_info=type_info)._j_data_stream

Review comment:
       What if the `func` is a `Callable`?

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -83,6 +83,29 @@ def flat_map(self, value):
         pass
 
 
+class FilterFunction(Function):
+    """
+    A filter function is a predicate applied individually to each record. The predicate decides
+    whether to keep the element, or to discard it.
+    The basic syntax for using a FilterFunction is as follows:
+    :
+         >>> ds = ...;
+         >>> result = ds.filter(new MyFilterFunction())

Review comment:
       Remove `new`

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -140,6 +163,19 @@ def flat_map(self, value):
         return self._func(value)
 
 
+class FilterFunctionWrapper(FunctionWrapper):

Review comment:
       This class has never been used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f3d582238a0afb694a4712c247a66cd31fac072b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343) 
   * 2577a9c0cca713f583f38d593c8803bc6d5506c8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336) 
   * f3d582238a0afb694a4712c247a66cd31fac072b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343) 
   * 2577a9c0cca713f583f38d593c8803bc6d5506c8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] shuiqiangchen commented on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671191233


   Hi @hequn8128, thank you for your comments, I will pay more attention to typos and shall never write codes in a hurry.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8987176f205b330cf101f81aa126637c0846b298 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2577a9c0cca713f583f38d593c8803bc6d5506c8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af3d92942065464e267e9715696ec3154ed32ee9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] hequn8128 commented on a change in pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #13098:
URL: https://github.com/apache/flink/pull/13098#discussion_r468370363



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -434,6 +465,16 @@ def flat_map(self, func: Union[Callable, FlatMapFunction], type_info: TypeInform
             -> 'DataStream':
         return self._values().flat_map(func, type_info)
 
+    def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
+        return self._values().filter(func)
+
+    def add_sink(self, sink_func: SinkFunction) -> 'DataStreamSink':
+        return self._values().add_sink(sink_func)
+
+    def key_by(self, key_selector: Union[Callable, KeySelector],
+               key_type_info: TypeInformation = None) -> 'KeyedStream':
+        return self._values().key_by(key_selector, key_type_info)

Review comment:
       There are side effects for this implementation. If we perform multi key_by, it will also introduce multi maps in the stream graph. This should be avoided. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382",
       "triggerID" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5393",
       "triggerID" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af3d92942065464e267e9715696ec3154ed32ee9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378) 
   * 19ff831a80a0ab5a1fbaabfb6d19e91f25d32314 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382) 
   * 42988fb8a3ef91b8b25104772c702c154a822ac2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5393) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af3d92942065464e267e9715696ec3154ed32ee9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378) 
   * 19ff831a80a0ab5a1fbaabfb6d19e91f25d32314 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382",
       "triggerID" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5393",
       "triggerID" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 19ff831a80a0ab5a1fbaabfb6d19e91f25d32314 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382) 
   * 42988fb8a3ef91b8b25104772c702c154a822ac2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5393) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2577a9c0cca713f583f38d593c8803bc6d5506c8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357) 
   * af3d92942065464e267e9715696ec3154ed32ee9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8987176f205b330cf101f81aa126637c0846b298 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382",
       "triggerID" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af3d92942065464e267e9715696ec3154ed32ee9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378) 
   * 19ff831a80a0ab5a1fbaabfb6d19e91f25d32314 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382) 
   * 42988fb8a3ef91b8b25104772c702c154a822ac2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336) 
   * f3d582238a0afb694a4712c247a66cd31fac072b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343) 
   * 2577a9c0cca713f583f38d593c8803bc6d5506c8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382",
       "triggerID" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5393",
       "triggerID" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9eccf085ce92a91039325b98bb5de50ccd258f5e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5399",
       "triggerID" : "9eccf085ce92a91039325b98bb5de50ccd258f5e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 42988fb8a3ef91b8b25104772c702c154a822ac2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5393) 
   * 9eccf085ce92a91039325b98bb5de50ccd258f5e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5399) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8987176f205b330cf101f81aa126637c0846b298 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332) 
   * 8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336) 
   * f3d582238a0afb694a4712c247a66cd31fac072b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8987176f205b330cf101f81aa126637c0846b298 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332) 
   * 8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336) 
   * f3d582238a0afb694a4712c247a66cd31fac072b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] shuiqiangchen commented on a change in pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13098:
URL: https://github.com/apache/flink/pull/13098#discussion_r467732645



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -233,6 +234,35 @@ def flat_map(self, func: Union[Callable, FlatMapFunction], type_info: TypeInform
             j_python_data_stream_scalar_function_operator
         ))
 
+    def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
+        """
+        Applies a Filter transformation on a DataStream. The transformation calls a FilterFunction
+        for each element of the DataStream and retains only those element for which the function
+        returns true. Elements for which the function returns false are filtered. The user can also
+        extend RichFilterFunction to gain access to other features provided by the RichFunction
+        interface.
+
+        :param func: The FilterFunction that is called for each element of the DataStream.
+        :return: The filtered DataStream.
+        """
+        class FilterFlatMap(FlatMapFunction):
+            def __init__(self, filter_func):
+                self._func = filter_func
+
+            def flat_map(self, value):
+                if self._func.filter(value):
+                    yield value
+
+        if isinstance(func, Callable):

Review comment:
       Ok, this will make it more robust.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382",
       "triggerID" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af3d92942065464e267e9715696ec3154ed32ee9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378) 
   * 19ff831a80a0ab5a1fbaabfb6d19e91f25d32314 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] hequn8128 commented on a change in pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #13098:
URL: https://github.com/apache/flink/pull/13098#discussion_r467731300



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -233,6 +234,35 @@ def flat_map(self, func: Union[Callable, FlatMapFunction], type_info: TypeInform
             j_python_data_stream_scalar_function_operator
         ))
 
+    def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
+        """
+        Applies a Filter transformation on a DataStream. The transformation calls a FilterFunction
+        for each element of the DataStream and retains only those element for which the function
+        returns true. Elements for which the function returns false are filtered. The user can also
+        extend RichFilterFunction to gain access to other features provided by the RichFunction
+        interface.
+
+        :param func: The FilterFunction that is called for each element of the DataStream.
+        :return: The filtered DataStream.
+        """
+        class FilterFlatMap(FlatMapFunction):
+            def __init__(self, filter_func):
+                self._func = filter_func
+
+            def flat_map(self, value):
+                if self._func.filter(value):
+                    yield value
+
+        if isinstance(func, Callable):

Review comment:
       throw an exception if it is not a FilterFunction nor a Callable function.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671163278


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8987176f205b330cf101f81aa126637c0846b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5332",
       "triggerID" : "8987176f205b330cf101f81aa126637c0846b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5336",
       "triggerID" : "8c98a59b403eb2e1d16f3915f3b5fca1ba5c88ff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5343",
       "triggerID" : "f3d582238a0afb694a4712c247a66cd31fac072b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5357",
       "triggerID" : "2577a9c0cca713f583f38d593c8803bc6d5506c8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5378",
       "triggerID" : "af3d92942065464e267e9715696ec3154ed32ee9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382",
       "triggerID" : "19ff831a80a0ab5a1fbaabfb6d19e91f25d32314",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5393",
       "triggerID" : "42988fb8a3ef91b8b25104772c702c154a822ac2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9eccf085ce92a91039325b98bb5de50ccd258f5e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5399",
       "triggerID" : "9eccf085ce92a91039325b98bb5de50ccd258f5e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 19ff831a80a0ab5a1fbaabfb6d19e91f25d32314 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5382) 
   * 42988fb8a3ef91b8b25104772c702c154a822ac2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5393) 
   * 9eccf085ce92a91039325b98bb5de50ccd258f5e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5399) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] hequn8128 merged pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
hequn8128 merged pull request #13098:
URL: https://github.com/apache/flink/pull/13098


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13098:
URL: https://github.com/apache/flink/pull/13098#issuecomment-671160441


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 8987176f205b330cf101f81aa126637c0846b298 (Mon Aug 10 04:50:20 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-18866).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org