You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/09/12 20:56:45 UTC

[beam] branch master updated: [Issue#23071] Fix AfterProcessingTime for Python to behave like Java (#23100)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f31b277c0ec [Issue#23071] Fix AfterProcessingTime for Python to behave like Java (#23100)
f31b277c0ec is described below

commit f31b277c0ece79c3112707040fa75f70991dce25
Author: Iñigo San Jose Visiers <42...@users.noreply.github.com>
AuthorDate: Mon Sep 12 15:56:38 2022 -0500

    [Issue#23071] Fix AfterProcessingTime for Python to behave like Java (#23100)
    
    * Fix AfterProcessingTime for Python. Add tests.
    
    * Fix linter
    
    * Change CountState to State for simplicity.
    
    * Change CountState to State for simplicity.
---
 sdks/python/apache_beam/transforms/trigger.py      | 11 ++-
 sdks/python/apache_beam/transforms/trigger_test.py | 83 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 3c7a0df5021..d9a3552a50a 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -377,6 +377,9 @@ class AfterProcessingTime(TriggerFn):
 
   AfterProcessingTime is experimental. No backwards compatibility guarantees.
   """
+
+  STATE_TAG = _SetStateTag('has_timer')
+
   def __init__(self, delay=0):
     """Initialize a processing time trigger with a delay in seconds."""
     self.delay = delay
@@ -385,8 +388,10 @@ class AfterProcessingTime(TriggerFn):
     return 'AfterProcessingTime(delay=%d)' % self.delay
 
   def on_element(self, element, window, context):
-    context.set_timer(
-        '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
+    if not context.get_state(self.STATE_TAG):
+      context.set_timer(
+          '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
+    context.add_state(self.STATE_TAG, True)
 
   def on_merge(self, to_be_merged, merge_result, context):
     # timers will be kept through merging
@@ -400,7 +405,7 @@ class AfterProcessingTime(TriggerFn):
     return True
 
   def reset(self, window, context):
-    pass
+    context.clear_state(self.STATE_TAG)
 
   def may_lose_data(self, unused_windowing):
     """AfterProcessingTime may finish."""
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index d73168d1ba9..c8beed42c65 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -540,6 +540,89 @@ class RunnerApiTest(unittest.TestCase):
 
 
 class TriggerPipelineTest(unittest.TestCase):
+  def test_after_processing_time(self):
+    test_options = PipelineOptions(
+        flags=['--allow_unsafe_triggers', '--streaming'])
+    with TestPipeline(options=test_options) as p:
+
+      total_elements_in_trigger = 4
+      processing_time_delay = 2
+      window_size = 10
+
+      # yapf: disable
+      test_stream = TestStream()
+      for i in range(total_elements_in_trigger):
+        (test_stream
+         .advance_processing_time(
+            processing_time_delay / total_elements_in_trigger)
+         .add_elements([('key', i)])
+         )
+
+      test_stream.advance_processing_time(processing_time_delay)
+
+      # Add dropped elements
+      (test_stream
+         .advance_processing_time(0.1)
+         .add_elements([('key', "dropped-1")])
+         .advance_processing_time(0.1)
+         .add_elements([('key', "dropped-2")])
+      )
+
+      (test_stream
+       .advance_processing_time(processing_time_delay)
+       .advance_watermark_to_infinity()
+       )
+      # yapf: enable
+
+      results = (
+          p
+          | test_stream
+          | beam.WindowInto(
+              FixedWindows(window_size),
+              trigger=AfterProcessingTime(processing_time_delay),
+              accumulation_mode=AccumulationMode.DISCARDING)
+          | beam.GroupByKey()
+          | beam.Map(lambda x: x[1]))
+
+      assert_that(results, equal_to([list(range(total_elements_in_trigger))]))
+
+  def test_repeatedly_after_processing_time(self):
+    test_options = PipelineOptions(flags=['--streaming'])
+    with TestPipeline(options=test_options) as p:
+      total_elements = 7
+      processing_time_delay = 2
+      window_size = 10
+      # yapf: disable
+      test_stream = TestStream()
+      for i in range(total_elements):
+        (test_stream
+         .advance_processing_time(processing_time_delay - 0.01)
+         .add_elements([('key', i)])
+         )
+
+      (test_stream
+       .advance_processing_time(processing_time_delay)
+       .advance_watermark_to_infinity()
+       )
+      # yapf: enable
+
+      results = (
+          p
+          | test_stream
+          | beam.WindowInto(
+              FixedWindows(window_size),
+              trigger=Repeatedly(AfterProcessingTime(processing_time_delay)),
+              accumulation_mode=AccumulationMode.DISCARDING)
+          | beam.GroupByKey()
+          | beam.Map(lambda x: x[1]))
+
+      expected = [[i, i + 1]
+                  for i in range(total_elements - total_elements % 2)
+                  if i % 2 == 0]
+      expected += [] if total_elements % 2 == 0 else [[total_elements - 1]]
+
+      assert_that(results, equal_to(expected))
+
   def test_after_count(self):
     test_options = PipelineOptions(flags=['--allow_unsafe_triggers'])
     with TestPipeline(options=test_options) as p: