Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/19 06:15:23 UTC

[GitHub] [flink] dianfu commented on a diff in pull request #19758: [FLINK-26480]Support window_all in Python DataStream API

dianfu commented on code in PR #19758:
URL: https://github.com/apache/flink/pull/19758#discussion_r876618388


##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1548,6 +1552,20 @@ def count_window(self, size: int, slide: int = 0):
         else:
             return WindowedStream(self, CountSlidingWindowAssigner(size, slide))
 
+    def window_all(self, window_assigner: WindowAssigner) -> 'AllWindowedStream':

Review Comment:
   Could we also add `count_window_all` to align with the Java API?



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1548,6 +1552,20 @@ def count_window(self, size: int, slide: int = 0):
         else:
             return WindowedStream(self, CountSlidingWindowAssigner(size, slide))
 
+    def window_all(self, window_assigner: WindowAssigner) -> 'AllWindowedStream':

Review Comment:
   This method should appear in DataStream instead of KeyedStream.



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1881,6 +1899,129 @@ def _get_result_data_stream(self,
             j_python_data_stream_function_operator))
 
 
+class AllWindowedStream(object):
+    """
+    A AllWindowedStream represents a data stream where the stream of elements is split into windows
+    based on a WindowAssigner. Window emission is triggered based on a Trigger.
+
+    If an Evictor is specified it will be used to evict elements from the window after evaluation
+    was triggered by the Trigger but before the actual evaluation of the window.
+    When using an evictor, window performance will degrade significantly, since pre-aggregation of
+    window results cannot be used.
+
+    Note that the AllWindowedStream is purely an API construct, during runtime the AllWindowedStream
+    will be collapsed together with the operation over the window into one single operation.
+    """
+
+    def __init__(self, data_stream: DataStream, window_assigner: WindowAssigner):
+        self._input_stream = data_stream.key_by(NullByteKeySelector())
+        self._window_assigner = window_assigner
+        self._allowed_lateness = 0
+        self._late_data_output_tag = None  # type: Optional[OutputTag]
+        self._window_trigger = window_assigner.get_default_trigger(
+            self._input_stream.get_execution_environment())  # type: Trigger
+
+    def get_input_type(self):
+        return _from_java_type(self._input_stream._original_data_type_info.get_java_type_info())
+
+    def trigger(self, trigger: Trigger):

Review Comment:
   ```suggestion
       def trigger(self, trigger: Trigger) -> 'AllWindowedStream':
   ```
   The other methods also need to be updated in the same way.
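
For illustration, a minimal sketch of what annotated fluent setters might look like. `FluentStreamSketch` is a hypothetical stand-in and not the PyFlink class; only the method names `trigger` and `allowed_lateness` come from the diff above.

```python
from typing import Any, Optional


class FluentStreamSketch(object):
    """Hypothetical stand-in for AllWindowedStream, used only to show the annotations."""

    def __init__(self) -> None:
        self._window_trigger = None  # type: Optional[Any]
        self._allowed_lateness = 0

    def trigger(self, trigger: Any) -> 'FluentStreamSketch':
        # The string annotation is a forward reference to the enclosing class,
        # so type checkers can follow chained calls.
        self._window_trigger = trigger
        return self

    def allowed_lateness(self, time_ms: int) -> 'FluentStreamSketch':
        self._allowed_lateness = time_ms
        return self


# Chained calls keep the same object and the same static type.
stream = FluentStreamSketch().trigger("my-trigger").allowed_lateness(1000)
```

With the annotation in place, an IDE or type checker can resolve `allowed_lateness` on the result of `trigger(...)` instead of treating it as an untyped value.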



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1881,6 +1899,129 @@ def _get_result_data_stream(self,
             j_python_data_stream_function_operator))
 
 
+class AllWindowedStream(object):
+    """
+    A AllWindowedStream represents a data stream where the stream of elements is split into windows
+    based on a WindowAssigner. Window emission is triggered based on a Trigger.
+
+    If an Evictor is specified it will be used to evict elements from the window after evaluation
+    was triggered by the Trigger but before the actual evaluation of the window.
+    When using an evictor, window performance will degrade significantly, since pre-aggregation of
+    window results cannot be used.
+
+    Note that the AllWindowedStream is purely an API construct, during runtime the AllWindowedStream
+    will be collapsed together with the operation over the window into one single operation.
+    """
+
+    def __init__(self, data_stream: DataStream, window_assigner: WindowAssigner):
+        self._input_stream = data_stream.key_by(NullByteKeySelector())
+        self._window_assigner = window_assigner
+        self._allowed_lateness = 0
+        self._late_data_output_tag = None  # type: Optional[OutputTag]

Review Comment:
   This field is never used. I guess you missed the method `sideOutputLateData`.



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1881,6 +1899,129 @@ def _get_result_data_stream(self,
             j_python_data_stream_function_operator))
 
 
+class AllWindowedStream(object):
+    """
+    A AllWindowedStream represents a data stream where the stream of elements is split into windows
+    based on a WindowAssigner. Window emission is triggered based on a Trigger.
+
+    If an Evictor is specified it will be used to evict elements from the window after evaluation
+    was triggered by the Trigger but before the actual evaluation of the window.
+    When using an evictor, window performance will degrade significantly, since pre-aggregation of
+    window results cannot be used.
+
+    Note that the AllWindowedStream is purely an API construct, during runtime the AllWindowedStream
+    will be collapsed together with the operation over the window into one single operation.
+    """
+
+    def __init__(self, data_stream: DataStream, window_assigner: WindowAssigner):
+        self._input_stream = data_stream.key_by(NullByteKeySelector())
+        self._window_assigner = window_assigner
+        self._allowed_lateness = 0
+        self._late_data_output_tag = None  # type: Optional[OutputTag]
+        self._window_trigger = window_assigner.get_default_trigger(
+            self._input_stream.get_execution_environment())  # type: Trigger
+
+    def get_input_type(self):
+        return _from_java_type(self._input_stream._original_data_type_info.get_java_type_info())
+
+    def trigger(self, trigger: Trigger):
+        """
+        Sets the Trigger that should be used to trigger window emission.
+        """
+        if isinstance(self._window_assigner, MergingWindowAssigner) \
+                and (trigger.can_merge() is not True):
+            raise TypeError("A merging window assigner cannot be used with a trigger that does "
+                            "not support merging.")
+
+        self._window_trigger = trigger
+        return self
+
+    def allowed_lateness(self, time_ms: int):
+        """
+        Sets the time by which elements are allowed to be late. Elements that arrive behind the
+        watermark by more than the specified time will be dropped. By default, the allowed lateness
+        is 0.
+
+        Setting an allowed lateness is only valid for event-time windows.
+        """
+        self._allowed_lateness = time_ms
+        return self
+
+    def apply(self,
+              window_function: AllWindowFunction,
+              output_type: TypeInformation = None) -> DataStream:
+        """
+        Applies the given window function to each window. The window function is called for each
+        evaluation of the window. The output of the window function is interpreted as a regular
+        non-windowed stream.
+
+        Note that this function requires that all data in the windows is buffered until the window
+        is evaluated, as the function provides no means of incremental aggregation.
+
+        :param window_function: The window function.
+        :param output_type: Type information for the result type of the window function.
+        :return: The data stream that is the result of applying the window function to the window.
+        """
+        internal_window_function = InternalIterableAllWindowFunction(
+            window_function)  # type: InternalWindowFunction
+        list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME, self.get_input_type())
+        return self._get_result_data_stream(internal_window_function,
+                                            list_state_descriptor,
+                                            output_type)
+
+    def process(self,
+                process_window_function: ProcessAllWindowFunction,
+                output_type: TypeInformation = None) -> DataStream:
+        """
+        Applies the given window function to each window. The window function is called for each
+        evaluation of the window for each key individually. The output of the window function is
+        interpreted as a regular non-windowed stream.
+
+        Note that this function requires that all data in the windows is buffered until the window
+        is evaluated, as the function provides no means of incremental aggregation.
+
+        :param process_window_function: The window function.
+        :param output_type: Type information for the result type of the window function.
+        :return: The data stream that is the result of applying the window function to the window.
+        """
+        internal_window_function = InternalIterableProcessAllWindowFunction(
+            process_window_function)  # type: InternalWindowFunction
+        list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME, self.get_input_type())
+        return self._get_result_data_stream(internal_window_function,
+                                            list_state_descriptor,
+                                            output_type)
+
+    def _get_result_data_stream(self,

Review Comment:
   It seems that the implementation is the same as WindowedStream._get_result_data_stream. Could we abstract it a bit to avoid duplication?
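
One possible shape for that abstraction, sketched under assumptions: the shared base class `_WindowedStreamBaseSketch`, both subclass names, and the dictionary returned in place of a real operator are all hypothetical. A module-level helper function would work just as well.

```python
from typing import Any, Dict


class _WindowedStreamBaseSketch(object):
    """Hypothetical shared base holding the logic both stream classes would duplicate."""

    def __init__(self, input_stream: Any, window_assigner: Any) -> None:
        self._input_stream = input_stream
        self._window_assigner = window_assigner
        self._allowed_lateness = 0

    def _get_result_data_stream(self,
                                internal_window_function: Any,
                                window_state_descriptor: Any,
                                output_type: Any) -> Dict[str, Any]:
        # Stand-in for building the window operation descriptor and the operator;
        # a plain dict is returned here so that the sketch stays self-contained.
        return {
            'input': self._input_stream,
            'assigner': self._window_assigner,
            'allowed_lateness': self._allowed_lateness,
            'function': internal_window_function,
            'state': window_state_descriptor,
            'output_type': output_type,
        }


class WindowedStreamSketch(_WindowedStreamBaseSketch):
    """Keyed variant: uses the keyed stream as given."""


class AllWindowedStreamSketch(_WindowedStreamBaseSketch):
    """Non-keyed variant: only the construction of the input stream differs."""

    def __init__(self, data_stream: Any, window_assigner: Any) -> None:
        # Stand-in for data_stream.key_by(NullByteKeySelector()) in the diff above.
        super().__init__(('keyed-by-null-byte', data_stream), window_assigner)


keyed = WindowedStreamSketch('keyed-stream', 'assigner')
non_keyed = AllWindowedStreamSketch('stream', 'assigner')
```

Only the constructors differ, so the operator-building logic lives in one place.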



##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -38,6 +38,7 @@
     'ReduceFunction',
     'AggregateFunction',
     'KeySelector',
+    'NullByteKeySelector',

Review Comment:
   We don't need to expose it as it's just used internally.



##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -979,6 +1008,68 @@ def clear(self, context: 'ProcessWindowFunction.Context') -> None:
         pass
 
 
+class ProcessAllWindowFunction(Function, Generic[IN, OUT, W]):
+    """
+    Base interface for functions that are evaluated over keyed (grouped) windows using a context

Review Comment:
   ```suggestion
       Base interface for functions that are evaluated over non-keyed windows using a context
   ```



##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -979,6 +1008,68 @@ def clear(self, context: 'ProcessWindowFunction.Context') -> None:
         pass
 
 
+class ProcessAllWindowFunction(Function, Generic[IN, OUT, W]):
+    """
+    Base interface for functions that are evaluated over keyed (grouped) windows using a context
+    for retrieving extra information.
+    """
+
+    class Context(ABC, Generic[W2]):
+        """
+        The context holding window metadata.
+        """
+
+        @abstractmethod
+        def window(self) -> W2:
+            """
+            :return: The window that is being evaluated.
+            """
+            pass
+
+        @abstractmethod
+        def window_state(self) -> KeyedStateStore:
+            """
+            State accessor for per-key and per-window state.
+
+            .. note::
+                If you use per-window state you have to ensure that you clean it up by implementing
+                :func:`~ProcessWindowFunction.clear`.
+
+            :return: The :class:`KeyedStateStore` used to access per-key and per-window states.
+            """
+            pass
+
+        @abstractmethod
+        def global_state(self) -> KeyedStateStore:
+            """
+            State accessor for per-key global state.
+            """
+            pass
+
+    @abstractmethod
+    def process(self,
+                context: 'ProcessAllWindowFunction.Context',
+                elements: Iterable[IN]) -> Iterable[OUT]:
+        """
+        Evaluates the window and outputs none or several elements.
+
+        :param context: The context in which the window is being evaluated.
+        :param elements: The elements in the window being evaluated.
+        :return: The iterable object which produces the elements to emit.
+        """
+        pass
+
+    @abstractmethod

Review Comment:
   The `abstractmethod` could be removed as users don't have to override this method. See the Java counterparts for more details. It seems that the `clear` method in `ProcessWindowFunction` should also be updated.
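
A minimal sketch of the effect, assuming a plain `ABC` base. `ProcessAllWindowFunctionSketch` and `CountElements` are hypothetical names, and the real class also derives from `Function` and is generic.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable


class ProcessAllWindowFunctionSketch(ABC):
    """Hypothetical stand-in: process() is mandatory, clear() has a default."""

    @abstractmethod
    def process(self, context: Any, elements: Iterable[Any]) -> Iterable[Any]:
        pass

    def clear(self, context: Any) -> None:
        # No @abstractmethod here: the default is a no-op, so subclasses that
        # keep no per-window state do not need to override it.
        pass


class CountElements(ProcessAllWindowFunctionSketch):
    # Only process() is implemented; instantiation still succeeds.
    def process(self, context: Any, elements: Iterable[Any]) -> Iterable[Any]:
        return [len(list(elements))]


result = CountElements().process(None, ['a', 'b', 'c'])
```

If `clear` were still decorated with `@abstractmethod`, instantiating `CountElements` would raise a `TypeError`, which is why the test in this PR had to define an empty `clear`.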



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1881,6 +1899,129 @@ def _get_result_data_stream(self,
             j_python_data_stream_function_operator))
 
 
+class AllWindowedStream(object):
+    """
+    A AllWindowedStream represents a data stream where the stream of elements is split into windows
+    based on a WindowAssigner. Window emission is triggered based on a Trigger.
+
+    If an Evictor is specified it will be used to evict elements from the window after evaluation
+    was triggered by the Trigger but before the actual evaluation of the window.
+    When using an evictor, window performance will degrade significantly, since pre-aggregation of
+    window results cannot be used.
+
+    Note that the AllWindowedStream is purely an API construct, during runtime the AllWindowedStream
+    will be collapsed together with the operation over the window into one single operation.
+    """
+
+    def __init__(self, data_stream: DataStream, window_assigner: WindowAssigner):
+        self._input_stream = data_stream.key_by(NullByteKeySelector())
+        self._window_assigner = window_assigner
+        self._allowed_lateness = 0
+        self._late_data_output_tag = None  # type: Optional[OutputTag]
+        self._window_trigger = window_assigner.get_default_trigger(
+            self._input_stream.get_execution_environment())  # type: Trigger
+
+    def get_input_type(self):
+        return _from_java_type(self._input_stream._original_data_type_info.get_java_type_info())
+
+    def trigger(self, trigger: Trigger):
+        """
+        Sets the Trigger that should be used to trigger window emission.
+        """
+        if isinstance(self._window_assigner, MergingWindowAssigner) \
+                and (trigger.can_merge() is not True):
+            raise TypeError("A merging window assigner cannot be used with a trigger that does "
+                            "not support merging.")
+
+        self._window_trigger = trigger
+        return self
+
+    def allowed_lateness(self, time_ms: int):
+        """
+        Sets the time by which elements are allowed to be late. Elements that arrive behind the
+        watermark by more than the specified time will be dropped. By default, the allowed lateness
+        is 0.
+
+        Setting an allowed lateness is only valid for event-time windows.
+        """
+        self._allowed_lateness = time_ms
+        return self
+
+    def apply(self,
+              window_function: AllWindowFunction,
+              output_type: TypeInformation = None) -> DataStream:
+        """
+        Applies the given window function to each window. The window function is called for each
+        evaluation of the window. The output of the window function is interpreted as a regular
+        non-windowed stream.
+
+        Note that this function requires that all data in the windows is buffered until the window
+        is evaluated, as the function provides no means of incremental aggregation.
+
+        :param window_function: The window function.
+        :param output_type: Type information for the result type of the window function.
+        :return: The data stream that is the result of applying the window function to the window.
+        """
+        internal_window_function = InternalIterableAllWindowFunction(
+            window_function)  # type: InternalWindowFunction
+        list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME, self.get_input_type())
+        return self._get_result_data_stream(internal_window_function,
+                                            list_state_descriptor,
+                                            output_type)
+
+    def process(self,

Review Comment:
   The aggregate/reduce functions are missing.



##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -901,6 +914,22 @@ def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> Iterable[OUT]:
         pass
 
 
+class AllWindowFunction(Function, Generic[IN, OUT, W]):
+    """
+    Base interface for functions that are evaluated over keyed (grouped) windows.

Review Comment:
   ```suggestion
       Base interface for functions that are evaluated over non-keyed windows.
   ```



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1881,6 +1899,129 @@ def _get_result_data_stream(self,
             j_python_data_stream_function_operator))
 
 
+class AllWindowedStream(object):
+    """
+    A AllWindowedStream represents a data stream where the stream of elements is split into windows
+    based on a WindowAssigner. Window emission is triggered based on a Trigger.
+
+    If an Evictor is specified it will be used to evict elements from the window after evaluation
+    was triggered by the Trigger but before the actual evaluation of the window.
+    When using an evictor, window performance will degrade significantly, since pre-aggregation of
+    window results cannot be used.
+
+    Note that the AllWindowedStream is purely an API construct, during runtime the AllWindowedStream
+    will be collapsed together with the operation over the window into one single operation.
+    """
+
+    def __init__(self, data_stream: DataStream, window_assigner: WindowAssigner):
+        self._input_stream = data_stream.key_by(NullByteKeySelector())
+        self._window_assigner = window_assigner
+        self._allowed_lateness = 0
+        self._late_data_output_tag = None  # type: Optional[OutputTag]
+        self._window_trigger = window_assigner.get_default_trigger(
+            self._input_stream.get_execution_environment())  # type: Trigger
+
+    def get_input_type(self):
+        return _from_java_type(self._input_stream._original_data_type_info.get_java_type_info())
+
+    def trigger(self, trigger: Trigger):
+        """
+        Sets the Trigger that should be used to trigger window emission.
+        """
+        if isinstance(self._window_assigner, MergingWindowAssigner) \
+                and (trigger.can_merge() is not True):
+            raise TypeError("A merging window assigner cannot be used with a trigger that does "
+                            "not support merging.")
+
+        self._window_trigger = trigger
+        return self
+
+    def allowed_lateness(self, time_ms: int):
+        """
+        Sets the time by which elements are allowed to be late. Elements that arrive behind the
+        watermark by more than the specified time will be dropped. By default, the allowed lateness
+        is 0.
+
+        Setting an allowed lateness is only valid for event-time windows.
+        """
+        self._allowed_lateness = time_ms
+        return self
+
+    def apply(self,
+              window_function: AllWindowFunction,
+              output_type: TypeInformation = None) -> DataStream:
+        """
+        Applies the given window function to each window. The window function is called for each
+        evaluation of the window. The output of the window function is interpreted as a regular
+        non-windowed stream.
+
+        Note that this function requires that all data in the windows is buffered until the window
+        is evaluated, as the function provides no means of incremental aggregation.
+
+        :param window_function: The window function.
+        :param output_type: Type information for the result type of the window function.
+        :return: The data stream that is the result of applying the window function to the window.
+        """
+        internal_window_function = InternalIterableAllWindowFunction(
+            window_function)  # type: InternalWindowFunction
+        list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME, self.get_input_type())
+        return self._get_result_data_stream(internal_window_function,
+                                            list_state_descriptor,
+                                            output_type)
+
+    def process(self,
+                process_window_function: ProcessAllWindowFunction,
+                output_type: TypeInformation = None) -> DataStream:
+        """
+        Applies the given window function to each window. The window function is called for each
+        evaluation of the window for each key individually. The output of the window function is
+        interpreted as a regular non-windowed stream.
+
+        Note that this function requires that all data in the windows is buffered until the window
+        is evaluated, as the function provides no means of incremental aggregation.
+
+        :param process_window_function: The window function.
+        :param output_type: Type information for the result type of the window function.
+        :return: The data stream that is the result of applying the window function to the window.
+        """
+        internal_window_function = InternalIterableProcessAllWindowFunction(
+            process_window_function)  # type: InternalWindowFunction
+        list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME, self.get_input_type())
+        return self._get_result_data_stream(internal_window_function,
+                                            list_state_descriptor,
+                                            output_type)
+
+    def _get_result_data_stream(self,
+                                internal_window_function: InternalWindowFunction,
+                                window_state_descriptor: StateDescriptor,
+                                output_type: TypeInformation):
+        if self._window_trigger is None:
+            self._window_trigger = self._window_assigner.get_default_trigger(
+                self._input_stream.get_execution_environment())
+        window_serializer = self._window_assigner.get_window_serializer()
+        window_operation_descriptor = WindowOperationDescriptor(
+            self._window_assigner,
+            self._window_trigger,
+            self._allowed_lateness,
+            self._late_data_output_tag,
+            window_state_descriptor,
+            window_serializer,
+            internal_window_function)
+
+        from pyflink.fn_execution import flink_fn_execution_pb2
+        j_python_data_stream_function_operator, j_output_type_info = \
+            _get_one_input_stream_operator(
+                self._input_stream,
+                window_operation_descriptor,
+                flink_fn_execution_pb2.UserDefinedDataStreamFunction.WINDOW,  # type: ignore
+                output_type)
+
+        return DataStream(self._input_stream._j_data_stream.transform(
+            "WINDOW",

Review Comment:
   For the operator name, I suggest keeping it consistent with the Java API by using the class name of the window assigner instead of just `WINDOW`. Besides, we could also use `set_description` to give a more detailed description of the operator, as is done in the Java WindowedStream.
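
A rough sketch of how such a name and description could be derived. The helper names, the stand-in classes, and the exact description format are assumptions for illustration and are not taken from the PR or from the Java code.

```python
from typing import Any


class TumblingWindowsSketch(object):
    """Hypothetical stand-in for a window assigner."""


class EventTimeTriggerSketch(object):
    """Hypothetical stand-in for a trigger."""


def generate_operator_name(window_assigner: Any) -> str:
    # Use the assigner's class name rather than a fixed "WINDOW" string.
    return type(window_assigner).__name__


def generate_operator_description(window_assigner: Any, trigger: Any, function_name: str) -> str:
    # Assumed format: assigner, trigger and function names joined for display.
    return "Window({}, {}, {})".format(
        type(window_assigner).__name__, type(trigger).__name__, function_name)


name = generate_operator_name(TumblingWindowsSketch())
description = generate_operator_description(
    TumblingWindowsSketch(), EventTimeTriggerSketch(), "process")
```

The name would then be passed to `transform(...)` in place of `"WINDOW"`, and the description to `set_description` as suggested above.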



##########
flink-python/pyflink/datastream/tests/test_window.py:
##########
@@ -441,6 +442,35 @@ def process(self, key, context: ProcessWindowFunction.Context,
         expected = ['(hi,2)', '(hi,2)', '(hi,2)']
         self.assert_equals_sorted(expected, results)
 
+    def test_event_time_tumbling_window_all(self):
+        data_stream = self.env.from_collection([
+            ('hi', 1), ('hello', 2), ('hi', 3), ('hello', 4), ('hello', 5), ('hi', 8), ('hi', 9),
+            ('hi', 15)],
+            type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
+        watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+            .with_timestamp_assigner(SecondColumnTimestampAssigner())
+
+        class CountAllWindowProcessFunction(ProcessAllWindowFunction[tuple, tuple, TimeWindow]):
+            def process(self, context: 'ProcessAllWindowFunction.Context',
+                        elements: Iterable[tuple]) -> Iterable[tuple]:
+                return [
+                    (context.window().start, context.window().end, len([e for e in elements]))]
+
+            def clear(self, context: 'ProcessAllWindowFunction.Context') -> None:
+                pass
+
+        data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+            .key_by(lambda x: x[0], key_type=Types.STRING()) \

Review Comment:
   `window_all` should be applied to a DataStream instead of a KeyedStream, so please remove this line.
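
To illustrate why the key is irrelevant here, a pure-Python simulation of a non-keyed tumbling window count over the test data above. The 5 ms window size is an assumption, since the assigner is not visible in this excerpt, and `tumbling_window_all_counts` is a hypothetical helper rather than a PyFlink API.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def tumbling_window_all_counts(elements: Iterable[Tuple[str, int]],
                               size_ms: int) -> List[Tuple[int, int, int]]:
    """Counts elements per tumbling window, ignoring the key entirely."""
    counts = Counter()  # window start -> number of elements
    for _key, timestamp in elements:
        # The second column is used as the event timestamp.
        window_start = timestamp - (timestamp % size_ms)
        counts[window_start] += 1
    return [(start, start + size_ms, n) for start, n in sorted(counts.items())]


data = [('hi', 1), ('hello', 2), ('hi', 3), ('hello', 4), ('hello', 5),
        ('hi', 8), ('hi', 9), ('hi', 15)]

# Assumed window size of 5 ms.
window_counts = tumbling_window_all_counts(data, 5)
# -> [(0, 5, 4), (5, 10, 3), (15, 20, 1)]
```

Both 'hi' and 'hello' records land in the same window, which is the behavior `window_all` is meant to provide without a preceding `key_by`.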



##########
docs/content.zh/docs/dev/datastream/operators/windows.md:
##########
@@ -1604,6 +1604,21 @@ val globalResults = resultsPerKey
     .process(new TopKWindowFunction())
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+input = ...  # type: DataStream
+
+input \
+    .key_by(<key selector>) \
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .reduce(Summer())
+
+input \

Review Comment:
   This is not consistent with the Java example. Please double check that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org