You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/09/12 20:56:45 UTC
[beam] branch master updated: [Issue#23071] Fix AfterProcessingTime for Python to behave like Java (#23100)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f31b277c0ec [Issue#23071] Fix AfterProcessingTime for Python to behave like Java (#23100)
f31b277c0ec is described below
commit f31b277c0ece79c3112707040fa75f70991dce25
Author: Iñigo San Jose Visiers <42...@users.noreply.github.com>
AuthorDate: Mon Sep 12 15:56:38 2022 -0500
[Issue#23071] Fix AfterProcessingTime for Python to behave like Java (#23100)
* Fix AfterProcessingTime for Python. Add tests.
* Fix linter
* Change CountState to State for simplicity.
* Change CountState to State for simplicity.
---
sdks/python/apache_beam/transforms/trigger.py | 11 ++-
sdks/python/apache_beam/transforms/trigger_test.py | 83 ++++++++++++++++++++++
2 files changed, 91 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 3c7a0df5021..d9a3552a50a 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -377,6 +377,9 @@ class AfterProcessingTime(TriggerFn):
AfterProcessingTime is experimental. No backwards compatibility guarantees.
"""
+
+ STATE_TAG = _SetStateTag('has_timer')
+
def __init__(self, delay=0):
"""Initialize a processing time trigger with a delay in seconds."""
self.delay = delay
@@ -385,8 +388,10 @@ class AfterProcessingTime(TriggerFn):
return 'AfterProcessingTime(delay=%d)' % self.delay
def on_element(self, element, window, context):
- context.set_timer(
- '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
+ if not context.get_state(self.STATE_TAG):
+ context.set_timer(
+ '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
+ context.add_state(self.STATE_TAG, True)
def on_merge(self, to_be_merged, merge_result, context):
# timers will be kept through merging
@@ -400,7 +405,7 @@ class AfterProcessingTime(TriggerFn):
return True
def reset(self, window, context):
- pass
+ context.clear_state(self.STATE_TAG)
def may_lose_data(self, unused_windowing):
"""AfterProcessingTime may finish."""
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index d73168d1ba9..c8beed42c65 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -540,6 +540,89 @@ class RunnerApiTest(unittest.TestCase):
class TriggerPipelineTest(unittest.TestCase):
+ def test_after_processing_time(self):
+ test_options = PipelineOptions(
+ flags=['--allow_unsafe_triggers', '--streaming'])
+ with TestPipeline(options=test_options) as p:
+
+ total_elements_in_trigger = 4
+ processing_time_delay = 2
+ window_size = 10
+
+ # yapf: disable
+ test_stream = TestStream()
+ for i in range(total_elements_in_trigger):
+ (test_stream
+ .advance_processing_time(
+ processing_time_delay / total_elements_in_trigger)
+ .add_elements([('key', i)])
+ )
+
+ test_stream.advance_processing_time(processing_time_delay)
+
+ # Add dropped elements
+ (test_stream
+ .advance_processing_time(0.1)
+ .add_elements([('key', "dropped-1")])
+ .advance_processing_time(0.1)
+ .add_elements([('key', "dropped-2")])
+ )
+
+ (test_stream
+ .advance_processing_time(processing_time_delay)
+ .advance_watermark_to_infinity()
+ )
+ # yapf: enable
+
+ results = (
+ p
+ | test_stream
+ | beam.WindowInto(
+ FixedWindows(window_size),
+ trigger=AfterProcessingTime(processing_time_delay),
+ accumulation_mode=AccumulationMode.DISCARDING)
+ | beam.GroupByKey()
+ | beam.Map(lambda x: x[1]))
+
+ assert_that(results, equal_to([list(range(total_elements_in_trigger))]))
+
+ def test_repeatedly_after_processing_time(self):
+ test_options = PipelineOptions(flags=['--streaming'])
+ with TestPipeline(options=test_options) as p:
+ total_elements = 7
+ processing_time_delay = 2
+ window_size = 10
+ # yapf: disable
+ test_stream = TestStream()
+ for i in range(total_elements):
+ (test_stream
+ .advance_processing_time(processing_time_delay - 0.01)
+ .add_elements([('key', i)])
+ )
+
+ (test_stream
+ .advance_processing_time(processing_time_delay)
+ .advance_watermark_to_infinity()
+ )
+ # yapf: enable
+
+ results = (
+ p
+ | test_stream
+ | beam.WindowInto(
+ FixedWindows(window_size),
+ trigger=Repeatedly(AfterProcessingTime(processing_time_delay)),
+ accumulation_mode=AccumulationMode.DISCARDING)
+ | beam.GroupByKey()
+ | beam.Map(lambda x: x[1]))
+
+ expected = [[i, i + 1]
+ for i in range(total_elements - total_elements % 2)
+ if i % 2 == 0]
+ expected += [] if total_elements % 2 == 0 else [[total_elements - 1]]
+
+ assert_that(results, equal_to(expected))
+
def test_after_count(self):
test_options = PipelineOptions(flags=['--allow_unsafe_triggers'])
with TestPipeline(options=test_options) as p: