You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/11/04 18:19:27 UTC
[beam] branch master updated: Fix GroupIntoBatches.test_buffering_timer_in_fixed_window_streaming
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a2f22df Fix GroupIntoBatches.test_buffering_timer_in_fixed_window_streaming
new d535e6d Merge pull request #13253 from nehsyc/gib_timeout_py
a2f22df is described below
commit a2f22dfd2c72a9ca152a8d44907fc7344361b499
Author: sychen <sy...@google.com>
AuthorDate: Tue Nov 3 15:00:26 2020 -0800
Fix GroupIntoBatches.test_buffering_timer_in_fixed_window_streaming
---
sdks/python/apache_beam/transforms/util_test.py | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index cbca2a1..49598f7 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -675,13 +675,14 @@ class GroupIntoBatchesTest(unittest.TestCase):
max_buffering_duration_secs = 100
start_time = timestamp.Timestamp(0)
- test_stream = TestStream().add_elements(
- [TimestampedValue(value, start_time + i)
- for i, value in enumerate(GroupIntoBatchesTest._create_test_data())]) \
- .advance_watermark_to(
- start_time + GroupIntoBatchesTest.NUM_ELEMENTS + 1) \
- .advance_processing_time(100) \
- .advance_watermark_to_infinity()
+ test_stream = (
+ TestStream().add_elements([
+ TimestampedValue(value, start_time + i) for i,
+ value in enumerate(GroupIntoBatchesTest._create_test_data())
+ ]).advance_processing_time(150).advance_watermark_to(
+ start_time + window_duration).advance_watermark_to(
+ start_time + window_duration +
+ 1).advance_watermark_to_infinity())
with TestPipeline(options=StandardOptions(streaming=True)) as pipeline:
# To trigger the processing time timer, use a fake clock with start time
@@ -704,10 +705,10 @@ class GroupIntoBatchesTest(unittest.TestCase):
# should be 5 (flush because of batch size reached).
expected_0 = 5
# There is only one element left in the window so batch size
- # should be 1 (flush because of end of window reached).
+ # should be 1 (flush because of max buffering duration reached).
expected_1 = 1
# Collection has 10 elements, there are only 4 left, so batch size should
- # be 4 (flush because of max buffering duration reached).
+ # be 4 (flush because of end of window reached).
expected_2 = 4
assert_that(
num_elements_per_batch,