Posted to issues@beam.apache.org by "Yi Hu (Jira)" <ji...@apache.org> on 2022/05/21 22:45:00 UTC
[jira] [Comment Edited] (BEAM-14497) Python Reshuffle holds elements
[ https://issues.apache.org/jira/browse/BEAM-14497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17540510#comment-17540510 ]
Yi Hu edited comment on BEAM-14497 at 5/21/22 10:44 PM:
--------------------------------------------------------
Testing found that the Always() trigger used in Reshuffle does not actually work: using an Always() trigger still causes GroupByKey() to group elements. Java's Reshuffle uses a ReshuffleTrigger, and testing showed that using it with GroupByKey does not group elements.
{code:python}
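import time

import apache_beam as beam
from apache_beam import GroupByKey, Pipeline, WindowInto
from apache_beam.transforms.trigger import AccumulationMode, Always
from apache_beam.transforms.window import (
    GlobalWindows, TimestampCombiner, TimestampedValue)


def test(p: Pipeline):
    result = (p
              | beam.Create([('A', 1), ('B', 1), ('C', 1), ('A', 2)])
              | 'Timestamped' >> beam.Map(
                  lambda x: TimestampedValue(x, timestamp=time.time()))
              | WindowInto(GlobalWindows(), trigger=Always(),
                           accumulation_mode=AccumulationMode.DISCARDING,
                           timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST,
                           allowed_lateness=1000)
              | GroupByKey()  # observed: ('A', 1) and ('A', 2) still end up grouped
              | beam.Map(print))
    return result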
{code}
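To make the difference concrete, here is a toy, pure-Python model (not Beam code) of a GroupByKey in DISCARDING mode over a single bundle. It assumes, as a hypothesis rather than a description of any runner's internals, that the observed grouping happens because the trigger is only evaluated once the whole bundle is buffered, whereas a ReshuffleTrigger-like trigger fires after every element. The helper `simulate_gbk` and its `check_trigger_per_element` flag are hypothetical names introduced only for this illustration.

{code:python}
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Pane = Tuple[str, List[int]]


def simulate_gbk(bundle: Iterable[Tuple[str, int]],
                 check_trigger_per_element: bool) -> List[Pane]:
    """Toy model of a DISCARDING-mode GroupByKey over one bundle.

    Hypothetical helper, not part of Beam: the trigger is either evaluated
    after every element (ReshuffleTrigger-like) or only after the whole
    bundle has been buffered.
    """
    state: Dict[str, List[int]] = defaultdict(list)
    panes: List[Pane] = []

    def fire() -> None:
        # Emit one pane per buffered key, then discard the buffered state.
        panes.extend(state.items())
        state.clear()

    for key, value in bundle:
        state[key].append(value)
        if check_trigger_per_element:
            fire()
    fire()  # End of bundle: flush whatever is still buffered.
    return panes


elements = [('A', 1), ('B', 1), ('C', 1), ('A', 2)]
print(simulate_gbk(elements, check_trigger_per_element=False))
# [('A', [1, 2]), ('B', [1]), ('C', [1])]
print(simulate_gbk(elements, check_trigger_per_element=True))
# [('A', [1]), ('B', [1]), ('C', [1]), ('A', [2])]
{code}

Under this model only the per-element variant matches the documented Java behavior of triggering on every element and never buffering state; the bundle-level variant reproduces the grouping seen with Always().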
> Python Reshuffle holds elements
> -------------------------------
>
> Key: BEAM-14497
> URL: https://issues.apache.org/jira/browse/BEAM-14497
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Yi Hu
> Priority: P2
>
> Python Reshuffle holds elements while the pipeline is running and likely releases them in a batch. In contrast, Java Reshuffle triggers on every element, as noted in its documentation:
> "The trigger used with {@link Reshuffle} which triggers on every element and never buffers state."
> Here is a reproducing example:
> {code:python}
> import time
>
> import apache_beam as beam
> from apache_beam import Pipeline
>
>
> def test(p: Pipeline):
>     class SlowProcessFn(beam.DoFn):
>         def process(self, element):
>             time.sleep(0.5)  # simulate a slow step: about 0.5 s per element
>             yield element
>
>     result = (p
>               | beam.Create(range(100))
>               | beam.ParDo(SlowProcessFn())
>               | beam.Reshuffle()  # HERE
>               | beam.Map(lambda x: print(x, time.time())))
>     return result
> {code}
> Tested on the local runner and the Flink runner (1.14): the elements are printed only after about 50 seconds. With Reshuffle commented out, one element is printed every half second.
> This behavior causes problems when a downstream PTransform performs a time-sensitive operation, for example receiving a list of updated files from its input and reading them with the filebasedsource.ReadAllFiles transform. Because there is a Reshuffle in ReadAll, the actual reads are blocked.
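The timing reported in the issue description follows from simple arithmetic: 100 elements at 0.5 s each take 50 s to process. The toy model below (hypothetical helper `output_times`, not Beam code) assumes the whole input is processed sequentially and that, when Reshuffle holds elements, all of them are released together once the last one has been processed. Under that assumption the first downstream output, such as the first file read in a ReadAll-style pipeline, is delayed from 0.5 s to 50 s.

{code:python}
from typing import List


def output_times(n: int, seconds_per_element: float, held: bool) -> List[float]:
    """Seconds after start at which each element reaches the downstream Map.

    Toy model (hypothetical helper): element i finishes the slow step at
    (i + 1) * seconds_per_element. If Reshuffle holds elements, every element
    is released at the moment the last one finishes.
    """
    finished = [(i + 1) * seconds_per_element for i in range(n)]
    if held and finished:
        return [finished[-1]] * n
    return finished


held = output_times(100, 0.5, held=True)
streamed = output_times(100, 0.5, held=False)
print(held[0], held[-1])          # 50.0 50.0
print(streamed[0], streamed[-1])  # 0.5 50.0
{code}

Total throughput is the same in both cases; what changes is the latency of the first output, which is what hurts time-sensitive downstream transforms.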
--
This message was sent by Atlassian Jira
(v8.20.7#820007)