You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/04 07:38:56 UTC

[GitHub] [flink] Vancior commented on a change in pull request #18957: [FLINK-26444][python]Window allocator supporting pyflink datastream API

Vancior commented on a change in pull request #18957:
URL: https://github.com/apache/flink/pull/18957#discussion_r819330802



##########
File path: flink-python/pyflink/fn_execution/datastream/window/window_assigner.py
##########
@@ -0,0 +1,376 @@
+import math
+from typing import Iterable, Collection
+
+from pyflink.common import TypeSerializer, Time
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import Trigger
+from pyflink.datastream.state import ValueStateDescriptor, ValueState, ReducingStateDescriptor, \
+    ReducingState
+from pyflink.datastream.window import TimeWindow, CountWindow, WindowAssigner, T, \
+    TimeWindowSerializer, TriggerResult, CountWindowSerializer, MergingWindowAssigner
+from pyflink.fn_execution.table.window_context import W
+
+
+class EventTimeTrigger(Trigger[object, TimeWindow]):
+    """
+    A Trigger that fires once the watermark passes the end of the window to which a pane belongs.
+    """
+    def on_element(self,
+                   element: object,
+                   timestamp: int,
+                   window: TimeWindow,
+                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        if window.max_timestamp() <= ctx.get_current_watermark():
+            return TriggerResult.FIRE
+        else:
+            ctx.register_event_time_timer(window.max_timestamp())
+            # No action is taken on the window.
+            return TriggerResult.CONTINUE
+
+    def on_processing_time(self,
+                           time: int,
+                           window: TimeWindow,
+                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        # No action is taken on the window.
+        return TriggerResult.CONTINUE
+
+    def on_event_time(self,
+                      time: int,
+                      window: TimeWindow,
+                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        if time == window.max_timestamp():
+            return TriggerResult.FIRE
+        else:
+            # No action is taken on the window.
+            return TriggerResult.CONTINUE
+
+    def on_merge(self,
+                 window: TimeWindow,
+                 ctx: 'Trigger.OnMergeContext') -> None:
+        windowMaxTimestamp = window.max_timestamp()
+        if windowMaxTimestamp >= ctx.get_current_watermark():
+            ctx.register_event_time_timer(windowMaxTimestamp)
+
+    def clear(self,
+              window: TimeWindow,
+              ctx: 'Trigger.TriggerContext') -> None:
+        ctx.delete_event_time_timer(window.max_timestamp())
+
+
+class ProcessingTimeTrigger(Trigger[object, TimeWindow]):
+    """
+    A Trigger that fires once the current system time passes the end of the window to which a pane belongs.
+    """
+    def on_element(self,
+                   element: T,
+                   timestamp: int,
+                   window: W,
+                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        ctx.register_processing_time_timer(window.max_timestamp());
+        return TriggerResult.CONTINUE
+
+    def on_processing_time(self,
+                           time: int,
+                           window: W,
+                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        return TriggerResult.FIRE
+
+    def on_event_time(self,
+                      time: int,
+                      window: W,
+                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        return TriggerResult.CONTINUE
+
+    def on_merge(self,
+                 window: W,
+                 ctx: 'Trigger.OnMergeContext') -> None:
+        windowMaxTimestamp = window.max_timestamp();
+        if windowMaxTimestamp > ctx.get_current_processing_time():
+            ctx.register_processing_time_timer(windowMaxTimestamp)
+
+    def clear(self,
+              window: W,
+              ctx: 'Trigger.TriggerContext') -> None:
+        ctx.delete_processing_time_timer(window.max_timestamp());
+
+
+class CountTrigger(Trigger[object, CountWindow]):
+    """
+    A Trigger that fires once the count of elements in a pane reaches the given count.
+    """
+    def __init__(self, window_size: int):
+        self._window_size = int(window_size)
+        self._count_state_descriptor = ReducingStateDescriptor(
+            "trigger_counter", lambda a, b: a + b, Types.BIG_INT())
+
+    def on_element(self,
+                   element: object,
+                   timestamp: int,
+                   window: CountWindow,
+                   ctx: Trigger.TriggerContext) -> TriggerResult:
+        state = ctx.get_partitioned_state(self._count_state_descriptor)  # type: ReducingState
+        state.add(1)
+        if state.get() >= self._window_size:
+            # On FIRE, the window is evaluated and results are emitted. The window is not purged
+            #               though, all elements are retained.
+            return TriggerResult.FIRE
+        else:
+            # No action is taken on the window.
+            return TriggerResult.CONTINUE
+
+    def on_processing_time(self,
+                           time: int,
+                           window: CountWindow,
+                           ctx: Trigger.TriggerContext) -> TriggerResult:
+        # No action is taken on the window.
+        return TriggerResult.CONTINUE
+
+    def on_event_time(self,
+                      time: int,
+                      window: CountWindow,
+                      ctx: Trigger.TriggerContext) -> TriggerResult:
+        # No action is taken on the window.
+        return TriggerResult.CONTINUE
+
+    def on_merge(self, window: CountWindow, ctx: Trigger.OnMergeContext) -> None:
+        ctx.merge_partitioned_state(self._count_state_descriptor)
+
+    def clear(self, window: CountWindow, ctx: Trigger.TriggerContext) -> None:
+        state = ctx.get_partitioned_state(self._count_state_descriptor)
+        state.clear()
+
+
+class TumblingWindowAssigner(WindowAssigner[object, TimeWindow]):
+    """
+    A WindowAssigner that windows elements into windows based on the current system time of the machine the operation is running on. Windows cannot overlap.
+    For example, in order to window into windows of 1 minute, every 10 seconds:
+    ::
+            >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \
+            >>> .window(TumblingWindowAssigner(Time.minutes(1), Time.seconds(10), False))
+
+    A WindowAssigner that windows elements into windows based on the timestamp of the elements. Windows cannot overlap.
+    For example, in order to window into windows of 1 minute:
+     ::
+            >>> data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+            >>> .key_by(lambda x: x[0], key_type=Types.STRING()) \
+            >>> .window(TumblingWindowAssigner(Time.minutes(1), Time.seconds(0), True))
+    """
+    def __init__(self, size: Time, offset: Time, is_event_time: bool):
+        self._size = size.to_milliseconds()
+        self._offset = offset.to_milliseconds()
+        self._is_event_time = is_event_time
+
+    def assign_windows(self,
+                       element: object,
+                       timestamp: int,
+                       context: WindowAssigner.WindowAssignerContext) -> Collection[TimeWindow]:
+        if self._is_event_time is False:
+            timestamp = context.get_current_processing_time()
+

Review comment:
       Check `timestamp > Long.MIN_VALUE` in event-time mode, as the Java implementation does; otherwise a struct packing error is raised when no watermark strategy is set on the data stream.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org