Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/14 02:03:02 UTC

[GitHub] [flink] dianfu commented on a change in pull request #19054: [FLINK-26482][python] Support WindowedStream.reduce in Python DataStream API

dianfu commented on a change in pull request #19054:
URL: https://github.com/apache/flink/pull/19054#discussion_r825543942



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1332,6 +1337,55 @@ def allowed_lateness(self, time_ms: int):
         self._allowed_lateness = time_ms
         return self
 
+    def reduce(self,
+               reduce_function: Union[Callable, ReduceFunction],
+               window_function: Union[WindowFunction, ProcessWindowFunction] = None,
+               result_type: TypeInformation = None) -> DataStream:

Review comment:
       ```suggestion
                  output_type: TypeInformation = None) -> DataStream:
   ```
   
   Could this be renamed to output_type to keep it consistent with the other methods?
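   For reference, here is a minimal pure-Python model of what the reviewed signature is meant to do. It follows the docstring's description: values are reduced incrementally, and the optional window function runs once per window evaluation on the reduced result. It does not use PyFlink. `window_reduce`, the `(key, window_id, value)` record shape, and the single-element iterable handed to the window function are illustrative assumptions, not the PR's implementation.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple

# Hypothetical, PyFlink-free model of keyed window reduce semantics.
# Records are (key, window_id, value) tuples; window assignment is assumed
# to have already happened upstream.


def window_reduce(
    records: Iterable[Tuple[str, int, int]],
    reduce_function: Callable[[int, int], int],
    window_function: Optional[Callable[[str, int, Iterable[int]], Iterable[object]]] = None,
) -> List[object]:
    # Fold each record into one accumulator per (key, window), so only a
    # single reduced value is kept per window instead of buffering all inputs.
    state: Dict[Tuple[str, int], int] = {}
    for key, window, value in records:
        slot = (key, window)
        state[slot] = value if slot not in state else reduce_function(state[slot], value)

    results: List[object] = []
    for (key, window), reduced in state.items():
        if window_function is None:
            # Without a window function, the reduced value is emitted as-is.
            results.append((key, reduced))
        else:
            # Assumed behavior: the window function sees a single-element
            # iterable holding the reduced value, once per window evaluation.
            results.extend(window_function(key, window, [reduced]))
    return results
```

   With records `[("hi", 0, 1), ("hi", 0, 2), ("hi", 1, 5)]` and `lambda a, b: a + b`, this model yields `[("hi", 3), ("hi", 5)]`.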

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1332,6 +1337,55 @@ def allowed_lateness(self, time_ms: int):
         self._allowed_lateness = time_ms
         return self
 
+    def reduce(self,
+               reduce_function: Union[Callable, ReduceFunction],
+               window_function: Union[WindowFunction, ProcessWindowFunction] = None,
+               result_type: TypeInformation = None) -> DataStream:
+        """
+        Applies a reduce function to the window. The window function is called for each evaluation of

Review comment:
       Checkstyle issue: line too long (101 > 100 characters).
   
   You can run the following command in the flink-python directory to report all checkstyle issues:
   ./dev/lint-python.sh -e flake8,sphinx
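   As a quick illustration of what this particular check flags, the sketch below reports lines longer than 100 characters in the same message format as the comment. It is a hypothetical stand-in for flake8's line-length rule, not a replacement for lint-python.sh; `long_lines` and `report` are illustrative names, and the 100-character limit is taken from the comment.

```python
from pathlib import Path
from typing import Iterator, Tuple

MAX_LINE_LENGTH = 100  # limit quoted in the review comment


def long_lines(text: str, limit: int = MAX_LINE_LENGTH) -> Iterator[Tuple[int, int]]:
    """Yield (line_number, length) for every line longer than `limit`."""
    for lineno, line in enumerate(text.splitlines(), start=1):
        if len(line) > limit:
            yield lineno, len(line)


def report(path: Path, limit: int = MAX_LINE_LENGTH) -> None:
    """Print one message per over-long line, mimicking the comment's wording."""
    for lineno, length in long_lines(path.read_text(encoding="utf-8"), limit):
        print(f"{path}:{lineno}: line too long ({length} > {limit} characters)")
```

   For example, `report(Path("pyflink/datastream/data_stream.py"))` run from the flink-python directory would list any docstring line that still exceeds the limit.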

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -1004,15 +1010,37 @@ def process(self,
                 key: KEY,
                 window: W,
                 context: InternalWindowContext,
-                input_data: Iterable[IN]) -> Iterable[OUT]:

Review comment:
       What's the purpose of this change?

##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -794,6 +794,62 @@ def test_time_window(self):
         expected = ['(hi,1)', '(hi,1)', '(hi,2)', '(hi,3)']
         self.assert_equals_sorted(expected, results)
 
+    def test_time_window_reduce_passthrough(self):
+        self.env.set_parallelism(1)

Review comment:
       It's not necessary to set the parallelism to 1 here.
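   One way to see why: the existing test above compares results with assert_equals_sorted, so emission order does not matter, and in a keyed stream each key is handled by exactly one subtask. The simplified model below (hypothetical `run_keyed_sum`, byte-sum routing instead of Flink's real key-group assignment, no windows or time) shows that the sorted per-key results stay the same for any parallelism, assuming the windowed results themselves are deterministic.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def run_keyed_sum(records: List[Tuple[str, int]], parallelism: int) -> List[str]:
    # One accumulator map per hypothetical parallel subtask.
    subtasks: List[Dict[str, int]] = [defaultdict(int) for _ in range(parallelism)]
    for key, value in records:
        # Deterministic stand-in for key hashing: the same key always
        # lands on the same subtask, whatever the parallelism.
        index = sum(key.encode()) % parallelism
        subtasks[index][key] += value
    # Concatenating subtask outputs gives an order that depends on the
    # parallelism, which is why the test compares sorted results.
    output: List[str] = []
    for state in subtasks:
        output.extend(f"({k},{v})" for k, v in state.items())
    return output
```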




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org