Posted to user@beam.apache.org by Parviz deyhim <de...@gmail.com> on 2019/01/22 05:06:20 UTC

Global Window and cumulative trigger

Hi,

I'm trying something simple here: a global view of all GroupByKey values.
I'm looking to get a cumulative GroupBy of a field, and I'd like my window
trigger to fire with all the values seen so far. However, what I get seems
to be only the new values, which is almost what I would expect from
discardingFiredPanes. Am I missing something?

PCollection<KV<String,Double>> pubSubMessages = pipeline
        .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic("xxxxxxx"))
        .apply("TransformToEvent", ParDo.of(new EmitEvent()))
        .apply("GetV1", ParDo.of(new ExtractV1Field()))
        .apply("Window", Window.<KV<String,Double>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardDays(30)))
        .apply("SUM", Sum.doublesPerKey());


Thanks,

Parviz

Re: Global Window and cumulative trigger

Posted by Kenneth Knowles <ke...@apache.org>.
Hi Parviz,

Your code looks good. You are correct about the meaning of
accumulatingFiredPanes(). You should always see the whole sum so far for
each key. Since your code is so clear, I would immediately move to porting
this example to a test case. Can you provide inputs and outputs so we can
reproduce?

Kenn
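
For readers comparing the two accumulation modes discussed above, here is a
minimal plain-Java sketch of the expected difference. It does not use Beam at
all: the class PaneSimulator and its add/fire methods are hypothetical names
invented for this illustration, and it simplifies by firing every key that has
new data at the same moment, whereas a real runner fires panes per key
independently.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a per-key sum in one global window (not Beam API). */
public class PaneSimulator {
    private final boolean accumulating;
    // Running sum per key that survives across trigger firings.
    private final Map<String, Double> sums = new LinkedHashMap<>();
    // Keys that received at least one element since the last firing.
    private final Set<String> dirty = new LinkedHashSet<>();

    public PaneSimulator(boolean accumulating) {
        this.accumulating = accumulating;
    }

    /** Buffers one element for the given key. */
    public void add(String key, double value) {
        sums.merge(key, value, Double::sum);
        dirty.add(key);
    }

    /** Simulates a trigger firing: emits one pane per key that has new data. */
    public Map<String, Double> fire() {
        Map<String, Double> pane = new LinkedHashMap<>();
        for (String key : dirty) {
            pane.put(key, sums.get(key));
            if (!accumulating) {
                // Discarding mode forgets what was already emitted.
                sums.remove(key);
            }
        }
        dirty.clear();
        return pane;
    }

    public static void main(String[] args) {
        for (boolean mode : new boolean[] {true, false}) {
            PaneSimulator sim = new PaneSimulator(mode);
            sim.add("a", 1.0);
            sim.add("b", 10.0);
            Map<String, Double> first = sim.fire();
            sim.add("a", 2.0);
            Map<String, Double> second = sim.fire();
            System.out.println((mode ? "accumulating" : "discarding")
                    + ": first=" + first + " second=" + second);
        }
    }
}
```

In this sketch the second pane for key "a" carries 3.0 in accumulating mode
(the whole sum so far) and 2.0 in discarding mode (only the new value). If the
real pipeline shows the second behaviour despite accumulatingFiredPanes(), that
is the discrepancy worth capturing in a test case.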
