Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/09/30 17:08:02 UTC

[jira] [Commented] (BEAM-10617) python CombineGlobally().with_fanout() cause duplicate combine results for sliding windows

    [ https://issues.apache.org/jira/browse/BEAM-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17204881#comment-17204881 ] 

Beam JIRA Bot commented on BEAM-10617:
--------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.


> python CombineGlobally().with_fanout() cause duplicate combine results for sliding windows
> ------------------------------------------------------------------------------------------
>
>                 Key: BEAM-10617
>                 URL: https://issues.apache.org/jira/browse/BEAM-10617
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow, runner-direct, sdk-py-core
>            Reporter: Leiyi Zhang
>            Priority: P2
>              Labels: stale-P2
>
> Not only is there more than one result per window; the results within each window are duplicated as well.
> Here is some code to reproduce the issue; run it with and without {{*.with_fanout*}} and compare the output.
> If running with the Dataflow runner, add an appropriate {{*gs://path/*}} in {{*WriteToText*}}.
>  
> {code:python}
> import apache_beam as beam
> from apache_beam.transforms import window
> from apache_beam.utils.timestamp import Timestamp
> class ListFn(beam.CombineFn):
>   # Collects all input elements into a single list.
>   def create_accumulator(self):
>     return []
>   def add_input(self, mutable_accumulator, element):
>     return mutable_accumulator + [element]
>   def merge_accumulators(self, accumulators):
>     # Concatenate the partial lists produced by pre-combined accumulators.
>     res = []
>     for accu in accumulators:
>       res = res + accu
>     return res
>   def extract_output(self, accumulator):
>     return accumulator
> p = beam.Pipeline()
> (
>     p
>     | beam.Create([
>       window.TimestampedValue(1, Timestamp(seconds=1596216396)),
>       window.TimestampedValue(2, Timestamp(seconds=1596216397)),
>       window.TimestampedValue(3, Timestamp(seconds=1596216398)),
>       window.TimestampedValue(4, Timestamp(seconds=1596216399)),
>       window.TimestampedValue(5, Timestamp(seconds=1596216400)),
>       window.TimestampedValue(6, Timestamp(seconds=1596216402)),
>       window.TimestampedValue(7, Timestamp(seconds=1596216403)),
>       window.TimestampedValue(8, Timestamp(seconds=1596216405))])
>     | beam.WindowInto(window.SlidingWindows(10, 5))
>     | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
>     | beam.Map(repr)
>     | beam.io.WriteToText("py-test-result", file_name_suffix='.json', num_shards=1))
> p.run()
> {code}
>  
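For reference, the per-window output the pipeline above should produce exactly once per window can be sketched in plain Python, independent of Beam. This is a hedged illustration, assuming {{SlidingWindows(10, 5)}} assigns each timestamp to every window {{[start, start + 10)}} whose start is a multiple of 5 (offset 0); with the reported bug, these same lists show up more than once in the output.

```python
# Illustrative sketch (not Beam code): compute which sliding windows each
# timestamped element belongs to, and the expected per-window combined lists.
SIZE, PERIOD = 10, 5  # matches window.SlidingWindows(10, 5) in the repro

def sliding_windows(ts, size=SIZE, period=PERIOD):
    """Return the (start, end) windows containing timestamp ts,
    assuming window starts at multiples of `period` (offset 0)."""
    last_start = ts - (ts % period)  # latest window start <= ts
    return [(start, start + size)
            for start in range(last_start, ts - size, -period)]

# value -> timestamp, as in the beam.Create(...) call above
elements = {1: 1596216396, 2: 1596216397, 3: 1596216398, 4: 1596216399,
            5: 1596216400, 6: 1596216402, 7: 1596216403, 8: 1596216405}

# Expected result: one combined list per window, each appearing once.
expected = {}
for value, ts in elements.items():
    for w in sliding_windows(ts):
        expected.setdefault(w, []).append(value)
```

Under these assumptions the eight elements fall into four overlapping windows, so a correct run should emit exactly four lists; seeing any of them twice is the duplication the reporter describes.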



--
This message was sent by Atlassian Jira
(v8.3.4#803005)