Posted to issues@beam.apache.org by "wendy liu (Jira)" <ji...@apache.org> on 2019/11/13 23:42:00 UTC

[jira] [Updated] (BEAM-8657) Not doing Combiner lifting for data-driven triggers

     [ https://issues.apache.org/jira/browse/BEAM-8657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wendy liu updated BEAM-8657:
----------------------------
    Description: 
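The test below always produces empty output because of combiner lifting: the lifted partial combine merges all input values for a key within one shard (bundle) into a single value before grouping, so in this test the AfterCount(2) trigger sees only one pre-combined element per key and never fires. To fix this, combiner lifting should not be applied when the trigger is data-driven.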

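import apache_beam as beam
from apache_beam.transforms import combiners as combine
from apache_beam.transforms import trigger
from apache_beam.transforms import window
from apache_beam.transforms.window import FixedWindows

l = [window.TimestampedValue(('a', 1), 1),
     window.TimestampedValue(('b', 3), 3),
     window.TimestampedValue(('a', 2), 2),
     window.TimestampedValue(('a', 5), 5)]

# p is the test's pipeline (e.g. a TestPipeline); it is not shown in the report.
result = (p
          | beam.Create(l)
          | beam.Map(lambda x: x)
          | 'window' >> beam.WindowInto(
              FixedWindows(6),
              trigger=trigger.AfterCount(2),
              accumulation_mode=trigger.AccumulationMode.DISCARDING)
          | beam.CombinePerKey(combine.Largest(1)))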
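The mechanism can be illustrated with a simplified pure-Python model. This is not Beam code, and several things are assumptions: the function name first_panes is hypothetical, all four inputs are assumed to land in the single FixedWindows(6) window [0, 6) and in one bundle in the order listed above, AfterCount(2) without Repeatedly is modelled as firing once and then dropping later inputs for that key, and Largest(1) is modelled as max() returning an int rather than a list.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Simplified model, not Beam code. Hypothetical assumptions:
# - all inputs fall in the single FixedWindows(6) window [0, 6) and
#   arrive in one bundle, in the order given in the report;
# - AfterCount(n) without Repeatedly fires once, after which later
#   inputs for that key are dropped;
# - Largest(1) is modelled as max() returning an int, not a list.
Element = Tuple[str, int]


def first_panes(bundle: List[Element], lift: bool, count: int = 2) -> Dict[str, int]:
    """Return the pane each key emits under a one-shot AfterCount(count)."""
    if lift:
        # Combiner lifting: pre-combine per key inside the bundle, so the
        # grouping step (and therefore the trigger) sees one value per key.
        partial: Dict[str, int] = {}
        for key, value in bundle:
            partial[key] = max(value, partial.get(key, value))
        grouped_inputs = list(partial.items())
    else:
        # No lifting: every raw element reaches the grouping step.
        grouped_inputs = list(bundle)

    seen: Dict[str, List[int]] = defaultdict(list)
    fired: Dict[str, int] = {}
    for key, value in grouped_inputs:
        if key in fired:
            continue  # trigger already finished for this key
        seen[key].append(value)
        if len(seen[key]) >= count:
            fired[key] = max(seen[key])  # Largest(1) of the pane's elements
    return fired


BUNDLE = [('a', 1), ('b', 3), ('a', 2), ('a', 5)]
print(first_panes(BUNDLE, lift=True))   # {}
print(first_panes(BUNDLE, lift=False))  # {'a': 2}
```

Skipping the lift for data-driven triggers, as proposed here, corresponds to the lift=False path, where the trigger counts raw elements again. In a real runner the exact pane contents also depend on element order and bundling, which this model fixes by assumption.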

  was:
The test below always produces empty output because of combiner lifting, which combines all input values of one shard into one before grouping. To fix this, combiner lifting should not be done for data-driven triggers.

l = [window.TimestampedValue(('a', 1), 1),
     window.TimestampedValue(('b', 3), 3),
     window.TimestampedValue(('a', 2), 2),
     window.TimestampedValue(('a', 5), 5)]

result = (p
          | beam.Create(l)
          | Map(lambda x: x)
          | 'window' >> beam.WindowInto(
              FixedWindows(6),
              trigger=trigger.AfterCount(2),
              accumulation_mode=trigger.AccumulationMode.DISCARDING)
          | beam.CombinePerKey(combine.Largest(1)))


> Not doing Combiner lifting for data-driven triggers
> ---------------------------------------------------
>
>                 Key: BEAM-8657
>                 URL: https://issues.apache.org/jira/browse/BEAM-8657
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: wendy liu
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)