Posted to user@flink.apache.org by Fabian Hueske <fh...@gmail.com> on 2016/09/01 09:42:31 UTC

Re: CountTrigger FIRE or FIRE_AND_PURGE

Hi Paul,

sorry for the delayed reply.

I think a CountTrigger won't give you the expected result. When you call
trigger(), you replace the existing trigger rather than adding to it. In the
case of a Sliding/TumblingEventTimeWindow, the default trigger that fires at
the end of the window is replaced by a trigger that fires every 10 elements.
So your window function will not be called when the 24h window ends.
You need to implement a custom trigger, similar to the one in the blog
post. I think you only need to modify the example code such that it does
not sum an attribute (passengerCount) but rather counts how often
onElement() has been invoked.
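A rough sketch of the custom trigger logic described above, written as a plain-Java model rather than against Flink's Trigger API so that it runs without Flink on the classpath. The class name EarlyCountTriggerModel, the Decision enum (named after Flink's TriggerResult values) and the per-window counter are hypothetical. A real implementation would extend Flink's Trigger, keep the count in partitioned trigger state, and register an event-time timer for the end of the window.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in that mirrors the names of Flink's TriggerResult values. */
enum Decision { CONTINUE, FIRE }

/**
 * Plain-Java model (not Flink's API) of an early-firing trigger for one window:
 * it fires every maxCount elements without purging, and fires once more
 * when event time reaches the end of the window.
 */
class EarlyCountTriggerModel {
    private final long maxCount;
    private final long windowMaxTimestamp; // last timestamp that belongs to the window
    private long count = 0;                // elements seen since the last early firing

    EarlyCountTriggerModel(long maxCount, long windowMaxTimestamp) {
        this.maxCount = maxCount;
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    /** Called once per arriving element, analogous to Trigger.onElement(). */
    Decision onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // reset only the counter; the window contents are kept (FIRE, not FIRE_AND_PURGE)
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }

    /** Called when the watermark passes a registered timer, analogous to Trigger.onEventTime(). */
    Decision onEventTime(long time) {
        return time == windowMaxTimestamp ? Decision.FIRE : Decision.CONTINUE;
    }
}

class EarlyCountTriggerDemo {
    public static void main(String[] args) {
        // Assumed: a 24h window starting at 0, so its last timestamp is 86_399_999 ms
        EarlyCountTriggerModel trigger = new EarlyCountTriggerModel(10, 86_399_999L);
        List<Integer> firedAt = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            if (trigger.onElement() == Decision.FIRE) {
                firedAt.add(i);
            }
        }
        System.out.println("early firings after elements " + firedAt); // [10, 20]
        System.out.println("end of window: " + trigger.onEventTime(86_399_999L)); // FIRE
    }
}
```

The difference from a plain CountTrigger.of(10) is the end-of-window branch: CountTrigger only implements the count condition, so without it the final 24h result is never emitted.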

Regarding the CountTrigger firing several times per element on sliding
windows but only once on tumbling windows: in the case of a tumbling window,
each record is inserted into exactly one window. In the case of sliding
windows, each element is inserted into multiple windows (there should be six
for a SlidingWindow(1 minute, 10 seconds)). The CountTrigger fires for each
window individually.
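To see why one element leads to several firings, here is a plain-Java sketch of how a sliding event-time assigner maps a timestamp to windows. It is modeled on the window-start arithmetic of Flink's SlidingEventTimeWindows (windows aligned to multiples of the slide, no offset), but the class and method names are hypothetical and non-negative millisecond timestamps are assumed.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingAssignerModel {
    /**
     * Returns the start timestamps of all windows [start, start + size)
     * that contain the given timestamp, newest window first.
     * Assumes timestamp >= 0 and windows aligned to multiples of slide.
     */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window that contains the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Sliding window of 1 minute that slides every 10 seconds
        List<Long> sliding = windowStarts(65_000L, 60_000L, 10_000L);
        System.out.println(sliding.size() + " sliding windows: " + sliding);
        // A tumbling window is the special case slide == size
        List<Long> tumbling = windowStarts(65_000L, 60_000L, 60_000L);
        System.out.println(tumbling.size() + " tumbling window: " + tumbling);
    }
}
```

For the element at 65,000 ms this yields six windows starting at 60000, 50000, ..., 10000, but only one tumbling window. With CountTrigger.of(1), each of those six windows evaluates its own trigger for the element.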

When using a time window, the WindowFunction has two parameters that
identify the window: 1) the key and 2) the Window object. For a time
window, the Window object is a TimeWindow that provides the start and end
timestamps.
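As a rough illustration, a window function can combine the key with the window's start and end to tell overlapping sliding windows apart. The sketch below is plain Java: TimeWindowModel and describe() are hypothetical stand-ins for Flink's TimeWindow (which exposes getStart() and getEnd()) and for a WindowFunction's apply() method, and the key "device-42" is made up.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for Flink's TimeWindow: the half-open interval [start, end). */
class TimeWindowModel {
    private final long start;
    private final long end;

    TimeWindowModel(long start, long end) {
        this.start = start;
        this.end = end;
    }

    long getStart() { return start; }
    long getEnd() { return end; }
}

class WindowLabelDemo {
    /** Labels a result with the two things that identify a window: the key and the window bounds. */
    static String describe(String key, TimeWindowModel window, Iterable<Integer> elements) {
        int count = 0;
        for (Integer ignored : elements) {
            count++;
        }
        return key + " [" + window.getStart() + ", " + window.getEnd() + ") count=" + count;
    }

    public static void main(String[] args) {
        List<Integer> elements = Arrays.asList(3, 7, 1);
        // Two overlapping 1-minute windows that received the same elements
        System.out.println(describe("device-42", new TimeWindowModel(10_000L, 70_000L), elements));
        System.out.println(describe("device-42", new TimeWindowModel(20_000L, 80_000L), elements));
    }
}
```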

Another point to consider (esp. for long-running windows as in your case)
is incremental aggregation of window elements [1]. If you provide a
FoldFunction (or ReduceFunction), it is applied to each arriving element to
eagerly aggregate the elements. This means that the window only holds the
aggregated value and not each individual window element. Hence, the
memory / storage footprint is much smaller and the window aggregate does
not have to be recomputed for each early firing. When an element arrives,
1) the element is aggregated with the incremental aggregation function,
2) the trigger is evaluated, and 3) if the trigger fires, the window
function is called.
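The three steps above can be sketched as a plain-Java model of a single window that keeps only a running aggregate. This is not Flink's API: IncrementalWindowModel, the sum used as the fold function, and the "fire every N elements" rule are assumptions chosen for illustration. The point is that the per-window state is a single long no matter how many elements arrive, and each early firing just reads it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongBinaryOperator;

class IncrementalWindowModel {
    private final LongBinaryOperator fold; // incremental aggregation function
    private final long fireEvery;          // early-firing interval, in elements
    private long accumulator;              // the only per-window state that is kept
    private long sinceLastFire = 0;
    private final List<Long> emitted = new ArrayList<>();

    IncrementalWindowModel(long initial, LongBinaryOperator fold, long fireEvery) {
        this.accumulator = initial;
        this.fold = fold;
        this.fireEvery = fireEvery;
    }

    void onElement(long value) {
        // 1) aggregate the element into the accumulator
        accumulator = fold.applyAsLong(accumulator, value);
        // 2) evaluate the trigger
        sinceLastFire++;
        if (sinceLastFire >= fireEvery) {
            sinceLastFire = 0;
            // 3) the trigger fired, so call the "window function" on the aggregate
            windowFunction();
        }
    }

    /** Final firing when event time reaches the end of the window. */
    void onEndOfWindow() {
        windowFunction();
    }

    private void windowFunction() {
        emitted.add(accumulator);
    }

    List<Long> emitted() { return emitted; }

    public static void main(String[] args) {
        IncrementalWindowModel window = new IncrementalWindowModel(0L, Long::sum, 10);
        for (long v = 1; v <= 25; v++) {
            window.onElement(v);
        }
        window.onEndOfWindow();
        System.out.println(window.emitted()); // [55, 210, 325]
    }
}
```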

Hope this helps,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#windowfunction-with-incremental-aggregation


2016-08-30 17:59 GMT+02:00 Paul Joireman <pa...@physiq.com>:

> Fabian,
>
>
> Thanks for the reference. I think I was misinterpreting the results I
> was getting with the CountTrigger; it looks like it does keep the data.
> However, I'm running into some unexpected (at least by me) behavior.
> Given a keyed data stream keyedStream and event time:
>
>
>     final DataStream<MyMessageOut> alertingMsgs = keyedStream
>                 .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
>                 .trigger(CountTrigger.of(1))
>                 .apply(new MyWindowProcessor());
>
> Every time a new element comes in, I expected (probably naively) one
> firing of the window, but I get 5, presumably due to the sliding windows,
> although this probably depends on the timestamp extraction "policy". I used
> a BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)). Is there a
> way in the window processing function to determine which particular sliding
> window you are processing?
>
> Alternatively, a TumblingEventTimeWindow as below only fires once, but
> with the default trigger replaced by the CountTrigger, my understanding is
> that the previous windows will not be purged. Is that correct?
>
>     final DataStream<MyMessageOut> alertingMsgs = keyedStream
>                 .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>                 .trigger(CountTrigger.of(1))
>                 .apply(new MyWindowProcessor());
>
> Paul
>
> ------------------------------
> *From:* Fabian Hueske <fh...@gmail.com>
> *Sent:* Monday, August 29, 2016 5:46:06 PM
> *To:* user@flink.apache.org
> *Subject:* Re: CountTrigger FIRE or FIRE_AND_PURGE
>
> Hi Paul,
>
> This blog post [1] includes an example of an early trigger that should
> pretty much do what you are looking for.
> This one [2] explains the windowing mechanics of Flink (window assigner,
> trigger, function, etc).
>
> Hope this helps,
> Fabian
>
> [1] https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
> [2] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
>
> 2016-08-30 0:25 GMT+02:00 Paul Joireman <pa...@physiq.com>:
>
>> Hi all,
>>
>>
>> I'm attempting to use a long SlidingEventTimeWindow (duration 24 hours),
>> but I would like updates more frequently than the 24 hour length. I
>> naively attempted to use a simple CountTrigger.of(10) to give me the window
>> every time 10 samples are collected; however, the window processing
>> function I'm using only seems to get the latest 10, not the whole window
>> (which is what I was hoping for). The code looks like it simply fires
>> after the count is reached, but it seems like it is doing a FIRE_AND_PURGE:
>> I can't seem to use the iterator in the window processing function to get
>> more than 10 elements at a time. Is there something I'm missing in order
>> to get at the full content of the window data?
>>
>>
>> Paul
>>
>
>