You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Parviz deyhim <de...@gmail.com> on 2019/01/22 05:06:20 UTC
Global Window and cumulative trigger
Hi,
Trying something simple here: Global view of all GroupByKey values
. Looking to get a cumulative GroupBy of a field and I like my Window
trigger to fire with all the values seen so far. However what I get is seem
to be new values. Basically almost feels like what I should expect from
discardingFiredPanes. Am I missing something?
PCollection<KV<String,Double>> pubSubMessages = pipeline
.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic("xxxxxxx"))
.apply("TransformToEvent", ParDo.of(new EmitEvent()))
.apply("GetV1",ParDo.of(new ExtractV1Field()))
.apply("Window",Window.<KV<String,Double>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.accumulatingFiredPanes().withAllowedLateness(Duration.standardDays(30)))
.apply("SUM",Sum.doublesPerKey());
Thanks,
Parviz
Re: Global Window and cumulative trigger
Posted by Kenneth Knowles <ke...@apache.org>.
Hi Parviz,
Your code looks good. You are correct about the meaning of
accumulatingFiredPanes(). You should always see the whole sum so far for
each key. Since your code is so clear, I would immediately move to porting
this example to a test case. Can you provide inputs and outputs so we can
reproduce?
Kenn
On Mon, Jan 21, 2019 at 9:06 PM Parviz deyhim <de...@gmail.com> wrote:
> Hi,
>
> Trying something simple here: Global view of all GroupByKey values
>
> . Looking to get a cumulative GroupBy of a field and I like my Window
> trigger to fire with all the values seen so far. However what I get is seem
> to be new values. Basically almost feels like what I should expect from
> discardingFiredPanes. Am I missing something?
>
> PCollection<KV<String,Double>> pubSubMessages = pipeline
> .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic("xxxxxxx"))
> .apply("TransformToEvent", ParDo.of(new EmitEvent()))
> .apply("GetV1",ParDo.of(new ExtractV1Field()))
> .apply("Window",Window.<KV<String,Double>>into(new GlobalWindows())
> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
> .accumulatingFiredPanes().withAllowedLateness(Duration.standardDays(30)))
> .apply("SUM",Sum.doublesPerKey());
>
>
> Thanks,
>
> Parviz
>
>