You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jan Brusch <ja...@neuland-bfi.de> on 2021/02/06 15:43:41 UTC

Sliding Window Count: Tricky Edge Case / Count Zero Problem

Hi,
I was recently working on a problem where we wanted to implement a 
simple count on a sliding window, e.g. "how many messages of a certain 
type were emitted by a certain type of sensor in the last n minutes". 
Which sounds simple enough in theory:

messageStream
     .keyBy(//EmitterType + MessageType)
     .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n), 
Time.seconds(30)))
     .map(_ => 1)
     .reduce((x,y) => x + y)
     .addSink(...)

But there is a tricky edge case: The downstream systems will never know 
when the count for a certain key goes back to 0, which is important for 
our use case. The technical reason being that flink doesn't open a 
window if there are no entries, i.e. a window with count 0 doesn't exist 
in flink.

We came up with the following solution for the time being:

messageStream
     .keyBy(//EmitterType + MessageType)
     .window(GlobalWindows.create())
     .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
     .evictor(// CustomEvictor: Evict all messages older than n minutes 
BEFORE processing the window)
     .process(// CustomCounter: Count all Messages in Window State);
     .addSink(...)

In the case of zero messages in the last n minutes, all messages will be 
evicted from the window and the process-function will get triggered one 
last time on the now empty window, so we can produce a count of 0.

I have two problems, though, with this solution:
1) It is computationally inefficient for a simple count, as custom 
process functions will always keep all messages in state. And, on every 
trigger all elements will have to be touched twice: To compare the 
timestamp and to count.
2) It does seem like a very roundabout solution to a simple problem.

So, I was wondering if there was a more efficient or "flink-like" 
approach to this. Sorry for the long writeup, but I would love to hear 
your takes.


Best regards
Jan

-- 
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501


Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Posted by Jan Brusch <ja...@neuland-bfi.de>.
Hi Roman,

thanks for your reply.

Don't timers remove themselves after firing?

Apart from that, the idea is indeed to have one timer per element, so 
that we count one up whenever the element comes in and count one down 
exactly <windowsize> later. So we emulate a sliding window without the 
"hops" in certain intervals. Instead, we always have a real-time running 
count of elements in the last <windowsize>. But yes, the price for that 
is to have one timer per element. Which is manageable for our use case 
(large windowsize, a LOT of sensors but relatively few elements per 
sensor). In fact, for our use case this solution is much more efficient 
than a sliding window.


Best regards

Jan


On 02.03.21 20:40, Roman Khachatryan wrote:
> Hi Jan,
>
> Thanks for sharing your solution.
> You probably also want to remove previously created timer(s) in 
> processElement; so that you don't end up with a timer per element.
> For that, you can store the previous time (in function state).
>
> Regards,
> Roman
>
>
> On Fri, Feb 26, 2021 at 10:29 PM Jan Brusch <jan.brusch@neuland-bfi.de 
> <ma...@neuland-bfi.de>> wrote:
>
>     Hi everybody,
>
>     I just wanted to say thanks again for all your input and share the
>     (surprisingly simple) solution that we came up with in the meantime:
>
>     class SensorRecordCounter extends KeyedProcessFunction<String,
>     SensorRecord, SensorCount>{
>
>     private ValueState<SensorCount> state;
>     private long windowSizeMs = 60000L;
>
>      @Override
>       public void open(Configuration parameters) throws Exception {
>             state = getRuntimeContext().getState(new
>     ValueStateDescriptor<>("sensorCount", SensorCount.class));
>       }
>
>
>     @Override
>     public void processElement(SensorRecord sensorRecord, Context ctx,
>     Collector<SensorCount> out) throws Exception {
>             SensorCount count = state.value();
>             if (count == null) {
>                 count = new SensorCount();
>                 count.setSensorID(sensorRecord.getSensorID());
>                 count.setCount(0);
>             }
>             count.increase();
>             state.update(count);
>             out.collect(count);
>
>     ctx.timerService().registerEventTimeTimer(ctx.timestamp() +
>     windowSizeMs);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx,
>     Collector<SensorCount> out) throws Exception {
>             SensorCount count = state.value();
>             count.decrease();
>             state.update(count);
>             out.collect(count);
>
>             if (count.getCount() <= 0) {
>                 state.clear();
>             }
>     }
>
>     }
>
>
>     Best regards and a nice weekend
>
>     Jan
>
>
>     On 09.02.21 08:28, Arvid Heise wrote:
>>     Hi Jan,
>>
>>     Another solution is to insert Heartbeat-events at the source for
>>     each sensor. The solution is very similar to how to advance
>>     watermarks when there are no elements in the respective source
>>     partition.
>>
>>     However, it's only easy to implement if you have your own source
>>     and know all sensors on application start. It might also be
>>     possible to implement if you use a new Source interface.
>>
>>     On Tue, Feb 9, 2021 at 7:20 AM Yun Gao <yungao.gy@aliyun.com
>>     <ma...@aliyun.com>> wrote:
>>
>>
>>         Hi,
>>
>>         I also think there should be different ways to achieve the
>>         target. For the first option listed previously,
>>         the pseudo-code roughly like
>>
>>         class MyFunciton extends KeyedProcessFunction {
>>             ValueState<Integer> count;
>>
>>             void open() {
>>                count = ... // Create the value state
>>            }
>>
>>             ​void processElement(T t, Context context, Collector
>>         collector) {
>>                     ​Integer current = count.get();
>>                     if (current == null) {
>>         context.timeService().registerTimer(30); // Register timer
>>         for the first time
>>                               current = 0;
>>                     }
>>
>>                     count.update(current + 1); // update the count
>>             }
>>
>>             void onTimer(...) {
>>                  collector.collect(new Tuple2<>(getCurrentKey(),
>>         count.get());
>>         context.timeService().registerTimer(30);  // register the
>>         following timer
>>             }
>>         }
>>
>>         1. For flink the state and timer are all bound to a key
>>         implicitly, thus I think they should
>>         not need to be bound manually.
>>         2. To clear the outdated state, it could be cleared via
>>         count.clear(); if it has been 0
>>         for a long time. There are different ways to count the
>>         interval, like register another timer
>>         and clear the timer when received the elements or update the
>>         counter to -1, -2... to mark
>>         how much timer it has passed.
>>
>>
>>         Best,
>>          Yun
>>
>>
>>
>>
>>             ------------------Original Mail ------------------
>>             *Sender:*Khachatryan Roman <khachatryan.roman@gmail.com
>>             <ma...@gmail.com>>
>>             *Send Date:*Tue Feb 9 02:35:20 2021
>>             *Recipients:*Jan Brusch <jan.brusch@neuland-bfi.de
>>             <ma...@neuland-bfi.de>>
>>             *CC:*Yun Gao <yungao.gy@aliyun.com
>>             <ma...@aliyun.com>>, user
>>             <user@flink.apache.org <ma...@flink.apache.org>>
>>             *Subject:*Re: Sliding Window Count: Tricky Edge Case /
>>             Count Zero Problem
>>
>>                 Hi,
>>
>>                 Probably another solution would be to register a
>>                 timer (using KeyedProcessFunction) once we see an
>>                 element after keyBy. The timer will fire in
>>                 windowIntervalMs. Upon firing, it will emit a dummy
>>                 element which will be ignored (or subtracted) in the end.
>>                 Upon receiving each new element, the function will
>>                 shift the timer accordingly.
>>
>>                 Regards,
>>                 Roman
>>
>>
>>                 On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch
>>                 <jan.brusch@neuland-bfi.de
>>                 <ma...@neuland-bfi.de>> wrote:
>>
>>                     Hi Yun,
>>
>>                     thanks for your reply.
>>
>>                     I do agree with your point about standard windows
>>                     being for high level operations and the
>>                     lower-level apis offering a rich toolset for most
>>                     advanced use cases.
>>
>>                     I have tried to solve my problem with
>>                     keyedProcessFunctions also but was not able to
>>                     get it to work for two reasons:
>>
>>                     1) I was not able to set up a combination of
>>                     ValueState, Timers and Triggers that emulated a
>>                     sliding window with a rising and falling count
>>                     (including 0) good enough.
>>
>>                     2) Memory Leak: States / Windows should be
>>                     cleared after a certain time of being at count 0
>>                     in order to prevent an infinitely rising of
>>                     ValueStates (that are not needed anymore)
>>
>>
>>                     Can you maybe please elaborate in pseudocode how
>>                     you would envision your solution?
>>
>>
>>                     Best regards
>>
>>                     Jan
>>
>>                     On 08.02.21 05:31, Yun Gao wrote:
>>
>>                         Hi Jan,
>>
>>                         From my view, I think in Flink Window should
>>                         be as a "high-level" operation for some kind
>>                         of aggregation operation and if it could not
>>                         satisfy the requirements, we could at least
>>                         turn to
>>                         using the "low-level" api by using
>>                         KeyedProcessFunction[1].
>>
>>                         In this case, we could use a ValueState to
>>                         store the current value for each key, and
>>                         increment
>>                         the value on each element. Then we could also
>>                         register time for each key on receiving the
>>                         first
>>                         element for this key,  and in the onTimer
>>                         callback, we could send the current state
>>                         value, update
>>                         the value to 0 and register another timer for
>>                         this key after 30s.
>>
>>                         Best,
>>                          Yun
>>
>>
>>
>>                         [1]
>>                         https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>                         <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction>
>>
>>                             ------------------Original Mail
>>                             ------------------
>>                             *Sender:*Jan Brusch
>>                             <ja...@neuland-bfi.de>
>>                             <ma...@neuland-bfi.de>
>>                             *Send Date:*Sat Feb 6 23:44:00 2021
>>                             *Recipients:*user <us...@flink.apache.org>
>>                             <ma...@flink.apache.org>
>>                             *Subject:*Sliding Window Count: Tricky
>>                             Edge Case / Count Zero Problem
>>
>>                                 Hi,
>>                                 I was recently working on a problem where we wanted to implement a
>>
>>                                 simple count on a sliding window, e.g. "how many messages of a certain
>>
>>                                 type were emitted by a certain type of sensor in the last n minutes".
>>
>>                                 Which sounds simple enough in theory:
>>
>>                                 messageStream
>>                                      .keyBy(//EmitterType + MessageType)
>>                                      .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),
>>
>>                                 Time.seconds(30)))
>>                                      .map(_ => 1)
>>                                      .reduce((x,y) => x + y)
>>                                      .addSink(...)
>>
>>                                 But there is a tricky edge case: The downstream systems will never know
>>
>>                                 when the count for a certain key goes back to 0, which is important for
>>
>>                                 our use case. The technical reason being that flink doesn't open a
>>
>>                                 window if there are no entries, i.e. a window with count 0 doesn't exist
>>
>>                                 in flink.
>>
>>                                 We came up with the following solution for the time being:
>>
>>                                 messageStream
>>                                      .keyBy(//EmitterType + MessageType)
>>                                      .window(GlobalWindows.create())
>>                                      .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
>>                                      .evictor(// CustomEvictor: Evict all messages older than n minutes
>>
>>                                 BEFORE processing the window)
>>                                      .process(// CustomCounter: Count all Messages in Window State);
>>                                      .addSink(...)
>>
>>                                 In the case of zero messages in the last n minutes, all messages will be
>>
>>                                 evicted from the window and the process-function will get triggered one
>>
>>                                 last time on the now empty window, so we can produce a count of 0.
>>
>>                                 I have two problems, though, with this solution:
>>                                 1) It is computationally inefficient for a simple count, as custom
>>
>>                                 process functions will always keep all messages in state. And, on every
>>
>>                                 trigger all elements will have to be touched twice: To compare the
>>
>>                                 timestamp and to count.
>>                                 2) It does seem like a very roundabout solution to a simple problem.
>>
>>                                 So, I was wondering if there was a more efficient or "flink-like"
>>
>>                                 approach to this. Sorry for the long writeup, but I would love to hear
>>
>>                                 your takes.
>>
>>
>>                                 Best regards
>>                                 Jan
>>
>>                                 -- 
>>                                 neuland  – Büro für Informatik GmbH
>>                                 Konsul-Smidt-Str. 8g, 28217 Bremen
>>
>>                                 Telefon (0421) 380107 57
>>                                 Fax (0421) 380107 99
>>                                 https://www.neuland-bfi.de
>>                                 <https://www.neuland-bfi.de>
>>
>>                                 https://twitter.com/neuland
>>                                 <https://twitter.com/neuland>
>>                                 https://facebook.com/neulandbfi
>>                                 <https://facebook.com/neulandbfi>
>>                                 https://xing.com/company/neulandbfi
>>                                 <https://xing.com/company/neulandbfi>
>>
>>
>>                                 Geschäftsführer: Thomas Gebauer, Jan Zander
>>                                 Registergericht: Amtsgericht Bremen, HRB 23395 HB
>>                                 USt-ID. DE 246585501
>>
>>                     -- neuland  – Büro für Informatik GmbHKonsul-Smidt-Str. 8g, 28217 BremenTelefon (0421) 380107 57Fax (0421) 380107 99https://www.neuland-bfi.de  <https://www.neuland-bfi.de>https://twitter.com/neuland  <https://twitter.com/neuland>https://facebook.com/neulandbfi  <https://facebook.com/neulandbfi>https://xing.com/company/neulandbfi  <https://xing.com/company/neulandbfi>Geschäftsführer: Thomas Gebauer, Jan ZanderRegistergericht: Amtsgericht Bremen, HRB 23395 HBUSt-ID. DE 246585501
>>
>     -- 
>     neuland  – Büro für Informatik GmbH
>     Konsul-Smidt-Str. 8g, 28217 Bremen
>
>     Telefon (0421) 380107 57
>     Fax (0421) 380107 99
>     https://www.neuland-bfi.de  <https://www.neuland-bfi.de>
>
>     https://twitter.com/neuland  <https://twitter.com/neuland>
>     https://facebook.com/neulandbfi  <https://facebook.com/neulandbfi>
>     https://xing.com/company/neulandbfi  <https://xing.com/company/neulandbfi>
>
>
>     Geschäftsführer: Thomas Gebauer, Jan Zander
>     Registergericht: Amtsgericht Bremen, HRB 23395 HB
>     USt-ID. DE 246585501
>
-- 
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501


Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Jan,

Thanks for sharing your solution.
You probably also want to remove previously created timer(s) in
processElement; so that you don't end up with a timer per element.
For that, you can store the previous time (in function state).

Regards,
Roman


On Fri, Feb 26, 2021 at 10:29 PM Jan Brusch <ja...@neuland-bfi.de>
wrote:

> Hi everybody,
>
> I just wanted to say thanks again for all your input and share the
> (surprisingly simple) solution that we came up with in the meantime:
>
> class SensorRecordCounter extends KeyedProcessFunction<String,
> SensorRecord, SensorCount>{
>
> private ValueState<SensorCount> state;
> private long windowSizeMs = 60000L;
>
>  @Override
>   public void open(Configuration parameters) throws Exception {
>         state = getRuntimeContext().getState(new
> ValueStateDescriptor<>("sensorCount", SensorCount.class));
>   }
>
>
> @Override
> public void processElement(SensorRecord sensorRecord, Context ctx,
> Collector<SensorCount> out) throws Exception {
>         SensorCount count = state.value();
>         if (count == null) {
>             count = new SensorCount();
>             count.setSensorID(sensorRecord.getSensorID());
>             count.setCount(0);
>         }
>         count.increase();
>         state.update(count);
>         out.collect(count);
>
>         ctx.timerService().registerEventTimeTimer(ctx.timestamp() +
> windowSizeMs);
> }
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx,
> Collector<SensorCount> out) throws Exception {
>         SensorCount count = state.value();
>         count.decrease();
>         state.update(count);
>         out.collect(count);
>
>         if (count.getCount() <= 0) {
>             state.clear();
>         }
> }
>
> }
>
>
> Best regards and a nice weekend
>
> Jan
>
>
> On 09.02.21 08:28, Arvid Heise wrote:
>
> Hi Jan,
>
> Another solution is to insert Heartbeat-events at the source for each
> sensor. The solution is very similar to how to advance watermarks when
> there are no elements in the respective source partition.
>
> However, it's only easy to implement if you have your own source and know
> all sensors on application start. It might also be possible to implement if
> you use a new Source interface.
>
> On Tue, Feb 9, 2021 at 7:20 AM Yun Gao <yu...@aliyun.com> wrote:
>
>>
>> Hi,
>>
>> I also think there should be different ways to achieve the target. For
>> the first option listed previously,
>> the pseudo-code roughly like
>>
>> class MyFunciton extends KeyedProcessFunction {
>>     ValueState<Integer> count;
>>
>>     void open() {
>>        count = ... // Create the value state
>>    }
>>
>>     ​void processElement(T t, Context context, Collector collector) {
>>             ​Integer current = count.get();
>>             if (current == null) {
>>                       context.timeService().registerTimer(30); //
>> Register timer for the first time
>>                       current = 0;
>>             }
>>
>>             count.update(current + 1); // update the count
>>     }
>>
>>     void onTimer(...) {
>>          collector.collect(new Tuple2<>(getCurrentKey(), count.get());
>>           context.timeService().registerTimer(30);  // register the
>> following timer
>>     }
>> }
>>
>> 1. For flink the state and timer are all bound to a key implicitly, thus
>> I think they should
>> not need to be bound manually.
>> 2. To clear the outdated state, it could be cleared via count.clear(); if
>> it has been 0
>> for a long time. There are different ways to count the interval, like
>> register another timer
>> and clear the timer when received the elements or update the counter to
>> -1, -2... to mark
>> how much timer it has passed.
>>
>>
>> Best,
>>  Yun
>>
>>
>>
>>
>> ------------------Original Mail ------------------
>> *Sender:*Khachatryan Roman <kh...@gmail.com>
>> *Send Date:*Tue Feb 9 02:35:20 2021
>> *Recipients:*Jan Brusch <ja...@neuland-bfi.de>
>> *CC:*Yun Gao <yu...@aliyun.com>, user <us...@flink.apache.org>
>> *Subject:*Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem
>>
>>> Hi,
>>>
>>> Probably another solution would be to register a timer
>>> (using KeyedProcessFunction) once we see an element after keyBy. The timer
>>> will fire in windowIntervalMs. Upon firing, it will emit a dummy element
>>> which will be ignored (or subtracted) in the end.
>>> Upon receiving each new element, the function will shift the timer
>>> accordingly.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch <ja...@neuland-bfi.de>
>>> wrote:
>>>
>>>> Hi Yun,
>>>>
>>>> thanks for your reply.
>>>>
>>>> I do agree with your point about standard windows being for high level
>>>> operations and the lower-level apis offering a rich toolset for most
>>>> advanced use cases.
>>>>
>>>> I have tried to solve my problem with keyedProcessFunctions also but
>>>> was not able to get it to work for two reasons:
>>>>
>>>> 1) I was not able to set up a combination of ValueState, Timers and
>>>> Triggers that emulated a sliding window with a rising and falling count
>>>> (including 0) good enough.
>>>>
>>>> 2) Memory Leak: States / Windows should be cleared after a certain time
>>>> of being at count 0 in order to prevent an infinitely rising of ValueStates
>>>> (that are not needed anymore)
>>>>
>>>>
>>>> Can you maybe please elaborate in pseudocode how you would envision
>>>> your solution?
>>>>
>>>>
>>>> Best regards
>>>>
>>>> Jan
>>>> On 08.02.21 05:31, Yun Gao wrote:
>>>>
>>>> Hi Jan,
>>>>
>>>> From my view, I think in Flink Window should be as a "high-level"
>>>> operation for some kind
>>>> of aggregation operation and if it could not satisfy the requirements,
>>>> we could at least turn to
>>>> using the "low-level" api by using KeyedProcessFunction[1].
>>>>
>>>> In this case, we could use a ValueState to store the current value for
>>>> each key, and increment
>>>> the value on each element. Then we could also register time for each
>>>> key on receiving the first
>>>> element for this key,  and in the onTimer callback, we could send the
>>>> current state value, update
>>>> the value to 0 and register another timer for this key after 30s.
>>>>
>>>> Best,
>>>>  Yun
>>>>
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>>>
>>>> ------------------Original Mail ------------------
>>>> *Sender:*Jan Brusch <ja...@neuland-bfi.de>
>>>> <ja...@neuland-bfi.de>
>>>> *Send Date:*Sat Feb 6 23:44:00 2021
>>>> *Recipients:*user <us...@flink.apache.org> <us...@flink.apache.org>
>>>> *Subject:*Sliding Window Count: Tricky Edge Case / Count Zero Problem
>>>>
>>>>> Hi,
>>>>> I was recently working on a problem where we wanted to implement a
>>>>> simple count on a sliding window, e.g. "how many messages of a certain
>>>>> type were emitted by a certain type of sensor in the last n minutes".
>>>>> Which sounds simple enough in theory:
>>>>>
>>>>> messageStream
>>>>>      .keyBy(//EmitterType + MessageType)
>>>>>      .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),
>>>>> Time.seconds(30)))
>>>>>      .map(_ => 1)
>>>>>      .reduce((x,y) => x + y)
>>>>>      .addSink(...)
>>>>>
>>>>>
>>>>> But there is a tricky edge case: The downstream systems will never know
>>>>>
>>>>> when the count for a certain key goes back to 0, which is important for
>>>>> our use case. The technical reason being that flink doesn't open a
>>>>>
>>>>> window if there are no entries, i.e. a window with count 0 doesn't exist
>>>>> in flink.
>>>>>
>>>>> We came up with the following solution for the time being:
>>>>>
>>>>> messageStream
>>>>>      .keyBy(//EmitterType + MessageType)
>>>>>      .window(GlobalWindows.create())
>>>>>      .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
>>>>>
>>>>>      .evictor(// CustomEvictor: Evict all messages older than n minutes
>>>>> BEFORE processing the window)
>>>>>      .process(// CustomCounter: Count all Messages in Window State);
>>>>>      .addSink(...)
>>>>>
>>>>>
>>>>> In the case of zero messages in the last n minutes, all messages will be
>>>>>
>>>>> evicted from the window and the process-function will get triggered one
>>>>> last time on the now empty window, so we can produce a count of 0.
>>>>>
>>>>> I have two problems, though, with this solution:
>>>>> 1) It is computationally inefficient for a simple count, as custom
>>>>>
>>>>> process functions will always keep all messages in state. And, on every
>>>>> trigger all elements will have to be touched twice: To compare the
>>>>> timestamp and to count.
>>>>> 2) It does seem like a very roundabout solution to a simple problem.
>>>>>
>>>>> So, I was wondering if there was a more efficient or "flink-like"
>>>>> approach to this. Sorry for the long writeup, but I would love to hear
>>>>> your takes.
>>>>>
>>>>>
>>>>> Best regards
>>>>> Jan
>>>>>
>>>>> --
>>>>> neuland  – Büro für Informatik GmbH
>>>>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>>>>
>>>>> Telefon (0421) 380107 57
>>>>> Fax (0421) 380107 99
>>>>> https://www.neuland-bfi.de
>>>>>
>>>>> https://twitter.com/neuland
>>>>> https://facebook.com/neulandbfi
>>>>> https://xing.com/company/neulandbfi
>>>>>
>>>>>
>>>>> Geschäftsführer: Thomas Gebauer, Jan Zander
>>>>> Registergericht: Amtsgericht Bremen, HRB 23395 HB
>>>>> USt-ID. DE 246585501
>>>>
>>>> -- neuland  – Büro für Informatik GmbHKonsul-Smidt-Str. 8g, 28217 BremenTelefon (0421) 380107 57Fax (0421) 380107 99https://www.neuland-bfi.dehttps://twitter.com/neulandhttps://facebook.com/neulandbfihttps://xing.com/company/neulandbfiGeschäftsführer: Thomas Gebauer, Jan ZanderRegistergericht: Amtsgericht Bremen, HRB 23395 HBUSt-ID. DE 246585501
>>>>
>>>> --
> neuland  – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99https://www.neuland-bfi.de
> https://twitter.com/neulandhttps://facebook.com/neulandbfihttps://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
>
>

Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Posted by Jan Brusch <ja...@neuland-bfi.de>.
Hi everybody,

I just wanted to say thanks again for all your input and share the 
(surprisingly simple) solution that we came up with in the meantime:

class SensorRecordCounter extends KeyedProcessFunction<String, 
SensorRecord, SensorCount>{

private ValueState<SensorCount> state;
private long windowSizeMs = 60000L;

  @Override
   public void open(Configuration parameters) throws Exception {
         state = getRuntimeContext().getState(new 
ValueStateDescriptor<>("sensorCount", SensorCount.class));
   }


@Override
public void processElement(SensorRecord sensorRecord, Context ctx, 
Collector<SensorCount> out) throws Exception {
         SensorCount count = state.value();
         if (count == null) {
             count = new SensorCount();
             count.setSensorID(sensorRecord.getSensorID());
             count.setCount(0);
         }
         count.increase();
         state.update(count);
         out.collect(count);

         ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 
windowSizeMs);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<SensorCount> out) throws Exception {
         SensorCount count = state.value();
         count.decrease();
         state.update(count);
         out.collect(count);

         if (count.getCount() <= 0) {
             state.clear();
         }
}

}


Best regards and a nice weekend

Jan


On 09.02.21 08:28, Arvid Heise wrote:
> Hi Jan,
>
> Another solution is to insert Heartbeat-events at the source for each 
> sensor. The solution is very similar to how to advance watermarks when 
> there are no elements in the respective source partition.
>
> However, it's only easy to implement if you have your own source and 
> know all sensors on application start. It might also be possible to 
> implement if you use a new Source interface.
>
> On Tue, Feb 9, 2021 at 7:20 AM Yun Gao <yungao.gy@aliyun.com 
> <ma...@aliyun.com>> wrote:
>
>
>     Hi,
>
>     I also think there should be different ways to achieve the target.
>     For the first option listed previously,
>     the pseudo-code roughly like
>
>     class MyFunciton extends KeyedProcessFunction {
>         ValueState<Integer> count;
>
>         void open() {
>            count = ... // Create the value state
>        }
>
>         ​void processElement(T t, Context context, Collector collector) {
>                 ​Integer current = count.get();
>                 if (current == null) {
>     context.timeService().registerTimer(30); // Register timer for the
>     first time
>                           current = 0;
>                 }
>
>                 count.update(current + 1); // update the count
>         }
>
>         void onTimer(...) {
>              collector.collect(new Tuple2<>(getCurrentKey(), count.get());
>     context.timeService().registerTimer(30);  // register the
>     following timer
>         }
>     }
>
>     1. For flink the state and timer are all bound to a key
>     implicitly, thus I think they should
>     not need to be bound manually.
>     2. To clear the outdated state, it could be cleared via
>     count.clear(); if it has been 0
>     for a long time. There are different ways to count the interval,
>     like register another timer
>     and clear the timer when received the elements or update the
>     counter to -1, -2... to mark
>     how much timer it has passed.
>
>
>     Best,
>      Yun
>
>
>
>
>         ------------------Original Mail ------------------
>         *Sender:*Khachatryan Roman <khachatryan.roman@gmail.com
>         <ma...@gmail.com>>
>         *Send Date:*Tue Feb 9 02:35:20 2021
>         *Recipients:*Jan Brusch <jan.brusch@neuland-bfi.de
>         <ma...@neuland-bfi.de>>
>         *CC:*Yun Gao <yungao.gy@aliyun.com
>         <ma...@aliyun.com>>, user <user@flink.apache.org
>         <ma...@flink.apache.org>>
>         *Subject:*Re: Sliding Window Count: Tricky Edge Case / Count
>         Zero Problem
>
>             Hi,
>
>             Probably another solution would be to register a timer
>             (using KeyedProcessFunction) once we see an element after
>             keyBy. The timer will fire in windowIntervalMs. Upon
>             firing, it will emit a dummy element which will be ignored
>             (or subtracted) in the end.
>             Upon receiving each new element, the function will shift
>             the timer accordingly.
>
>             Regards,
>             Roman
>
>
>             On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch
>             <jan.brusch@neuland-bfi.de
>             <ma...@neuland-bfi.de>> wrote:
>
>                 Hi Yun,
>
>                 thanks for your reply.
>
>                 I do agree with your point about standard windows
>                 being for high level operations and the lower-level
>                 apis offering a rich toolset for most advanced use cases.
>
>                 I have tried to solve my problem with
>                 keyedProcessFunctions also but was not able to get it
>                 to work for two reasons:
>
>                 1) I was not able to set up a combination of
>                 ValueState, Timers and Triggers that emulated a
>                 sliding window with a rising and falling count
>                 (including 0) good enough.
>
>                 2) Memory Leak: States / Windows should be cleared
>                 after a certain time of being at count 0 in order to
>                 prevent an infinitely rising of ValueStates (that are
>                 not needed anymore)
>
>
>                 Can you maybe please elaborate in pseudocode how you
>                 would envision your solution?
>
>
>                 Best regards
>
>                 Jan
>
>                 On 08.02.21 05:31, Yun Gao wrote:
>
>                     Hi Jan,
>
>                     From my view, I think in Flink Window should be as
>                     a "high-level" operation for some kind
>                     of aggregation operation and if it could not
>                     satisfy the requirements, we could at least turn to
>                     using the "low-level" api by using
>                     KeyedProcessFunction[1].
>
>                     In this case, we could use a ValueState to store
>                     the current value for each key, and increment
>                     the value on each element. Then we could also
>                     register time for each key on receiving the first
>                     element for this key,  and in the onTimer
>                     callback, we could send the current state value,
>                     update
>                     the value to 0 and register another timer for this
>                     key after 30s.
>
>                     Best,
>                      Yun
>
>
>
>                     [1]
>                     https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>                     <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction>
>
>                         ------------------Original Mail ------------------
>                         *Sender:*Jan Brusch
>                         <ja...@neuland-bfi.de>
>                         <ma...@neuland-bfi.de>
>                         *Send Date:*Sat Feb 6 23:44:00 2021
>                         *Recipients:*user <us...@flink.apache.org>
>                         <ma...@flink.apache.org>
>                         *Subject:*Sliding Window Count: Tricky Edge
>                         Case / Count Zero Problem
>
>                             Hi,
>                             I was recently working on a problem where we wanted to implement a
>
>                             simple count on a sliding window, e.g. "how many messages of a certain
>
>                             type were emitted by a certain type of sensor in the last n minutes".
>
>                             Which sounds simple enough in theory:
>
>                             messageStream
>                                  .keyBy(//EmitterType + MessageType)
>                                  .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),
>
>                             Time.seconds(30)))
>                                  .map(_ => 1)
>                                  .reduce((x,y) => x + y)
>                                  .addSink(...)
>
>                             But there is a tricky edge case: The downstream systems will never know
>
>                             when the count for a certain key goes back to 0, which is important for
>
>                             our use case. The technical reason being that flink doesn't open a
>
>                             window if there are no entries, i.e. a window with count 0 doesn't exist
>
>                             in flink.
>
>                             We came up with the following solution for the time being:
>
>                             messageStream
>                                  .keyBy(//EmitterType + MessageType)
>                                  .window(GlobalWindows.create())
>                                  .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
>                                  .evictor(// CustomEvictor: Evict all messages older than n minutes
>
>                             BEFORE processing the window)
>                                  .process(// CustomCounter: Count all Messages in Window State);
>                                  .addSink(...)
>
>                             In the case of zero messages in the last n minutes, all messages will be
>
>                             evicted from the window and the process-function will get triggered one
>
>                             last time on the now empty window, so we can produce a count of 0.
>
>                             I have two problems, though, with this solution:
>                             1) It is computationally inefficient for a simple count, as custom
>
>                             process functions will always keep all messages in state. And, on every
>
>                             trigger all elements will have to be touched twice: To compare the
>
>                             timestamp and to count.
>                             2) It does seem like a very roundabout solution to a simple problem.
>
>                             So, I was wondering if there was a more efficient or "flink-like"
>
>                             approach to this. Sorry for the long writeup, but I would love to hear
>
>                             your takes.
>
>
>                             Best regards
>                             Jan
>
>                             -- 
>                             neuland  – Büro für Informatik GmbH
>                             Konsul-Smidt-Str. 8g, 28217 Bremen
>
>                             Telefon (0421) 380107 57
>                             Fax (0421) 380107 99
>                             https://www.neuland-bfi.de
>                             <https://www.neuland-bfi.de>
>
>                             https://twitter.com/neuland
>                             <https://twitter.com/neuland>
>                             https://facebook.com/neulandbfi
>                             <https://facebook.com/neulandbfi>
>                             https://xing.com/company/neulandbfi
>                             <https://xing.com/company/neulandbfi>
>
>
>                             Geschäftsführer: Thomas Gebauer, Jan Zander
>                             Registergericht: Amtsgericht Bremen, HRB 23395 HB
>                             USt-ID. DE 246585501
>
>                 -- neuland  – Büro für Informatik GmbHKonsul-Smidt-Str. 8g, 28217 BremenTelefon (0421) 380107 57Fax (0421) 380107 99https://www.neuland-bfi.de  <https://www.neuland-bfi.de>https://twitter.com/neuland  <https://twitter.com/neuland>https://facebook.com/neulandbfi  <https://facebook.com/neulandbfi>https://xing.com/company/neulandbfi  <https://xing.com/company/neulandbfi>Geschäftsführer: Thomas Gebauer, Jan ZanderRegistergericht: Amtsgericht Bremen, HRB 23395 HBUSt-ID. DE 246585501
>
-- 
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501


Re: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Posted by Arvid Heise <ar...@apache.org>.
Hi Jan,

Another solution is to insert Heartbeat-events at the source for each
sensor. The solution is very similar to how to advance watermarks when
there are no elements in the respective source partition.

However, it's only easy to implement if you have your own source and know
all sensors on application start. It might also be possible to implement if
you use a new Source interface.

On Tue, Feb 9, 2021 at 7:20 AM Yun Gao <yu...@aliyun.com> wrote:

>
> Hi,
>
> I also think there should be different ways to achieve the target. For the
> first option listed previously,
> the pseudo-code roughly like
>
> class MyFunciton extends KeyedProcessFunction {
>     ValueState<Integer> count;
>
>     void open() {
>        count = ... // Create the value state
>    }
>
>     ​void processElement(T t, Context context, Collector collector) {
>             ​Integer current = count.get();
>             if (current == null) {
>                       context.timeService().registerTimer(30); // Register
> timer for the first time
>                       current = 0;
>             }
>
>             count.update(current + 1); // update the count
>     }
>
>     void onTimer(...) {
>          collector.collect(new Tuple2<>(getCurrentKey(), count.get());
>           context.timeService().registerTimer(30);  // register the
> following timer
>     }
> }
>
> 1. For flink the state and timer are all bound to a key implicitly, thus I
> think they should
> not need to be bound manually.
> 2. To clear the outdated state, it could be cleared via count.clear(); if
> it has been 0
> for a long time. There are different ways to count the interval, like
> register another timer
> and clear the timer when received the elements or update the counter to
> -1, -2... to mark
> how much timer it has passed.
>
>
> Best,
>  Yun
>
>
>
>
> ------------------Original Mail ------------------
> *Sender:*Khachatryan Roman <kh...@gmail.com>
> *Send Date:*Tue Feb 9 02:35:20 2021
> *Recipients:*Jan Brusch <ja...@neuland-bfi.de>
> *CC:*Yun Gao <yu...@aliyun.com>, user <us...@flink.apache.org>
> *Subject:*Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem
>
>> Hi,
>>
>> Probably another solution would be to register a timer
>> (using KeyedProcessFunction) once we see an element after keyBy. The timer
>> will fire in windowIntervalMs. Upon firing, it will emit a dummy element
>> which will be ignored (or subtracted) in the end.
>> Upon receiving each new element, the function will shift the timer
>> accordingly.
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch <ja...@neuland-bfi.de>
>> wrote:
>>
>>> Hi Yun,
>>>
>>> thanks for your reply.
>>>
>>> I do agree with your point about standard windows being for high level
>>> operations and the lower-level apis offering a rich toolset for most
>>> advanced use cases.
>>>
>>> I have tried to solve my problem with keyedProcessFunctions also but was
>>> not able to get it to work for two reasons:
>>>
>>> 1) I was not able to set up a combination of ValueState, Timers and
>>> Triggers that emulated a sliding window with a rising and falling count
>>> (including 0) good enough.
>>>
>>> 2) Memory Leak: States / Windows should be cleared after a certain time
>>> of being at count 0 in order to prevent an infinitely rising of ValueStates
>>> (that are not needed anymore)
>>>
>>>
>>> Can you maybe please elaborate in pseudocode how you would envision your
>>> solution?
>>>
>>>
>>> Best regards
>>>
>>> Jan
>>> On 08.02.21 05:31, Yun Gao wrote:
>>>
>>> Hi Jan,
>>>
>>> From my view, I think in Flink Window should be as a "high-level"
>>> operation for some kind
>>> of aggregation operation and if it could not satisfy the requirements,
>>> we could at least turn to
>>> using the "low-level" api by using KeyedProcessFunction[1].
>>>
>>> In this case, we could use a ValueState to store the current value for
>>> each key, and increment
>>> the value on each element. Then we could also register time for each key
>>> on receiving the first
>>> element for this key,  and in the onTimer callback, we could send the
>>> current state value, update
>>> the value to 0 and register another timer for this key after 30s.
>>>
>>> Best,
>>>  Yun
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>>
>>> ------------------Original Mail ------------------
>>> *Sender:*Jan Brusch <ja...@neuland-bfi.de>
>>> <ja...@neuland-bfi.de>
>>> *Send Date:*Sat Feb 6 23:44:00 2021
>>> *Recipients:*user <us...@flink.apache.org> <us...@flink.apache.org>
>>> *Subject:*Sliding Window Count: Tricky Edge Case / Count Zero Problem
>>>
>>>> Hi,
>>>> I was recently working on a problem where we wanted to implement a
>>>> simple count on a sliding window, e.g. "how many messages of a certain
>>>> type were emitted by a certain type of sensor in the last n minutes".
>>>> Which sounds simple enough in theory:
>>>>
>>>> messageStream
>>>>      .keyBy(//EmitterType + MessageType)
>>>>      .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),
>>>> Time.seconds(30)))
>>>>      .map(_ => 1)
>>>>      .reduce((x,y) => x + y)
>>>>      .addSink(...)
>>>>
>>>> But there is a tricky edge case: The downstream systems will never know
>>>> when the count for a certain key goes back to 0, which is important for
>>>> our use case. The technical reason being that flink doesn't open a
>>>>
>>>> window if there are no entries, i.e. a window with count 0 doesn't exist
>>>> in flink.
>>>>
>>>> We came up with the following solution for the time being:
>>>>
>>>> messageStream
>>>>      .keyBy(//EmitterType + MessageType)
>>>>      .window(GlobalWindows.create())
>>>>      .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
>>>>      .evictor(// CustomEvictor: Evict all messages older than n minutes
>>>> BEFORE processing the window)
>>>>      .process(// CustomCounter: Count all Messages in Window State);
>>>>      .addSink(...)
>>>>
>>>>
>>>> In the case of zero messages in the last n minutes, all messages will be
>>>> evicted from the window and the process-function will get triggered one
>>>> last time on the now empty window, so we can produce a count of 0.
>>>>
>>>> I have two problems, though, with this solution:
>>>> 1) It is computationally inefficient for a simple count, as custom
>>>> process functions will always keep all messages in state. And, on every
>>>> trigger all elements will have to be touched twice: To compare the
>>>> timestamp and to count.
>>>> 2) It does seem like a very roundabout solution to a simple problem.
>>>>
>>>> So, I was wondering if there was a more efficient or "flink-like"
>>>> approach to this. Sorry for the long writeup, but I would love to hear
>>>> your takes.
>>>>
>>>>
>>>> Best regards
>>>> Jan
>>>>
>>>> --
>>>> neuland  – Büro für Informatik GmbH
>>>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>>>
>>>> Telefon (0421) 380107 57
>>>> Fax (0421) 380107 99
>>>> https://www.neuland-bfi.de
>>>>
>>>> https://twitter.com/neuland
>>>> https://facebook.com/neulandbfi
>>>> https://xing.com/company/neulandbfi
>>>>
>>>>
>>>> Geschäftsführer: Thomas Gebauer, Jan Zander
>>>> Registergericht: Amtsgericht Bremen, HRB 23395 HB
>>>> USt-ID. DE 246585501
>>>
>>> -- neuland  – Büro für Informatik GmbHKonsul-Smidt-Str. 8g, 28217 BremenTelefon (0421) 380107 57Fax (0421) 380107 99https://www.neuland-bfi.dehttps://twitter.com/neulandhttps://facebook.com/neulandbfihttps://xing.com/company/neulandbfiGeschäftsführer: Thomas Gebauer, Jan ZanderRegistergericht: Amtsgericht Bremen, HRB 23395 HBUSt-ID. DE 246585501
>>>
>>>

Re: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Posted by Yun Gao <yu...@aliyun.com>.
Hi,

I also think there should be different ways to achieve the target. For the first option listed previously, 
the pseudo-code roughly like

class MyFunciton extends KeyedProcessFunction {
    ValueState<Integer> count;

    void open() {
       count = ... // Create the value state
   }  

    ​void processElement(T t, Context context, Collector collector) {
            ​Integer current = count.get();
            if (current == null) {
                      context.timeService().registerTimer(30); // Register timer for the first time
                      current = 0;
            }

            count.update(current + 1); // update the count
    }

    void onTimer(...) {
         collector.collect(new Tuple2<>(getCurrentKey(), count.get());
 context.timeService().registerTimer(30);  // register the following timer
    }
}

1. For flink the state and timer are all bound to a key implicitly, thus I think they should
not need to be bound manually.
2. To clear the outdated state, it could be cleared via count.clear(); if it has been 0 
for a long time. There are different ways to count the interval, like register another timer
and clear the timer when received the elements or update the counter to -1, -2... to mark
how much timer it has passed.


Best,
 Yun





 ------------------Original Mail ------------------
Sender:Khachatryan Roman <kh...@gmail.com>
Send Date:Tue Feb 9 02:35:20 2021
Recipients:Jan Brusch <ja...@neuland-bfi.de>
CC:Yun Gao <yu...@aliyun.com>, user <us...@flink.apache.org>
Subject:Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Hi,

Probably another solution would be to register a timer (using KeyedProcessFunction) once we see an element after keyBy. The timer will fire in windowIntervalMs. Upon firing, it will emit a dummy element which will be ignored (or subtracted) in the end.
Upon receiving each new element, the function will shift the timer accordingly.

Regards,
Roman

On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch <ja...@neuland-bfi.de> wrote:

Hi Yun,
thanks for your reply.
I do agree with your point about standard windows being for high level operations and the lower-level apis offering a rich toolset for most advanced use cases.
I have tried to solve my problem with keyedProcessFunctions also but was not able to get it to work for two reasons:
1) I was not able to set up a combination of ValueState, Timers and Triggers that emulated a sliding window with a rising and falling count (including 0) good enough.
2) Memory Leak: States / Windows should be cleared after a certain time of being at count 0 in order to prevent an infinitely rising of ValueStates (that are not needed anymore)

Can you maybe please elaborate in pseudocode how you would envision your solution?

Best regards
Jan
On 08.02.21 05:31, Yun Gao wrote:

Hi Jan,

From my view, I think in Flink Window should be as a "high-level" operation for some kind
of aggregation operation and if it could not satisfy the requirements, we could at least turn to
using the "low-level" api by using KeyedProcessFunction[1].

In this case, we could use a ValueState to store the current value for each key, and increment
the value on each element. Then we could also register time for each key on receiving the first 
element for this key,  and in the onTimer callback, we could send the current state value, update
the value to 0 and register another timer for this key after 30s.

Best,
 Yun



[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction


 ------------------Original Mail ------------------
Sender:Jan Brusch <ja...@neuland-bfi.de>
Send Date:Sat Feb 6 23:44:00 2021
Recipients:user <us...@flink.apache.org>
Subject:Sliding Window Count: Tricky Edge Case / Count Zero Problem 
Hi,
I was recently working on a problem where we wanted to implement a 
simple count on a sliding window, e.g. "how many messages of a certain 
type were emitted by a certain type of sensor in the last n minutes". 
 Which sounds simple enough in theory:

 messageStream
      .keyBy(//EmitterType + MessageType)
     .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n), 
 Time.seconds(30)))
      .map(_ => 1)
      .reduce((x,y) => x + y)
      .addSink(...)

But there is a tricky edge case: The downstream systems will never know 
when the count for a certain key goes back to 0, which is important for 
our use case. The technical reason being that flink doesn't open a 
window if there are no entries, i.e. a window with count 0 doesn't exist 
 in flink.

 We came up with the following solution for the time being:

 messageStream
      .keyBy(//EmitterType + MessageType)
      .window(GlobalWindows.create())
     .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
     .evictor(// CustomEvictor: Evict all messages older than n minutes 
 BEFORE processing the window)
     .process(// CustomCounter: Count all Messages in Window State);
      .addSink(...)

In the case of zero messages in the last n minutes, all messages will be 
evicted from the window and the process-function will get triggered one 
last time on the now empty window, so we can produce a count of 0.

 I have two problems, though, with this solution:
1) It is computationally inefficient for a simple count, as custom 
process functions will always keep all messages in state. And, on every 
trigger all elements will have to be touched twice: To compare the 
 timestamp and to count.
2) It does seem like a very roundabout solution to a simple problem.

So, I was wondering if there was a more efficient or "flink-like" 
approach to this. Sorry for the long writeup, but I would love to hear 
 your takes.


 Best regards
 Jan

 -- 
 neuland  – Büro für Informatik GmbH
 Konsul-Smidt-Str. 8g, 28217 Bremen

 Telefon (0421) 380107 57
 Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


 Geschäftsführer: Thomas Gebauer, Jan Zander
 Registergericht: Amtsgericht Bremen, HRB 23395 HB
 USt-ID. DE 246585501    
-- neuland  – Büro für Informatik GmbHKonsul-Smidt-Str. 8g, 28217 BremenTelefon (0421) 380107 57Fax (0421) 380107 99https://www.neuland-bfi.dehttps://twitter.com/neulandhttps://facebook.com/neulandbfihttps://xing.com/company/neulandbfiGeschäftsführer: Thomas Gebauer, Jan ZanderRegistergericht: Amtsgericht Bremen, HRB 23395 HBUSt-ID. DE 246585501 

Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Posted by Khachatryan Roman <kh...@gmail.com>.
Hi,

Probably another solution would be to register a timer
(using KeyedProcessFunction) once we see an element after keyBy. The timer
will fire in windowIntervalMs. Upon firing, it will emit a dummy element
which will be ignored (or subtracted) in the end.
Upon receiving each new element, the function will shift the timer
accordingly.

Regards,
Roman


On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch <ja...@neuland-bfi.de>
wrote:

> Hi Yun,
>
> thanks for your reply.
>
> I do agree with your point about standard windows being for high level
> operations and the lower-level apis offering a rich toolset for most
> advanced use cases.
>
> I have tried to solve my problem with keyedProcessFunctions also but was
> not able to get it to work for two reasons:
>
> 1) I was not able to set up a combination of ValueState, Timers and
> Triggers that emulated a sliding window with a rising and falling count
> (including 0) good enough.
>
> 2) Memory Leak: States / Windows should be cleared after a certain time of
> being at count 0 in order to prevent an infinitely rising of ValueStates
> (that are not needed anymore)
>
>
> Can you maybe please elaborate in pseudocode how you would envision your
> solution?
>
>
> Best regards
>
> Jan
> On 08.02.21 05:31, Yun Gao wrote:
>
> Hi Jan,
>
> From my view, I think in Flink Window should be as a "high-level"
> operation for some kind
> of aggregation operation and if it could not satisfy the requirements, we
> could at least turn to
> using the "low-level" api by using KeyedProcessFunction[1].
>
> In this case, we could use a ValueState to store the current value for
> each key, and increment
> the value on each element. Then we could also register time for each key
> on receiving the first
> element for this key,  and in the onTimer callback, we could send the
> current state value, update
> the value to 0 and register another timer for this key after 30s.
>
> Best,
>  Yun
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>
> ------------------Original Mail ------------------
> *Sender:*Jan Brusch <ja...@neuland-bfi.de>
> <ja...@neuland-bfi.de>
> *Send Date:*Sat Feb 6 23:44:00 2021
> *Recipients:*user <us...@flink.apache.org> <us...@flink.apache.org>
> *Subject:*Sliding Window Count: Tricky Edge Case / Count Zero Problem
>
>> Hi,
>> I was recently working on a problem where we wanted to implement a
>> simple count on a sliding window, e.g. "how many messages of a certain
>> type were emitted by a certain type of sensor in the last n minutes".
>> Which sounds simple enough in theory:
>>
>> messageStream
>>      .keyBy(//EmitterType + MessageType)
>>      .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),
>> Time.seconds(30)))
>>      .map(_ => 1)
>>      .reduce((x,y) => x + y)
>>      .addSink(...)
>>
>> But there is a tricky edge case: The downstream systems will never know
>> when the count for a certain key goes back to 0, which is important for
>> our use case. The technical reason being that flink doesn't open a
>> window if there are no entries, i.e. a window with count 0 doesn't exist
>> in flink.
>>
>> We came up with the following solution for the time being:
>>
>> messageStream
>>      .keyBy(//EmitterType + MessageType)
>>      .window(GlobalWindows.create())
>>      .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
>>      .evictor(// CustomEvictor: Evict all messages older than n minutes
>> BEFORE processing the window)
>>      .process(// CustomCounter: Count all Messages in Window State);
>>      .addSink(...)
>>
>> In the case of zero messages in the last n minutes, all messages will be
>> evicted from the window and the process-function will get triggered one
>> last time on the now empty window, so we can produce a count of 0.
>>
>> I have two problems, though, with this solution:
>> 1) It is computationally inefficient for a simple count, as custom
>> process functions will always keep all messages in state. And, on every
>> trigger all elements will have to be touched twice: To compare the
>> timestamp and to count.
>> 2) It does seem like a very roundabout solution to a simple problem.
>>
>> So, I was wondering if there was a more efficient or "flink-like"
>> approach to this. Sorry for the long writeup, but I would love to hear
>> your takes.
>>
>>
>> Best regards
>> Jan
>>
>> --
>> neuland  – Büro für Informatik GmbH
>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>
>> Telefon (0421) 380107 57
>> Fax (0421) 380107 99
>> https://www.neuland-bfi.de
>>
>> https://twitter.com/neuland
>> https://facebook.com/neulandbfi
>> https://xing.com/company/neulandbfi
>>
>>
>> Geschäftsführer: Thomas Gebauer, Jan Zander
>> Registergericht: Amtsgericht Bremen, HRB 23395 HB
>> USt-ID. DE 246585501
>
> --
> neuland  – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99https://www.neuland-bfi.de
> https://twitter.com/neulandhttps://facebook.com/neulandbfihttps://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
>
>

Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Posted by Jan Brusch <ja...@neuland-bfi.de>.
Hi Yun,

thanks for your reply.

I do agree with your point about standard windows being for high level 
operations and the lower-level apis offering a rich toolset for most 
advanced use cases.

I have tried to solve my problem with keyedProcessFunctions also but was 
not able to get it to work for two reasons:

1) I was not able to set up a combination of ValueState, Timers and 
Triggers that emulated a sliding window with a rising and falling count 
(including 0) good enough.

2) Memory Leak: States / Windows should be cleared after a certain time 
of being at count 0 in order to prevent an infinitely rising of 
ValueStates (that are not needed anymore)


Can you maybe please elaborate in pseudocode how you would envision your 
solution?


Best regards

Jan

On 08.02.21 05:31, Yun Gao wrote:
> Hi Jan,
>
> From my view, I think in Flink Window should be as a "high-level" 
> operation for some kind
> of aggregation operation and if it could not satisfy the requirements, 
> we could at least turn to
> using the "low-level" api by using KeyedProcessFunction[1].
>
> In this case, we could use a ValueState to store the current value for 
> each key, and increment
> the value on each element. Then we could also register time for each 
> key on receiving the first
> element for this key,  and in the onTimer callback, we could send the 
> current state value, update
> the value to 0 and register another timer for this key after 30s.
>
> Best,
>  Yun
>
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>
>     ------------------Original Mail ------------------
>     *Sender:*Jan Brusch <ja...@neuland-bfi.de>
>     *Send Date:*Sat Feb 6 23:44:00 2021
>     *Recipients:*user <us...@flink.apache.org>
>     *Subject:*Sliding Window Count: Tricky Edge Case / Count Zero Problem
>
>         Hi,
>         I was recently working on a problem where we wanted to implement a
>
>         simple count on a sliding window, e.g. "how many messages of a certain
>
>         type were emitted by a certain type of sensor in the last n minutes".
>
>         Which sounds simple enough in theory:
>
>         messageStream
>              .keyBy(//EmitterType + MessageType)
>              .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),
>
>         Time.seconds(30)))
>              .map(_ => 1)
>              .reduce((x,y) => x + y)
>              .addSink(...)
>
>         But there is a tricky edge case: The downstream systems will never know
>
>         when the count for a certain key goes back to 0, which is important for
>
>         our use case. The technical reason being that flink doesn't open a
>
>         window if there are no entries, i.e. a window with count 0 doesn't exist
>
>         in flink.
>
>         We came up with the following solution for the time being:
>
>         messageStream
>              .keyBy(//EmitterType + MessageType)
>              .window(GlobalWindows.create())
>              .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
>              .evictor(// CustomEvictor: Evict all messages older than n minutes
>
>         BEFORE processing the window)
>              .process(// CustomCounter: Count all Messages in Window State);
>              .addSink(...)
>
>         In the case of zero messages in the last n minutes, all messages will be
>
>         evicted from the window and the process-function will get triggered one
>
>         last time on the now empty window, so we can produce a count of 0.
>
>         I have two problems, though, with this solution:
>         1) It is computationally inefficient for a simple count, as custom
>
>         process functions will always keep all messages in state. And, on every
>
>         trigger all elements will have to be touched twice: To compare the
>
>         timestamp and to count.
>         2) It does seem like a very roundabout solution to a simple problem.
>
>         So, I was wondering if there was a more efficient or "flink-like"
>         approach to this. Sorry for the long writeup, but I would love to hear
>
>         your takes.
>
>
>         Best regards
>         Jan
>
>         -- 
>         neuland  – Büro für Informatik GmbH
>         Konsul-Smidt-Str. 8g, 28217 Bremen
>
>         Telefon (0421) 380107 57
>         Fax (0421) 380107 99
>         https://www.neuland-bfi.de
>
>         https://twitter.com/neuland
>         https://facebook.com/neulandbfi
>         https://xing.com/company/neulandbfi
>
>
>         Geschäftsführer: Thomas Gebauer, Jan Zander
>         Registergericht: Amtsgericht Bremen, HRB 23395 HB
>         USt-ID. DE 246585501
>
-- 
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501


Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Posted by Yun Gao <yu...@aliyun.com>.
Hi Jan,

From my view, I think in Flink Window should be as a "high-level" operation for some kind
of aggregation operation and if it could not satisfy the requirements, we could at least turn to
using the "low-level" api by using KeyedProcessFunction[1].

In this case, we could use a ValueState to store the current value for each key, and increment
the value on each element. Then we could also register time for each key on receiving the first 
element for this key,  and in the onTimer callback, we could send the current state value, update
the value to 0 and register another timer for this key after 30s.

Best,
 Yun



[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction


 ------------------Original Mail ------------------
Sender:Jan Brusch <ja...@neuland-bfi.de>
Send Date:Sat Feb 6 23:44:00 2021
Recipients:user <us...@flink.apache.org>
Subject:Sliding Window Count: Tricky Edge Case / Count Zero Problem
Hi,
I was recently working on a problem where we wanted to implement a 
simple count on a sliding window, e.g. "how many messages of a certain 
type were emitted by a certain type of sensor in the last n minutes". 
Which sounds simple enough in theory:

messageStream
     .keyBy(//EmitterType + MessageType)
     .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n), 
Time.seconds(30)))
     .map(_ => 1)
     .reduce((x,y) => x + y)
     .addSink(...)

But there is a tricky edge case: The downstream systems will never know 
when the count for a certain key goes back to 0, which is important for 
our use case. The technical reason being that flink doesn't open a 
window if there are no entries, i.e. a window with count 0 doesn't exist 
in flink.

We came up with the following solution for the time being:

messageStream
     .keyBy(//EmitterType + MessageType)
     .window(GlobalWindows.create())
     .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
     .evictor(// CustomEvictor: Evict all messages older than n minutes 
BEFORE processing the window)
     .process(// CustomCounter: Count all Messages in Window State);
     .addSink(...)

In the case of zero messages in the last n minutes, all messages will be 
evicted from the window and the process-function will get triggered one 
last time on the now empty window, so we can produce a count of 0.

I have two problems, though, with this solution:
1) It is computationally inefficient for a simple count, as custom 
process functions will always keep all messages in state. And, on every 
trigger all elements will have to be touched twice: To compare the 
timestamp and to count.
2) It does seem like a very roundabout solution to a simple problem.

So, I was wondering if there was a more efficient or "flink-like" 
approach to this. Sorry for the long writeup, but I would love to hear 
your takes.


Best regards
Jan

-- 
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501