Posted to user@flink.apache.org by Luis Mariano Guerra <ma...@event-fabric.com> on 2016/10/27 14:31:49 UTC

emit partial state in window (streaming)

Hi,

I need to calculate some counts for the day but also emit the partial
counts periodically. I think triggers may help me; I've searched around but
there's not much content about it. Any tips?

For example, I'm counting accesses by location to different services. I want
to accumulate accesses over the whole day, but emit the partial count every
5 minutes.

One slightly related question: is there a way to align a window to a day?
For example, a 24-hour window that starts at 00:00.

Thanks.

Re: emit partial state in window (streaming)

Posted by Manu Zhang <ow...@gmail.com>.
Hi Luis,

You could try ContinuousEventTimeTrigger
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java>,
which fires continuously at a given time interval, instead of writing your own.
Note that we recently fixed a bug in this trigger, so I think only the
trunk version works correctly.
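For the original use case (a daily count with a partial result every 5 minutes), the Flink wiring would be along the lines of `.window(TumblingEventTimeWindows.of(Time.days(1))).trigger(ContinuousEventTimeTrigger.of(Time.minutes(5)))`. Because the trigger's exact behavior has changed between releases (see the bug fix mentioned above), the plain-Java model below only sketches the firing schedule under stated assumptions: in-order events, epoch-aligned firings every `interval` ms after the first element, a final firing at the window's max timestamp, and no purging between firings, so each firing reports the running count. `ContinuousFiringModel` and its methods are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (not Flink code) of a continuous event-time trigger on one window.
// Assumptions: in-order events, all timestamps inside the window and non-negative, firings at
// epoch-aligned multiples of `interval` after the first element, a final firing at the window's
// max timestamp, and no purging between firings (each firing reports the running count).
final class ContinuousFiringModel {

    /** One firing: the event-time instant and the running count emitted at that instant. */
    static final class Firing {
        final long time;
        final long count;

        Firing(long time, long count) {
            this.time = time;
            this.count = count;
        }

        @Override
        public String toString() {
            return "Firing(time=" + time + ", count=" + count + ")";
        }
    }

    static List<Firing> firings(long[] sortedTimestamps, long windowStart, long windowSize, long interval) {
        List<Firing> result = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return result; // no element yet, so no timer has been registered
        }
        long maxTimestamp = windowStart + windowSize - 1;
        long first = sortedTimestamps[0];
        // Periodic (early) firings, aligned to multiples of the interval.
        for (long t = first - (first % interval) + interval; t < maxTimestamp; t += interval) {
            result.add(new Firing(t, countUpTo(sortedTimestamps, t)));
        }
        // Final firing once the watermark reaches the end of the window.
        result.add(new Firing(maxTimestamp, countUpTo(sortedTimestamps, maxTimestamp)));
        return result;
    }

    private static long countUpTo(long[] sortedTimestamps, long time) {
        long n = 0;
        for (long ts : sortedTimestamps) {
            if (ts <= time) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // A 15-minute window keeps the output short; the thread's case would use a one-day window.
        long[] events = {1 * minute, 2 * minute, 7 * minute};
        for (Firing f : firings(events, 0L, 15 * minute, 5 * minute)) {
            System.out.println(f);
        }
    }
}
```

With events at minutes 1, 2 and 7, the model emits partial counts 2 and 3 at the 5- and 10-minute marks, then the final count 3 at the window's max timestamp.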

Cheers,
Manu

On Thu, Nov 3, 2016 at 9:07 PM Kostas Kloudas <k....@data-artisans.com> wrote:

> Hi Luis,
>
> Can you try commenting out the whole final windowing step and see if this works?
> This includes the following lines:
>
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>   .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
>   .apply(creator.create(), windowAllFold, windowAllMerge);
>
> An additional note: I would go for registering an event-time timer and firing
> from onEventTime(), instead of checking the timestamp in onElement(). This is
> because, with your implementation, in order to fire a computation you always
> have to wait for an element outside the partial window interval to arrive.
>
> Cheers,
> Kostas
>

Re: emit partial state in window (streaming)

Posted by Luis Mariano Guerra <ma...@event-fabric.com>.
On Thu, Nov 3, 2016 at 7:05 PM, Kostas Kloudas <k....@data-artisans.com> wrote:

> Hi Luis,
>
> You cannot have event-time early firings on both chained window operators.
> The reason is that each early result from the first window operator will
> have a timestamp equal to window.maxTimestamp() (the window end minus 1 ms).
> So in the second windowing operator, they will be buffered until the
> watermark signaling the end of the window arrives.
>

So how do I get windowAll and partial results? Do I have to remove the
partial calculations and do it all in one node/thread? Is there another way?


>
> Now for the second point, I think that what you have understood is correct.
> The "ctx.registerEventTimeTimer(window.getEnd())" call registers a timer that
> calls onEventTime().
>

Then what else calls onEventTime? Because if I register the event-time
timer inside it, something else calls it.



>
> Cheers,
> Kostas
>
>

Re: emit partial state in window (streaming)

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Luis,

You cannot have event-time early firings on both chained window operators.
The reason is that each early result from the first window operator will have a timestamp equal to window.maxTimestamp() (the window end minus 1 ms).
So in the second windowing operator, they will be buffered until the watermark signaling the end of the window arrives.
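A plain-Java model of the buffering effect described above. It assumes, as in Flink's window operator, that every firing of a time window (early or final) stamps its output with the window's maxTimestamp(), and that the downstream window uses a trigger that fires once the watermark reaches that window's max timestamp. The class and method names are hypothetical; this is not Flink code.

```java
// Hypothetical model: upstream and downstream tumbling event-time windows of the same size,
// both aligned to the epoch. Timestamps are in milliseconds.
final class ChainedWindowModel {

    /** Start of the tumbling window that contains `timestamp`. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Largest timestamp that still belongs to the window containing `timestamp`. */
    static long maxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    /** Timestamp carried by any result (early or final) of the upstream window. */
    static long upstreamResultTimestamp(long elementTimestamp, long size) {
        return maxTimestamp(elementTimestamp, size);
    }

    /** Assumed downstream trigger: fires only once the watermark reaches the window's max timestamp. */
    static boolean downstreamCanFire(long resultTimestamp, long size, long watermark) {
        return watermark >= maxTimestamp(resultTimestamp, size);
    }

    public static void main(String[] args) {
        long size = 10_000L;                 // 10-second windows, as in the job in this thread
        long element = 1_500L;               // an element at t = 1.5 s
        long earlyFiringWatermark = 2_000L;  // upstream partial firing at t = 2 s
        long ts = upstreamResultTimestamp(element, size);
        System.out.println("early result timestamp = " + ts);
        System.out.println("downstream fires at watermark " + earlyFiringWatermark + "? "
                + downstreamCanFire(ts, size, earlyFiringWatermark));
    }
}
```

In this model, a partial result produced at t = 2 s carries timestamp 9999, so it sits in the downstream window [0, 10000) until the watermark reaches 9999.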

Now for the second point, I think that what you have understood is correct.
The "ctx.registerEventTimeTimer(window.getEnd())" call registers a timer that calls onEventTime().
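To make the timer mechanics concrete, here is a plain-Java model of an event-time timer service. `TimerModel` and its methods are hypothetical, not Flink classes. It models only the part described above: a registered timer causes onEventTime() to be called once the watermark passes its timestamp, and the callback may register further timers, which is how a trigger can keep firing periodically as long as the watermark advances. It deliberately ignores anything else the real window operator does with timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical event-time timer service model (not Flink's implementation).
final class TimerModel {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();
    private LongConsumer onEventTime = t -> { };

    void setOnEventTime(LongConsumer callback) {
        this.onEventTime = callback;
    }

    /** Registers a timer; in this model a second registration for the same timestamp is ignored. */
    void registerEventTimeTimer(long time) {
        if (!timers.contains(time)) {
            timers.add(time);
        }
    }

    /** Advances the watermark and calls onEventTime for every registered timer <= watermark, in order. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            long t = timers.poll();
            fired.add(t);
            onEventTime.accept(t); // the callback may register more timers
        }
    }

    List<Long> firedTimers() {
        return fired;
    }

    public static void main(String[] args) {
        long interval = 2_000L, windowEnd = 10_000L;
        TimerModel service = new TimerModel();
        // The callback re-registers the next partial firing until the end of the window.
        service.setOnEventTime(t -> {
            if (t + interval < windowEnd) {
                service.registerEventTimeTimer(t + interval);
            }
        });
        service.registerEventTimeTimer(interval); // e.g. registered in onElement for the first element
        service.advanceWatermark(7_000L);
        System.out.println(service.firedTimers()); // [2000, 4000, 6000]
    }
}
```

Registering `t + interval` inside the callback is what produces the chain of calls; in this model nothing else invokes onEventTime().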

Cheers,
Kostas


> On Nov 3, 2016, at 3:29 PM, Luis Mariano Guerra <ma...@event-fabric.com> wrote:
> 
> On Thu, Nov 3, 2016 at 2:06 PM, Kostas Kloudas <k....@data-artisans.com> wrote:
> Hi Luis,
> 
> Can you try commenting out the whole final windowing step and see if this works?
> This includes the following lines:
> 
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>   .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
>   .apply(creator.create(), windowAllFold, windowAllMerge);
> 
> 
> With it commented out, results are emitted on every FIRE. How do I make the trigger "go through" the windowAll, or, if that's not possible, how can I join the substreams into one stream and still respect the trigger?
>  
> An additional note: I would go for registering an event-time timer and firing from onEventTime(), instead of checking the timestamp in onElement().
> This is because, with your implementation, in order to fire a computation you always have to wait for an element outside the partial window interval to arrive.
> 
> Then I think I misunderstood the purpose of registering the event-time timer. Isn't "ctx.registerEventTimeTimer(window.getEnd())" called to register a timer that calls onEventTime?
>  
> 
> Cheers,
> Kostas
> 


Re: emit partial state in window (streaming)

Posted by Luis Mariano Guerra <ma...@event-fabric.com>.
On Thu, Nov 3, 2016 at 2:06 PM, Kostas Kloudas <k....@data-artisans.com> wrote:

> Hi Luis,
>
> Can you try commenting out the whole final windowing step and see if this works?
> This includes the following lines:
>
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>   .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
>   .apply(creator.create(), windowAllFold, windowAllMerge);
>
>
With it commented out, results are emitted on every FIRE. How do I make the
trigger "go through" the windowAll, or, if that's not possible, how can I
join the substreams into one stream and still respect the trigger?


> An additional note: I would go for registering an event-time timer and firing
> from onEventTime(), instead of checking the timestamp in onElement(). This is
> because, with your implementation, in order to fire a computation you always
> have to wait for an element outside the partial window interval to arrive.
>

Then I think I misunderstood the purpose of registering the event-time timer.
Isn't "ctx.registerEventTimeTimer(window.getEnd())" called to register a
timer that calls onEventTime?


>
> Cheers,
> Kostas
>

Re: emit partial state in window (streaming)

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Luis,

Can you try commenting out the whole final windowing step and see if this works?
This includes the following lines:

  .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
  .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
  .apply(creator.create(), windowAllFold, windowAllMerge);

An additional note: I would go for registering an event-time timer and firing from onEventTime(), instead of checking the timestamp in onElement().
This is because, with your implementation, in order to fire a computation you always have to wait for an element outside the partial window interval to arrive.
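The difference described above can be seen in a small plain-Java simulation. The element-driven variant is a hypothetical approximation of checking timestamps in onElement() (the actual PartialWindowTrigger is only in the gist and is not reproduced here): it can fire only when an element from a later partial interval arrives. The timer-driven variant fires at every partial-interval boundary the watermark has passed. Both functions assume in-order data and non-negative timestamps, and are illustrative, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical comparison (not Flink code) of two ways to produce partial firings inside one window.
final class PartialFiringComparison {

    /** Element-driven: fire when an element falls into a later partial interval than the previous one. */
    static List<Long> onElementFirings(long[] sortedTimestamps, long interval) {
        List<Long> firings = new ArrayList<>();
        boolean seenElement = false;
        long lastSlot = 0;
        for (long ts : sortedTimestamps) {
            long slot = ts / interval;
            if (seenElement && slot > lastSlot) {
                firings.add(ts); // the firing happens only when this element arrives
            }
            seenElement = true;
            lastSlot = slot;
        }
        return firings;
    }

    /** Timer-driven: one firing per interval boundary after the first element, up to the watermark. */
    static List<Long> timerFirings(long[] sortedTimestamps, long interval, long watermark) {
        List<Long> firings = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return firings; // no element, so no timer was ever registered
        }
        long firstBoundary = (sortedTimestamps[0] / interval + 1) * interval;
        for (long t = firstBoundary; t <= watermark; t += interval) {
            firings.add(t);
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 2_500L, 9_000L}; // sparse events in a 10-second window
        System.out.println("onElement: " + onElementFirings(events, 2_000L)); // [2500, 9000]
        System.out.println("timers:    " + timerFirings(events, 2_000L, 9_000L)); // [2000, 4000, 6000, 8000]
    }
}
```

With events at 1 s, 2.5 s and 9 s and a 2-second partial interval, the element-driven version fires twice (at 2.5 s and 9 s), while the timer-driven version fires at 2, 4, 6 and 8 s once the watermark reaches 9 s.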

Cheers,
Kostas

> On Nov 3, 2016, at 11:31 AM, Luis Mariano Guerra <ma...@event-fabric.com> wrote:
> 
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>                 //.trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
>                 .apply(creator.create(), windowAllFold, windowAllMerge);


Re: emit partial state in window (streaming)

Posted by Luis Mariano Guerra <ma...@event-fabric.com>.
On Thu, Oct 27, 2016 at 4:37 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Luis,
>
> these blogposts should help you with the periodic partial result trigger
> [1] [2].
>

Hi, thanks for the links. I read them and tried to implement what I need.
Everything seems to work as expected except that the partial results
aren't emitted. I created a gist with my PartialWindowTrigger
implementation and the relevant part of the job:

https://gist.github.com/anonymous/041987821e37ee8f862ce1857bb074ea

Is the problem in the trigger?
Do I have to create a window assigner too?
Is it because of the windowAll?

I reproduce part of the readme from the gist here for convenience; please
see the readme for the PartialWindowTrigger implementation and the rest of
the logs:

What the job does (or what I think it does):

   - KeyBy the first string of a field
   - Create a tumbling window of 10 seconds
   - Register my PartialWindowTrigger that will trigger every 2 seconds
   (FIRE) and after the 10-second window (FIRE_AND_PURGE)
   - Fold on each partition to create a partial accumulation
   - Join all the partial results in a single place through windowAll
   - Aggregate the partial aggregations into one result
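The two-stage structure of these steps (a fold per key, then a single windowAll that merges the per-key partial results) can be modeled in plain Java as below. It is a sketch under assumptions: the counting logic and the names `foldPartition` and `mergeAll` are hypothetical stand-ins for timeWindowFold/timeWindowMerge and windowAllFold/windowAllMerge, whose real code is in the gist.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-stage count (not Flink code): stage 1 folds the events of one key/partition,
// stage 2 merges the per-partition partial results into a single global result.
final class TwoStageCount {

    /** Stage 1: count accesses per location within one partition. */
    static Map<String, Long> foldPartition(List<String> locations) {
        Map<String, Long> counts = new HashMap<>();
        for (String location : locations) {
            counts.merge(location, 1L, Long::sum);
        }
        return counts;
    }

    /** Stage 2: merge all partial results into one map. */
    static Map<String, Long> mergeAll(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((location, count) -> total.merge(location, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> a = foldPartition(List.of("blue", "blue", "red"));
        Map<String, Long> b = foldPartition(List.of("blue", "green"));
        System.out.println(mergeAll(List.of(a, b)));
    }
}
```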

Here is the job's relevant part:

            return input.keyBy(keySelector)
                .timeWindow(Time.of(windowTime, timeUnit))
                .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
                .apply(creator.create(), timeWindowFold, timeWindowMerge)
                .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
                //.trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
                .apply(creator.create(), windowAllFold, windowAllMerge);

The problem is that the triggers FIRE correctly but no partial results
(every 2 seconds) are emitted, only the final result (every 10 seconds) is
emitted.

Even if, instead of returning FIRE from onElement, I do:

ctx.registerEventTimeTimer(timestamp);

and return FIRE or FIRE_AND_PURGE from onEventTime, it still doesn't emit the
partial values.

There's a commented-out line in the job that registers a PartialWindowTrigger
for the windowAll window, but it still doesn't work when uncommented.

I added println calls to the trigger and to the job steps; this is the output:

2016-11-03T11:07:04.180+01:00 onElement FIRE
2016-11-03T11:07:04.232+01:00 multiCountWindowFn        1
2016-11-03T11:07:04.305+01:00 windowAllFold
2016-11-03T11:07:04.733+01:00 timeWindowFold
2016-11-03T11:07:04.681+01:00 onElement CONTINUE
2016-11-03T11:07:05.234+01:00 timeWindowFold
2016-11-03T11:07:05.182+01:00 onElement CONTINUE
2016-11-03T11:07:05.735+01:00 timeWindowFold
2016-11-03T11:07:05.682+01:00 onElement CONTINUE
2016-11-03T11:07:06.236+01:00 timeWindowFold
2016-11-03T11:07:06.183+01:00 onElement FIRE
<3 more blocks like the one above here>

2016-11-03T11:07:12.246+01:00 multiCountWindowFn        1
2016-11-03T11:07:12.317+01:00 windowAllFold
2016-11-03T11:07:12.746+01:00 timeWindowFold
2016-11-03T11:07:12.693+01:00 onElement CONTINUE
2016-11-03T11:07:13.247+01:00 timeWindowFold
2016-11-03T11:07:13.194+01:00 onElement CONTINUE
2016-11-03T11:07:13.748+01:00 timeWindowFold
2016-11-03T11:07:13.695+01:00 onElement CONTINUE
2016-11-03T11:07:09.999+01:00 onEventTime FIRE_AND_PURGE
2016-11-03T11:07:13.948+01:00 multiCountWindowFn        1
2016-11-03T11:07:14.020+01:00 windowAllFold
2016-11-03T11:07:14.020+01:00 allWindowMerger   1

{"blue":{"foo":{"v":65}},"$":{"ts":1478167634020}}



> Regarding the second question:
> Time windows are by default aligned to the epoch, 1970-01-01 00:00:00 UTC.
> So a 24-hour window will always start at 00:00 UTC.
>
> Best, Fabian
>
> [1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
> [2] https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink

Re: emit partial state in window (streaming)

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Luis,

these blogposts should help you with the periodic partial result trigger
[1] [2].

Regarding the second question:
Time windows are by default aligned to the epoch, 1970-01-01 00:00:00 UTC.
So a 24-hour window will always start at 00:00 UTC.
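The alignment can be checked with a few lines of plain Java. The formula below is the one used for tumbling-window starts in later Flink releases (`TimeWindow.getWindowStartWithOffset`), where `TumblingEventTimeWindows.of(size, offset)` also accepts an offset; the class name is hypothetical, and the timestamp is the `ts` value from the job output posted in this thread. With offset 0 a day window starts at 00:00 UTC; to start at local midnight in UTC+01:00, the offset would be minus one hour.

```java
import java.time.Instant;

// Hypothetical helper (not Flink code) reproducing the epoch-aligned tumbling-window start formula.
final class DayWindowAlignment {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Start of the tumbling window containing `timestamp`, aligned to the epoch shifted by `offset`. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long ts = 1478167634020L; // 2016-11-03T10:07:14.020Z
        // No offset: the day window starts at midnight UTC.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, 0L, DAY_MS)));              // 2016-11-03T00:00:00Z
        // Offset of -1 hour: the day window starts at midnight in UTC+01:00.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, -60L * 60 * 1000, DAY_MS))); // 2016-11-02T23:00:00Z
    }
}
```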

Best, Fabian

[1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
[2] https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
