You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2020/03/07 00:54:30 UTC

[beam] branch master updated: [BEAM-9465] Fire repeatedly in reshuffle

This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a2cee9  [BEAM-9465] Fire repeatedly in reshuffle
     new 23165a6  Merge pull request #11066 from angoenka/fix_reshuffle
0a2cee9 is described below

commit 0a2cee9415a7ecfb6bf747537d6a1ec6add70963
Author: Ankur Goenka <an...@gmail.com>
AuthorDate: Fri Mar 6 14:14:43 2020 -0800

    [BEAM-9465] Fire repeatedly in reshuffle
---
 sdks/python/apache_beam/transforms/util.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 309aba5..369e884 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -64,6 +64,7 @@ from apache_beam.transforms.ptransform import ptransform_fn
 from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.transforms.trigger import AccumulationMode
 from apache_beam.transforms.trigger import AfterCount
+from apache_beam.transforms.trigger import Repeatedly
 from apache_beam.transforms.userstate import BagStateSpec
 from apache_beam.transforms.userstate import CombiningValueStateSpec
 from apache_beam.transforms.userstate import TimerSpec
@@ -678,7 +679,7 @@ class ReshufflePerKey(PTransform):
     # accept only standard coders.
     ungrouped._windowing = Windowing(
         window.GlobalWindows(),
-        triggerfn=AfterCount(1),
+        triggerfn=Repeatedly(AfterCount(1)),
         accumulation_mode=AccumulationMode.DISCARDING,
         timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
     result = (