You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2020/03/07 00:54:30 UTC
[beam] branch master updated: [BEAM-9465] Fire repeatedly in reshuffle
This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0a2cee9 [BEAM-9465] Fire repeatedly in reshuffle
new 23165a6 Merge pull request #11066 from angoenka/fix_reshuffle
0a2cee9 is described below
commit 0a2cee9415a7ecfb6bf747537d6a1ec6add70963
Author: Ankur Goenka <an...@gmail.com>
AuthorDate: Fri Mar 6 14:14:43 2020 -0800
[BEAM-9465] Fire repeatedly in reshuffle
---
sdks/python/apache_beam/transforms/util.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 309aba5..369e884 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -64,6 +64,7 @@ from apache_beam.transforms.ptransform import ptransform_fn
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterCount
+from apache_beam.transforms.trigger import Repeatedly
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.transforms.userstate import CombiningValueStateSpec
from apache_beam.transforms.userstate import TimerSpec
@@ -678,7 +679,7 @@ class ReshufflePerKey(PTransform):
# accept only standard coders.
ungrouped._windowing = Windowing(
window.GlobalWindows(),
- triggerfn=AfterCount(1),
+ triggerfn=Repeatedly(AfterCount(1)),
accumulation_mode=AccumulationMode.DISCARDING,
timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
result = (