You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/13 02:28:38 UTC

[GitHub] [flink] dianfu commented on a diff in pull request #19421: [FLINK-27168][API/Python]Add continuousprocessingtimetrigger and continuouseventtimetrigger triggers

dianfu commented on code in PR #19421:
URL: https://github.com/apache/flink/pull/19421#discussion_r848998698


##########
flink-python/pyflink/datastream/window.py:
##########
@@ -48,6 +48,8 @@
            'Trigger',
            'EventTimeTrigger',
            'ProcessingTimeTrigger',
+           'ContinuousEventTimeTrigger',
+           'CountTumblingWindowAssigner',

Review Comment:
   ```suggestion
              'ContinuousProcessingTimeTrigger',
   ```



##########
flink-python/pyflink/datastream/window.py:
##########
@@ -635,6 +637,90 @@ def clear(self,
         ctx.delete_event_time_timer(window.max_timestamp())
 
 
+class ContinuousEventTimeTrigger(Trigger[T, TimeWindow]):
+    """
+    A Trigger that continuously fires based on a given time interval. This fires based Watermarks.
+    """
+
+    def __init__(self, interval: int):
+        self.interval = interval
+        self.state_desc = ReducingStateDescriptor("fire-time", Min, Types.LONG())
+        self.fire_timestamp_state = None
+
+    @staticmethod
+    def of(interval: Time) -> 'ContinuousEventTimeTrigger':
+        return ContinuousEventTimeTrigger(interval.to_milliseconds())
+
+    def on_element(self, element: T,

Review Comment:
   ```suggestion
       def on_element(self,
                      element: T,
   ```



##########
flink-python/pyflink/datastream/window.py:
##########
@@ -677,6 +763,80 @@ def clear(self,
         ctx.delete_processing_time_timer(window.max_timestamp())
 
 
+class ContinuousProcessingTimeTrigger(Trigger[T, TimeWindow]):
+    """
+    A Trigger that continuously fires based on a given time interval as measured by the clock of the
+    machine on which the job is running.
+    Type parameters:<W> – The type of Windows on which this trigger can operate.

Review Comment:
   ```suggestion
   ```



##########
flink-python/pyflink/datastream/window.py:
##########
@@ -677,6 +763,80 @@ def clear(self,
         ctx.delete_processing_time_timer(window.max_timestamp())
 
 
+class ContinuousProcessingTimeTrigger(Trigger[T, TimeWindow]):
+    """
+    A Trigger that continuously fires based on a given time interval as measured by the clock of the
+    machine on which the job is running.
+    Type parameters:<W> – The type of Windows on which this trigger can operate.
+    """
+
+    def __init__(self, interval: int):
+        self.interval = interval
+        self.state_desc = ReducingStateDescriptor("fire-time", Min, Types.LONG())
+        self.fire_timestamp_state = None
+
+    @staticmethod
+    def of(interval: Time) -> 'ContinuousEventTimeTrigger':
+        return ContinuousEventTimeTrigger(interval.to_milliseconds())

Review Comment:
   ```suggestion
           return ContinuousProcessingTimeTrigger(interval.to_milliseconds())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org