Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/29 17:53:00 UTC
[jira] [Commented] (BEAM-10617) python
CombineGlobally().with_fanout() cause duplicate combine results for sliding
windows
[ https://issues.apache.org/jira/browse/BEAM-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17310846#comment-17310846 ]
Kenneth Knowles commented on BEAM-10617:
----------------------------------------
[~pabloem] [~robertwb] take a look?
> python CombineGlobally().with_fanout() cause duplicate combine results for sliding windows
> ------------------------------------------------------------------------------------------
>
> Key: BEAM-10617
> URL: https://issues.apache.org/jira/browse/BEAM-10617
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow, runner-direct, sdk-py-core
> Reporter: Leiyi Zhang
> Priority: P1
>
> Not only is there more than one result per window, but the results for each window are duplicated as well.
> Here is some code that reproduces the issue; run it with and without {{*.with_fanout*}}.
> If running with the Dataflow runner, add an appropriate {{*gs://path/*}} in {{*WriteToText*}}.
>
> {code:python}
> import apache_beam as beam
> from apache_beam.transforms import window
> from apache_beam.utils.timestamp import Timestamp
>
> class ListFn(beam.CombineFn):
>     def create_accumulator(self):
>         return []
>
>     def add_input(self, mutable_accumulator, element):
>         return mutable_accumulator + [element]
>
>     def merge_accumulators(self, accumulators):
>         res = []
>         for accu in accumulators:
>             res = res + accu
>         return res
>
>     def extract_output(self, accumulator):
>         return accumulator
>
> p = beam.Pipeline()
> (
>     p
>     | beam.Create([
>         window.TimestampedValue(1, Timestamp(seconds=1596216396)),
>         window.TimestampedValue(2, Timestamp(seconds=1596216397)),
>         window.TimestampedValue(3, Timestamp(seconds=1596216398)),
>         window.TimestampedValue(4, Timestamp(seconds=1596216399)),
>         window.TimestampedValue(5, Timestamp(seconds=1596216400)),
>         window.TimestampedValue(6, Timestamp(seconds=1596216402)),
>         window.TimestampedValue(7, Timestamp(seconds=1596216403)),
>         window.TimestampedValue(8, Timestamp(seconds=1596216405))])
>     | beam.WindowInto(window.SlidingWindows(10, 5))
>     | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
>     | beam.Map(repr)
>     | beam.io.WriteToText("py-test-result", file_name_suffix='.json', num_shards=1))
> p.run()
> {code}
>
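
For comparison with the pipeline output, here is a rough sketch in plain Python (no Beam dependency) of which values each sliding window would be expected to contain. It assumes windows of size 10 with period 5 and zero offset, so that window starts fall on multiples of 5 and each element belongs to every half-open interval [start, start + 10) that contains its timestamp. The helper names sliding_windows and expected_per_window are made up for this illustration and are not part of the Beam API.

```python
from collections import defaultdict

SIZE = 10    # window length in seconds, as in SlidingWindows(10, 5)
PERIOD = 5   # a new window starts every 5 seconds


def sliding_windows(ts, size=SIZE, period=PERIOD):
    """Return the [start, end) windows containing ts (assumes zero offset)."""
    last_start = ts - ts % period
    return [(s, s + size)
            for s in range(last_start, last_start - size, -period)]


# (value, timestamp in seconds) pairs copied from the reproduction above.
elements = [
    (1, 1596216396), (2, 1596216397), (3, 1596216398), (4, 1596216399),
    (5, 1596216400), (6, 1596216402), (7, 1596216403), (8, 1596216405),
]


def expected_per_window(timestamped):
    """Group values by every window they fall into, sorted by window start."""
    grouped = defaultdict(list)
    for value, ts in timestamped:
        for w in sliding_windows(ts):
            grouped[w].append(value)
    return dict(sorted(grouped.items()))


expected = expected_per_window(elements)
for (start, end), values in expected.items():
    print(start, end, values)
```

Under these assumptions the input spans four windows, so a correct run should write exactly one combined list per window. The order of values inside each list is not guaranteed by a real pipeline, so a comparison against the output file should sort each list first.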