You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/11/04 18:19:27 UTC

[beam] branch master updated: Fix GroupIntoBathces.test_buffering_timer_in_fixed_window_streaming

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a2f22df  Fix GroupIntoBathces.test_buffering_timer_in_fixed_window_streaming
     new d535e6d  Merge pull request #13253 from nehsyc/gib_timeout_py
a2f22df is described below

commit a2f22dfd2c72a9ca152a8d44907fc7344361b499
Author: sychen <sy...@google.com>
AuthorDate: Tue Nov 3 15:00:26 2020 -0800

    Fix GroupIntoBathces.test_buffering_timer_in_fixed_window_streaming
---
 sdks/python/apache_beam/transforms/util_test.py | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index cbca2a1..49598f7 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -675,13 +675,14 @@ class GroupIntoBatchesTest(unittest.TestCase):
     max_buffering_duration_secs = 100
 
     start_time = timestamp.Timestamp(0)
-    test_stream = TestStream().add_elements(
-        [TimestampedValue(value, start_time + i)
-         for i, value in enumerate(GroupIntoBatchesTest._create_test_data())]) \
-      .advance_watermark_to(
-        start_time + GroupIntoBatchesTest.NUM_ELEMENTS + 1) \
-      .advance_processing_time(100) \
-      .advance_watermark_to_infinity()
+    test_stream = (
+        TestStream().add_elements([
+            TimestampedValue(value, start_time + i) for i,
+            value in enumerate(GroupIntoBatchesTest._create_test_data())
+        ]).advance_processing_time(150).advance_watermark_to(
+            start_time + window_duration).advance_watermark_to(
+                start_time + window_duration +
+                1).advance_watermark_to_infinity())
 
     with TestPipeline(options=StandardOptions(streaming=True)) as pipeline:
       # To trigger the processing time timer, use a fake clock with start time
@@ -704,10 +705,10 @@ class GroupIntoBatchesTest(unittest.TestCase):
       # should be 5 (flush because of batch size reached).
       expected_0 = 5
       # There is only one element left in the window so batch size
-      # should be 1 (flush because of end of window reached).
+      # should be 1 (flush because of max buffering duration reached).
       expected_1 = 1
       # Collection has 10 elements, there are only 4 left, so batch size should
-      # be 4 (flush because of max buffering duration reached).
+      # be 4 (flush because of end of window reached).
       expected_2 = 4
       assert_that(
           num_elements_per_batch,