You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Manas Kale <ma...@gmail.com> on 2020/02/17 07:09:29 UTC

Emit message at start and end of event time session window

Hi,
I want to achieve the following using event time session windows:

   1. When the window.getStart() and last event timestamp in the window is
   greater than MIN_WINDOW_SIZE milliseconds, I want to emit a message "Window
   started @ timestamp".
   2. When the session window ends, i.e. the watermark passes
   lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
   ended @ timestamp".

 It is guaranteed that all events are on time and no lateness is allowed. I
am having difficulty implementing both 1 and 2 simultaneously.
I am able to implement point 1 using a custom trigger, which checks if
(lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a
customProcessWindowFunction().
However, with this architecture I can't detect the end of the window.

Is my approach correct or is there a completely different method to achieve
this?

Thanks,
Manas Kale

Re: Emit message at start and end of event time session window

Posted by Manas Kale <ma...@gmail.com>.
Hi Till,
Thank you for the explanation, I understand the behaviour now.


On Thu, Mar 26, 2020 at 9:23 PM Till Rohrmann <tr...@apache.org> wrote:

> A quick update concerning your observations. The reason why you are seeing
> the unordered output is because in the gist we used
> a AssignerWithPeriodicWatermarks which generates watermarks periodically.
> Due to this aspect, it can happen that Flink already process all elements
> up to "20" before it sees the next watermark which triggers the processing.
> If there are multiple windows being processed, Flink does not give a
> guarantee in which order this happens.
>
> You can avoid this behaviour if you used
> an AssignerWithPunctuatedWatermarks instead. This watermark assigner is
> called for every record. The updated gist [1] shows how it is used.
>
> [1] https://gist.github.com/tillrohrmann/dda90b8b0e67e379a8dfee967fbd9af1
>
> Cheers,
> Till
>
> On Thu, Mar 26, 2020 at 4:27 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hmm, I might have given you a bad advice. I think the problem becomes
>> harder because with Flink's window and trigger API we need to keep state
>> consistent between the Trigger and the Window function. Maybe it would be
>> easier to not rely on the windowing mechanism and instead to use Flink's
>> process function [1] to implement the logic yourself.
>>
>> With the process function you have basically a low level API with which
>> you can implement an operator which groups incoming events according to
>> sessions and outputs the required information.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 26, 2020 at 11:27 AM Manas Kale <ma...@gmail.com>
>> wrote:
>>
>>> Hi Till,
>>> I see, thanks for the clarification.
>>> Assuming all other setting are the same, if I generate events as follows
>>> :
>>> Element.from("1", 1000L),
>>>                 Element.from("2", 2000L),
>>>                 Element.from("3", 3000L),
>>>                 Element.from("10", 10000L)
>>>                 ,Element.from("11", 11000L),
>>>                 Element.from("12", 12000L),
>>>                 Element.from("20", 20000L)
>>> we will expect 2 session windows to be created {1,2,3} and {10,11,12}
>>> with appropriate messages. However, when I run this, there seems to be a
>>> problem in the valueState of MyWindowFunction. Apparently that state is
>>> being shared by both the session windows, which leads to incorrect results.
>>> To solve this, I replaced it with a MapState<Long, Boolean>. The Long is
>>> the start timestamp of a window, something that can uniquely identify
>>> different windows. This works but with one caveat : if we have two
>>> subsequent windows, the ordering of messages is :
>>>
>>> window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @
>>> 8000 -> window2 ended @ 17000
>>>
>>> whereas I expect it to be :
>>> window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @
>>> 10000 -> window2 ended @ 17000
>>>
>>> I thought Flink would execute event time timers and process events in
>>> chronological event time order. However, it seems that the onEventTime()
>>> invocation of window1 is called *after *elements from window2 have been
>>> processed even though window1's onEventTime() is earlier in event time.
>>>
>>> Is my approach and reasoning correct? Also, is it possible to get the
>>> messages in the expected order?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> the problem is that the print() statement is being executed with a
>>>> different parallelism than 1. Due to this fact, the messages coming from
>>>> the window function will be sent in round-robin fashion to the print
>>>> operators. If you remove the setParallelism(1) from the window function,
>>>> then the window function will be executed with the same parallelism as the
>>>> print operator. Due to this fact, there is no round-robin distribution of
>>>> the events but every window function task will simply forward its
>>>> elements to its print operator task. You should be able to see these
>>>> topology differences in the web ui.
>>>>
>>>> You could configure the print() operator to run with a parallelism of 1
>>>> as well by adding a setParallelism(1) statement to it.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Mar 26, 2020 at 7:11 AM Manas Kale <ma...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>> When I run the example code that you posted, the order of the three
>>>>> messages (window started, contents of window and window ended) is
>>>>> non-deterministic. This is surprising to me, as setParallelism(1) has been
>>>>> used in the pipeline - I assumed this should eliminate any form of race
>>>>> conditions for printing. What's more is that if I *remove*
>>>>> setParallelism(1) from the code, the output is deterministic and correct
>>>>> (i.e. windowStarted -> windowContents -> windowEnded).
>>>>>
>>>>> Clearly, something is wrong with my understanding. What is it?
>>>>>
>>>>> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Great to hear that you solved the problem. Let us know if you run
>>>>>> into any other issues.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <ma...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> This problem is solved[1]. The issue was that the BroadcastStream
>>>>>>> did not contain any watermark, which prevented watermarks for any
>>>>>>> downstream operators from advancing.
>>>>>>> I appreciate all the help.
>>>>>>> [1]
>>>>>>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Manas
>>>>>>>
>>>>>>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <ma...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Rafi and Till,
>>>>>>>> Thank you for pointing out that edge case, Rafi.
>>>>>>>>
>>>>>>>> Till, I am trying to get this example working with the
>>>>>>>> BroadcastState pattern upstream to the window operator[1]. The problem is
>>>>>>>> that introducing the BroadcastState makes the onEventTime() *never*
>>>>>>>> fire. Is the BroadcastState somehow eating up the watermark? Do I need to
>>>>>>>> generate the watermark again in the KeyedBroadcastProcessFunction?
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Manas
>>>>>>>>
>>>>>>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <tr...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Manas and Rafi,
>>>>>>>>>
>>>>>>>>> you are right that when using merging windows as event time
>>>>>>>>> session windows are, then Flink requires that any state the Trigger keeps
>>>>>>>>> is of type MergingState. This constraint allows that the state can be
>>>>>>>>> merged whenever two windows get merged.
>>>>>>>>>
>>>>>>>>> Rafi, you are right. With the current implementation it might
>>>>>>>>> happen that you send a wrong started window message. I think it depends on
>>>>>>>>> the MIN_WINDOW_SIZE and the distribution of your timestamps and, hence,
>>>>>>>>> also your watermark. If you want to be on the safe side, then I would
>>>>>>>>> recommend to use the ProcessFunction to implement the required logic. The
>>>>>>>>> ProcessFunction [1] is Flink's low level API and gives you access to state
>>>>>>>>> and timers. In it, you would need to buffer the elements and to sessionize
>>>>>>>>> them yourself, though. However, it would give you access to the
>>>>>>>>> watermark which in turn would allow you to properly handle your described
>>>>>>>>> edge case.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <ra...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I think one "edge" case which is not handled would be that the
>>>>>>>>>> first event (by event-time) arrives late, then a wrong "started-window"
>>>>>>>>>> would be reported.
>>>>>>>>>>
>>>>>>>>>> Rafi
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <
>>>>>>>>>> manaskale96@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Is the reason ValueState cannot be use because session windows
>>>>>>>>>>> are always formed by merging proto-windows of single elements, therefore a
>>>>>>>>>>> state store is needed that can handle merging. ValueState does not provide
>>>>>>>>>>> this functionality, but a ReducingState does?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <
>>>>>>>>>>> manaskale96@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>> Thanks for your answer! You also answered the next question
>>>>>>>>>>>> that I was about to ask "Can we share state between a Trigger and a
>>>>>>>>>>>> Window?" Currently the only (convoluted) way to share state between two
>>>>>>>>>>>> operators is through the broadcast state pattern, right?
>>>>>>>>>>>> Also, in your example, why can't we use a
>>>>>>>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>>>>>>>>> example but it  I am not able to  call the mergePartitionedState() method
>>>>>>>>>>>> on a ValueStateDescriptor.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Manas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <
>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Manas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> you can implement something like this with a bit of trigger
>>>>>>>>>>>>> magic. What you need to do is to define your own trigger implementation
>>>>>>>>>>>>> which keeps state to remember whether it has triggered the "started window"
>>>>>>>>>>>>> message or not. In the stateful window function you would need to do
>>>>>>>>>>>>> something similar. The first call could trigger the output of "window
>>>>>>>>>>>>> started" and any subsequent call will trigger the evaluation of the window.
>>>>>>>>>>>>> It would have been a bit easier if the trigger and the window process
>>>>>>>>>>>>> function could share its internal state. Unfortunately, this is not
>>>>>>>>>>>>> possible at the moment.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <
>>>>>>>>>>>>> manaskale96@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> I want to achieve the following using event time session
>>>>>>>>>>>>>> windows:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    1. When the window.getStart() and last event timestamp in
>>>>>>>>>>>>>>    the window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>>>>>>>>>>    message "Window started @ timestamp".
>>>>>>>>>>>>>>    2. When the session window ends, i.e. the watermark
>>>>>>>>>>>>>>    passes lasteventTimestamp + inactivityPeriod, I want to emit a message
>>>>>>>>>>>>>>    "Window ended @ timestamp".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  It is guaranteed that all events are on time and no lateness
>>>>>>>>>>>>>> is allowed. I am having difficulty implementing both 1 and 2
>>>>>>>>>>>>>> simultaneously.
>>>>>>>>>>>>>> I am able to implement point 1 using a custom trigger, which
>>>>>>>>>>>>>> checks if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>>>>>>>>>>> triggers a customProcessWindowFunction().
>>>>>>>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>>>>>>>> window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is my approach correct or is there a completely different
>>>>>>>>>>>>>> method to achieve this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Manas Kale
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>

Re: Emit message at start and end of event time session window

Posted by Till Rohrmann <tr...@apache.org>.
A quick update concerning your observations. The reason why you are seeing
the unordered output is because in the gist we used
a AssignerWithPeriodicWatermarks which generates watermarks periodically.
Due to this aspect, it can happen that Flink already process all elements
up to "20" before it sees the next watermark which triggers the processing.
If there are multiple windows being processed, Flink does not give a
guarantee in which order this happens.

You can avoid this behaviour if you used
an AssignerWithPunctuatedWatermarks instead. This watermark assigner is
called for every record. The updated gist [1] shows how it is used.

[1] https://gist.github.com/tillrohrmann/dda90b8b0e67e379a8dfee967fbd9af1

Cheers,
Till

On Thu, Mar 26, 2020 at 4:27 PM Till Rohrmann <tr...@apache.org> wrote:

> Hmm, I might have given you a bad advice. I think the problem becomes
> harder because with Flink's window and trigger API we need to keep state
> consistent between the Trigger and the Window function. Maybe it would be
> easier to not rely on the windowing mechanism and instead to use Flink's
> process function [1] to implement the logic yourself.
>
> With the process function you have basically a low level API with which
> you can implement an operator which groups incoming events according to
> sessions and outputs the required information.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>
> Cheers,
> Till
>
> On Thu, Mar 26, 2020 at 11:27 AM Manas Kale <ma...@gmail.com> wrote:
>
>> Hi Till,
>> I see, thanks for the clarification.
>> Assuming all other setting are the same, if I generate events as follows
>> :
>> Element.from("1", 1000L),
>>                 Element.from("2", 2000L),
>>                 Element.from("3", 3000L),
>>                 Element.from("10", 10000L)
>>                 ,Element.from("11", 11000L),
>>                 Element.from("12", 12000L),
>>                 Element.from("20", 20000L)
>> we will expect 2 session windows to be created {1,2,3} and {10,11,12}
>> with appropriate messages. However, when I run this, there seems to be a
>> problem in the valueState of MyWindowFunction. Apparently that state is
>> being shared by both the session windows, which leads to incorrect results.
>> To solve this, I replaced it with a MapState<Long, Boolean>. The Long is
>> the start timestamp of a window, something that can uniquely identify
>> different windows. This works but with one caveat : if we have two
>> subsequent windows, the ordering of messages is :
>>
>> window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @ 8000
>> -> window2 ended @ 17000
>>
>> whereas I expect it to be :
>> window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @ 10000
>> -> window2 ended @ 17000
>>
>> I thought Flink would execute event time timers and process events in
>> chronological event time order. However, it seems that the onEventTime()
>> invocation of window1 is called *after *elements from window2 have been
>> processed even though window1's onEventTime() is earlier in event time.
>>
>> Is my approach and reasoning correct? Also, is it possible to get the
>> messages in the expected order?
>>
>> Thanks!
>>
>>
>>
>>
>>
>> On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Manas,
>>>
>>> the problem is that the print() statement is being executed with a
>>> different parallelism than 1. Due to this fact, the messages coming from
>>> the window function will be sent in round-robin fashion to the print
>>> operators. If you remove the setParallelism(1) from the window function,
>>> then the window function will be executed with the same parallelism as the
>>> print operator. Due to this fact, there is no round-robin distribution of
>>> the events but every window function task will simply forward its
>>> elements to its print operator task. You should be able to see these
>>> topology differences in the web ui.
>>>
>>> You could configure the print() operator to run with a parallelism of 1
>>> as well by adding a setParallelism(1) statement to it.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Mar 26, 2020 at 7:11 AM Manas Kale <ma...@gmail.com>
>>> wrote:
>>>
>>>> Hi Till,
>>>> When I run the example code that you posted, the order of the three
>>>> messages (window started, contents of window and window ended) is
>>>> non-deterministic. This is surprising to me, as setParallelism(1) has been
>>>> used in the pipeline - I assumed this should eliminate any form of race
>>>> conditions for printing. What's more is that if I *remove*
>>>> setParallelism(1) from the code, the output is deterministic and correct
>>>> (i.e. windowStarted -> windowContents -> windowEnded).
>>>>
>>>> Clearly, something is wrong with my understanding. What is it?
>>>>
>>>> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Great to hear that you solved the problem. Let us know if you run into
>>>>> any other issues.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <ma...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> This problem is solved[1]. The issue was that the BroadcastStream did
>>>>>> not contain any watermark, which prevented watermarks for any downstream
>>>>>> operators from advancing.
>>>>>> I appreciate all the help.
>>>>>> [1]
>>>>>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>>>>>
>>>>>> Thanks,
>>>>>> Manas
>>>>>>
>>>>>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <ma...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Rafi and Till,
>>>>>>> Thank you for pointing out that edge case, Rafi.
>>>>>>>
>>>>>>> Till, I am trying to get this example working with the
>>>>>>> BroadcastState pattern upstream to the window operator[1]. The problem is
>>>>>>> that introducing the BroadcastState makes the onEventTime() *never*
>>>>>>> fire. Is the BroadcastState somehow eating up the watermark? Do I need to
>>>>>>> generate the watermark again in the KeyedBroadcastProcessFunction?
>>>>>>>
>>>>>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Manas
>>>>>>>
>>>>>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <tr...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Manas and Rafi,
>>>>>>>>
>>>>>>>> you are right that when using merging windows as event time session
>>>>>>>> windows are, then Flink requires that any state the Trigger keeps is of
>>>>>>>> type MergingState. This constraint allows that the state can be merged
>>>>>>>> whenever two windows get merged.
>>>>>>>>
>>>>>>>> Rafi, you are right. With the current implementation it might
>>>>>>>> happen that you send a wrong started window message. I think it depends on
>>>>>>>> the MIN_WINDOW_SIZE and the distribution of your timestamps and, hence,
>>>>>>>> also your watermark. If you want to be on the safe side, then I would
>>>>>>>> recommend to use the ProcessFunction to implement the required logic. The
>>>>>>>> ProcessFunction [1] is Flink's low level API and gives you access to state
>>>>>>>> and timers. In it, you would need to buffer the elements and to sessionize
>>>>>>>> them yourself, though. However, it would give you access to the
>>>>>>>> watermark which in turn would allow you to properly handle your described
>>>>>>>> edge case.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <ra...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think one "edge" case which is not handled would be that the
>>>>>>>>> first event (by event-time) arrives late, then a wrong "started-window"
>>>>>>>>> would be reported.
>>>>>>>>>
>>>>>>>>> Rafi
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <ma...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Is the reason ValueState cannot be use because session windows
>>>>>>>>>> are always formed by merging proto-windows of single elements, therefore a
>>>>>>>>>> state store is needed that can handle merging. ValueState does not provide
>>>>>>>>>> this functionality, but a ReducingState does?
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Till,
>>>>>>>>>>> Thanks for your answer! You also answered the next question that
>>>>>>>>>>> I was about to ask "Can we share state between a Trigger and a Window?"
>>>>>>>>>>> Currently the only (convoluted) way to share state between two operators is
>>>>>>>>>>> through the broadcast state pattern, right?
>>>>>>>>>>> Also, in your example, why can't we use a
>>>>>>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>>>>>>>> example but it  I am not able to  call the mergePartitionedState() method
>>>>>>>>>>> on a ValueStateDescriptor.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Manas
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <
>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Manas,
>>>>>>>>>>>>
>>>>>>>>>>>> you can implement something like this with a bit of trigger
>>>>>>>>>>>> magic. What you need to do is to define your own trigger implementation
>>>>>>>>>>>> which keeps state to remember whether it has triggered the "started window"
>>>>>>>>>>>> message or not. In the stateful window function you would need to do
>>>>>>>>>>>> something similar. The first call could trigger the output of "window
>>>>>>>>>>>> started" and any subsequent call will trigger the evaluation of the window.
>>>>>>>>>>>> It would have been a bit easier if the trigger and the window process
>>>>>>>>>>>> function could share its internal state. Unfortunately, this is not
>>>>>>>>>>>> possible at the moment.
>>>>>>>>>>>>
>>>>>>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <
>>>>>>>>>>>> manaskale96@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> I want to achieve the following using event time session
>>>>>>>>>>>>> windows:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    1. When the window.getStart() and last event timestamp in
>>>>>>>>>>>>>    the window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>>>>>>>>>    message "Window started @ timestamp".
>>>>>>>>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>>>>>>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>>>>>>>>>>    ended @ timestamp".
>>>>>>>>>>>>>
>>>>>>>>>>>>>  It is guaranteed that all events are on time and no lateness
>>>>>>>>>>>>> is allowed. I am having difficulty implementing both 1 and 2
>>>>>>>>>>>>> simultaneously.
>>>>>>>>>>>>> I am able to implement point 1 using a custom trigger, which
>>>>>>>>>>>>> checks if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>>>>>>>>>> triggers a customProcessWindowFunction().
>>>>>>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>>>>>>> window.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is my approach correct or is there a completely different
>>>>>>>>>>>>> method to achieve this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Manas Kale
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>

Re: Emit message at start and end of event time session window

Posted by Till Rohrmann <tr...@apache.org>.
Hmm, I might have given you a bad advice. I think the problem becomes
harder because with Flink's window and trigger API we need to keep state
consistent between the Trigger and the Window function. Maybe it would be
easier to not rely on the windowing mechanism and instead to use Flink's
process function [1] to implement the logic yourself.

With the process function you have basically a low level API with which you
can implement an operator which groups incoming events according to
sessions and outputs the required information.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

Cheers,
Till

On Thu, Mar 26, 2020 at 11:27 AM Manas Kale <ma...@gmail.com> wrote:

> Hi Till,
> I see, thanks for the clarification.
> Assuming all other setting are the same, if I generate events as follows :
> Element.from("1", 1000L),
>                 Element.from("2", 2000L),
>                 Element.from("3", 3000L),
>                 Element.from("10", 10000L)
>                 ,Element.from("11", 11000L),
>                 Element.from("12", 12000L),
>                 Element.from("20", 20000L)
> we will expect 2 session windows to be created {1,2,3} and {10,11,12} with
> appropriate messages. However, when I run this, there seems to be a problem
> in the valueState of MyWindowFunction. Apparently that state is being
> shared by both the session windows, which leads to incorrect results.
> To solve this, I replaced it with a MapState<Long, Boolean>. The Long is
> the start timestamp of a window, something that can uniquely identify
> different windows. This works but with one caveat : if we have two
> subsequent windows, the ordering of messages is :
>
> window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @ 8000
> -> window2 ended @ 17000
>
> whereas I expect it to be :
> window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @ 10000
> -> window2 ended @ 17000
>
> I thought Flink would execute event time timers and process events in
> chronological event time order. However, it seems that the onEventTime()
> invocation of window1 is called *after *elements from window2 have been
> processed even though window1's onEventTime() is earlier in event time.
>
> Is my approach and reasoning correct? Also, is it possible to get the
> messages in the expected order?
>
> Thanks!
>
>
>
>
>
> On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Manas,
>>
>> the problem is that the print() statement is being executed with a
>> different parallelism than 1. Due to this fact, the messages coming from
>> the window function will be sent in round-robin fashion to the print
>> operators. If you remove the setParallelism(1) from the window function,
>> then the window function will be executed with the same parallelism as the
>> print operator. Due to this fact, there is no round-robin distribution of
>> the events but every window function task will simply forward its
>> elements to its print operator task. You should be able to see these
>> topology differences in the web ui.
>>
>> You could configure the print() operator to run with a parallelism of 1
>> as well by adding a setParallelism(1) statement to it.
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 26, 2020 at 7:11 AM Manas Kale <ma...@gmail.com> wrote:
>>
>>> Hi Till,
>>> When I run the example code that you posted, the order of the three
>>> messages (window started, contents of window and window ended) is
>>> non-deterministic. This is surprising to me, as setParallelism(1) has been
>>> used in the pipeline - I assumed this should eliminate any form of race
>>> conditions for printing. What's more is that if I *remove*
>>> setParallelism(1) from the code, the output is deterministic and correct
>>> (i.e. windowStarted -> windowContents -> windowEnded).
>>>
>>> Clearly, something is wrong with my understanding. What is it?
>>>
>>> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Great to hear that you solved the problem. Let us know if you run into
>>>> any other issues.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <ma...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> This problem is solved[1]. The issue was that the BroadcastStream did
>>>>> not contain any watermark, which prevented watermarks for any downstream
>>>>> operators from advancing.
>>>>> I appreciate all the help.
>>>>> [1]
>>>>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>>>>
>>>>> Thanks,
>>>>> Manas
>>>>>
>>>>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <ma...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Rafi and Till,
>>>>>> Thank you for pointing out that edge case, Rafi.
>>>>>>
>>>>>> Till, I am trying to get this example working with the BroadcastState
>>>>>> pattern upstream to the window operator[1]. The problem is that introducing
>>>>>> the BroadcastState makes the onEventTime() *never* fire. Is the
>>>>>> BroadcastState somehow eating up the watermark? Do I need to generate the
>>>>>> watermark again in the KeyedBroadcastProcessFunction?
>>>>>>
>>>>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>>>>
>>>>>> Thanks,
>>>>>> Manas
>>>>>>
>>>>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Manas and Rafi,
>>>>>>>
>>>>>>> you are right that when using merging windows as event time session
>>>>>>> windows are, then Flink requires that any state the Trigger keeps is of
>>>>>>> type MergingState. This constraint allows that the state can be merged
>>>>>>> whenever two windows get merged.
>>>>>>>
>>>>>>> Rafi, you are right. With the current implementation it might happen
>>>>>>> that you send a wrong started window message. I think it depends on the
>>>>>>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>>>>>>> your watermark. If you want to be on the safe side, then I would recommend
>>>>>>> to use the ProcessFunction to implement the required logic. The
>>>>>>> ProcessFunction [1] is Flink's low level API and gives you access to state
>>>>>>> and timers. In it, you would need to buffer the elements and to sessionize
>>>>>>> them yourself, though. However, it would give you access to the
>>>>>>> watermark which in turn would allow you to properly handle your described
>>>>>>> edge case.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <ra...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I think one "edge" case which is not handled would be that the
>>>>>>>> first event (by event-time) arrives late, then a wrong "started-window"
>>>>>>>> would be reported.
>>>>>>>>
>>>>>>>> Rafi
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <ma...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Is the reason ValueState cannot be use because session windows are
>>>>>>>>> always formed by merging proto-windows of single elements, therefore a
>>>>>>>>> state store is needed that can handle merging. ValueState does not provide
>>>>>>>>> this functionality, but a ReducingState does?
>>>>>>>>>
>>>>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Till,
>>>>>>>>>> Thanks for your answer! You also answered the next question that
>>>>>>>>>> I was about to ask "Can we share state between a Trigger and a Window?"
>>>>>>>>>> Currently the only (convoluted) way to share state between two operators is
>>>>>>>>>> through the broadcast state pattern, right?
>>>>>>>>>> Also, in your example, why can't we use a
>>>>>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>>>>>>> example but it  I am not able to  call the mergePartitionedState() method
>>>>>>>>>> on a ValueStateDescriptor.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Manas
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <
>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Manas,
>>>>>>>>>>>
>>>>>>>>>>> you can implement something like this with a bit of trigger
>>>>>>>>>>> magic. What you need to do is to define your own trigger implementation
>>>>>>>>>>> which keeps state to remember whether it has triggered the "started window"
>>>>>>>>>>> message or not. In the stateful window function you would need to do
>>>>>>>>>>> something similar. The first call could trigger the output of "window
>>>>>>>>>>> started" and any subsequent call will trigger the evaluation of the window.
>>>>>>>>>>> It would have been a bit easier if the trigger and the window process
>>>>>>>>>>> function could share its internal state. Unfortunately, this is not
>>>>>>>>>>> possible at the moment.
>>>>>>>>>>>
>>>>>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Till
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <
>>>>>>>>>>> manaskale96@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> I want to achieve the following using event time session
>>>>>>>>>>>> windows:
>>>>>>>>>>>>
>>>>>>>>>>>>    1. When the window.getStart() and last event timestamp in
>>>>>>>>>>>>    the window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>>>>>>>>    message "Window started @ timestamp".
>>>>>>>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>>>>>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>>>>>>>>>    ended @ timestamp".
>>>>>>>>>>>>
>>>>>>>>>>>>  It is guaranteed that all events are on time and no lateness
>>>>>>>>>>>> is allowed. I am having difficulty implementing both 1 and 2
>>>>>>>>>>>> simultaneously.
>>>>>>>>>>>> I am able to implement point 1 using a custom trigger, which
>>>>>>>>>>>> checks if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>>>>>>>>> triggers a customProcessWindowFunction().
>>>>>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>>>>>> window.
>>>>>>>>>>>>
>>>>>>>>>>>> Is my approach correct or is there a completely different
>>>>>>>>>>>> method to achieve this?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Manas Kale
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>

Re: Emit message at start and end of event time session window

Posted by Manas Kale <ma...@gmail.com>.
Hi Till,
I see, thanks for the clarification.
Assuming all other setting are the same, if I generate events as follows :
Element.from("1", 1000L),
                Element.from("2", 2000L),
                Element.from("3", 3000L),
                Element.from("10", 10000L)
                ,Element.from("11", 11000L),
                Element.from("12", 12000L),
                Element.from("20", 20000L)
we will expect 2 session windows to be created {1,2,3} and {10,11,12} with
appropriate messages. However, when I run this, there seems to be a problem
in the valueState of MyWindowFunction. Apparently that state is being
shared by both the session windows, which leads to incorrect results.
To solve this, I replaced it with a MapState<Long, Boolean>. The Long is
the start timestamp of a window, something that can uniquely identify
different windows. This works but with one caveat : if we have two
subsequent windows, the ordering of messages is :

window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @ 8000
-> window2 ended @ 17000

whereas I expect it to be :
window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @ 10000
-> window2 ended @ 17000

I thought Flink would execute event time timers and process events in
chronological event time order. However, it seems that the onEventTime()
invocation of window1 is called *after *elements from window2 have been
processed even though window1's onEventTime() is earlier in event time.

Is my approach and reasoning correct? Also, is it possible to get the
messages in the expected order?

Thanks!





On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Manas,
>
> the problem is that the print() statement is being executed with a
> different parallelism than 1. Due to this fact, the messages coming from
> the window function will be sent in round-robin fashion to the print
> operators. If you remove the setParallelism(1) from the window function,
> then the window function will be executed with the same parallelism as the
> print operator. Due to this fact, there is no round-robin distribution of
> the events but every window function task will simply forward its
> elements to its print operator task. You should be able to see these
> topology differences in the web ui.
>
> You could configure the print() operator to run with a parallelism of 1 as
> well by adding a setParallelism(1) statement to it.
>
> Cheers,
> Till
>
> On Thu, Mar 26, 2020 at 7:11 AM Manas Kale <ma...@gmail.com> wrote:
>
>> Hi Till,
>> When I run the example code that you posted, the order of the three
>> messages (window started, contents of window and window ended) is
>> non-deterministic. This is surprising to me, as setParallelism(1) has been
>> used in the pipeline - I assumed this should eliminate any form of race
>> conditions for printing. What's more is that if I *remove*
>> setParallelism(1) from the code, the output is deterministic and correct
>> (i.e. windowStarted -> windowContents -> windowEnded).
>>
>> Clearly, something is wrong with my understanding. What is it?
>>
>> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Great to hear that you solved the problem. Let us know if you run into
>>> any other issues.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <ma...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> This problem is solved[1]. The issue was that the BroadcastStream did
>>>> not contain any watermark, which prevented watermarks for any downstream
>>>> operators from advancing.
>>>> I appreciate all the help.
>>>> [1]
>>>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>>>
>>>> Thanks,
>>>> Manas
>>>>
>>>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <ma...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Rafi and Till,
>>>>> Thank you for pointing out that edge case, Rafi.
>>>>>
>>>>> Till, I am trying to get this example working with the BroadcastState
>>>>> pattern upstream to the window operator[1]. The problem is that introducing
>>>>> the BroadcastState makes the onEventTime() *never* fire. Is the
>>>>> BroadcastState somehow eating up the watermark? Do I need to generate the
>>>>> watermark again in the KeyedBroadcastProcessFunction?
>>>>>
>>>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>>>
>>>>> Thanks,
>>>>> Manas
>>>>>
>>>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Manas and Rafi,
>>>>>>
>>>>>> you are right that when using merging windows as event time session
>>>>>> windows are, then Flink requires that any state the Trigger keeps is of
>>>>>> type MergingState. This constraint allows that the state can be merged
>>>>>> whenever two windows get merged.
>>>>>>
>>>>>> Rafi, you are right. With the current implementation it might happen
>>>>>> that you send a wrong started window message. I think it depends on the
>>>>>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>>>>>> your watermark. If you want to be on the safe side, then I would recommend
>>>>>> to use the ProcessFunction to implement the required logic. The
>>>>>> ProcessFunction [1] is Flink's low level API and gives you access to state
>>>>>> and timers. In it, you would need to buffer the elements and to sessionize
>>>>>> them yourself, though. However, it would give you access to the
>>>>>> watermark which in turn would allow you to properly handle your described
>>>>>> edge case.
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <ra...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I think one "edge" case which is not handled would be that the first
>>>>>>> event (by event-time) arrives late, then a wrong "started-window" would be
>>>>>>> reported.
>>>>>>>
>>>>>>> Rafi
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <ma...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Is the reason ValueState cannot be use because session windows are
>>>>>>>> always formed by merging proto-windows of single elements, therefore a
>>>>>>>> state store is needed that can handle merging. ValueState does not provide
>>>>>>>> this functionality, but a ReducingState does?
>>>>>>>>
>>>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Till,
>>>>>>>>> Thanks for your answer! You also answered the next question that I
>>>>>>>>> was about to ask "Can we share state between a Trigger and a Window?"
>>>>>>>>> Currently the only (convoluted) way to share state between two operators is
>>>>>>>>> through the broadcast state pattern, right?
>>>>>>>>> Also, in your example, why can't we use a
>>>>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>>>>>> example but it  I am not able to  call the mergePartitionedState() method
>>>>>>>>> on a ValueStateDescriptor.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Manas
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <
>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Manas,
>>>>>>>>>>
>>>>>>>>>> you can implement something like this with a bit of trigger
>>>>>>>>>> magic. What you need to do is to define your own trigger implementation
>>>>>>>>>> which keeps state to remember whether it has triggered the "started window"
>>>>>>>>>> message or not. In the stateful window function you would need to do
>>>>>>>>>> something similar. The first call could trigger the output of "window
>>>>>>>>>> started" and any subsequent call will trigger the evaluation of the window.
>>>>>>>>>> It would have been a bit easier if the trigger and the window process
>>>>>>>>>> function could share its internal state. Unfortunately, this is not
>>>>>>>>>> possible at the moment.
>>>>>>>>>>
>>>>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> I want to achieve the following using event time session windows:
>>>>>>>>>>>
>>>>>>>>>>>    1. When the window.getStart() and last event timestamp in
>>>>>>>>>>>    the window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>>>>>>>    message "Window started @ timestamp".
>>>>>>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>>>>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>>>>>>>>    ended @ timestamp".
>>>>>>>>>>>
>>>>>>>>>>>  It is guaranteed that all events are on time and no lateness is
>>>>>>>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>>>>>>>> I am able to implement point 1 using a custom trigger, which
>>>>>>>>>>> checks if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>>>>>>>> triggers a customProcessWindowFunction().
>>>>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>>>>> window.
>>>>>>>>>>>
>>>>>>>>>>> Is my approach correct or is there a completely different method
>>>>>>>>>>> to achieve this?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Manas Kale
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>

Re: Emit message at start and end of event time session window

Posted by Till Rohrmann <tr...@apache.org>.
Hi Manas,

the problem is that the print() statement is being executed with a
different parallelism than 1. Due to this fact, the messages coming from
the window function will be sent in round-robin fashion to the print
operators. If you remove the setParallelism(1) from the window function,
then the window function will be executed with the same parallelism as the
print operator. Due to this fact, there is no round-robin distribution of
the events but every window function task will simply forward its
elements to its print operator task. You should be able to see these
topology differences in the web ui.

You could configure the print() operator to run with a parallelism of 1 as
well by adding a setParallelism(1) statement to it.

Cheers,
Till

On Thu, Mar 26, 2020 at 7:11 AM Manas Kale <ma...@gmail.com> wrote:

> Hi Till,
> When I run the example code that you posted, the order of the three
> messages (window started, contents of window and window ended) is
> non-deterministic. This is surprising to me, as setParallelism(1) has been
> used in the pipeline - I assumed this should eliminate any form of race
> conditions for printing. What's more is that if I *remove*
> setParallelism(1) from the code, the output is deterministic and correct
> (i.e. windowStarted -> windowContents -> windowEnded).
>
> Clearly, something is wrong with my understanding. What is it?
>
> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Great to hear that you solved the problem. Let us know if you run into
>> any other issues.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <ma...@gmail.com> wrote:
>>
>>> Hi,
>>> This problem is solved[1]. The issue was that the BroadcastStream did
>>> not contain any watermark, which prevented watermarks for any downstream
>>> operators from advancing.
>>> I appreciate all the help.
>>> [1]
>>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>>
>>> Thanks,
>>> Manas
>>>
>>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <ma...@gmail.com>
>>> wrote:
>>>
>>>> Hi Rafi and Till,
>>>> Thank you for pointing out that edge case, Rafi.
>>>>
>>>> Till, I am trying to get this example working with the BroadcastState
>>>> pattern upstream to the window operator[1]. The problem is that introducing
>>>> the BroadcastState makes the onEventTime() *never* fire. Is the
>>>> BroadcastState somehow eating up the watermark? Do I need to generate the
>>>> watermark again in the KeyedBroadcastProcessFunction?
>>>>
>>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>>
>>>> Thanks,
>>>> Manas
>>>>
>>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Manas and Rafi,
>>>>>
>>>>> you are right that when using merging windows as event time session
>>>>> windows are, then Flink requires that any state the Trigger keeps is of
>>>>> type MergingState. This constraint allows that the state can be merged
>>>>> whenever two windows get merged.
>>>>>
>>>>> Rafi, you are right. With the current implementation it might happen
>>>>> that you send a wrong started window message. I think it depends on the
>>>>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>>>>> your watermark. If you want to be on the safe side, then I would recommend
>>>>> to use the ProcessFunction to implement the required logic. The
>>>>> ProcessFunction [1] is Flink's low level API and gives you access to state
>>>>> and timers. In it, you would need to buffer the elements and to sessionize
>>>>> them yourself, though. However, it would give you access to the
>>>>> watermark which in turn would allow you to properly handle your described
>>>>> edge case.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <ra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think one "edge" case which is not handled would be that the first
>>>>>> event (by event-time) arrives late, then a wrong "started-window" would be
>>>>>> reported.
>>>>>>
>>>>>> Rafi
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <ma...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Is the reason ValueState cannot be use because session windows are
>>>>>>> always formed by merging proto-windows of single elements, therefore a
>>>>>>> state store is needed that can handle merging. ValueState does not provide
>>>>>>> this functionality, but a ReducingState does?
>>>>>>>
>>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>> Thanks for your answer! You also answered the next question that I
>>>>>>>> was about to ask "Can we share state between a Trigger and a Window?"
>>>>>>>> Currently the only (convoluted) way to share state between two operators is
>>>>>>>> through the broadcast state pattern, right?
>>>>>>>> Also, in your example, why can't we use a
>>>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>>>>> example but it  I am not able to  call the mergePartitionedState() method
>>>>>>>> on a ValueStateDescriptor.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Manas
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <tr...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Manas,
>>>>>>>>>
>>>>>>>>> you can implement something like this with a bit of trigger magic.
>>>>>>>>> What you need to do is to define your own trigger implementation which
>>>>>>>>> keeps state to remember whether it has triggered the "started window"
>>>>>>>>> message or not. In the stateful window function you would need to do
>>>>>>>>> something similar. The first call could trigger the output of "window
>>>>>>>>> started" and any subsequent call will trigger the evaluation of the window.
>>>>>>>>> It would have been a bit easier if the trigger and the window process
>>>>>>>>> function could share its internal state. Unfortunately, this is not
>>>>>>>>> possible at the moment.
>>>>>>>>>
>>>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> I want to achieve the following using event time session windows:
>>>>>>>>>>
>>>>>>>>>>    1. When the window.getStart() and last event timestamp in the
>>>>>>>>>>    window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>>>>>>    message "Window started @ timestamp".
>>>>>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>>>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>>>>>>>    ended @ timestamp".
>>>>>>>>>>
>>>>>>>>>>  It is guaranteed that all events are on time and no lateness is
>>>>>>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>>>>>>> I am able to implement point 1 using a custom trigger, which
>>>>>>>>>> checks if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>>>>>>> triggers a customProcessWindowFunction().
>>>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>>>> window.
>>>>>>>>>>
>>>>>>>>>> Is my approach correct or is there a completely different method
>>>>>>>>>> to achieve this?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Manas Kale
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>

Re: Emit message at start and end of event time session window

Posted by Manas Kale <ma...@gmail.com>.
Hi Till,
When I run the example code that you posted, the order of the three
messages (window started, contents of window and window ended) is
non-deterministic. This is surprising to me, as setParallelism(1) has been
used in the pipeline - I assumed this should eliminate any form of race
conditions for printing. What's more is that if I *remove*
setParallelism(1) from the code, the output is deterministic and correct
(i.e. windowStarted -> windowContents -> windowEnded).

Clearly, something is wrong with my understanding. What is it?

On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann <tr...@apache.org> wrote:

> Great to hear that you solved the problem. Let us know if you run into any
> other issues.
>
> Cheers,
> Till
>
> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <ma...@gmail.com> wrote:
>
>> Hi,
>> This problem is solved[1]. The issue was that the BroadcastStream did not
>> contain any watermark, which prevented watermarks for any downstream
>> operators from advancing.
>> I appreciate all the help.
>> [1]
>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>
>> Thanks,
>> Manas
>>
>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <ma...@gmail.com> wrote:
>>
>>> Hi Rafi and Till,
>>> Thank you for pointing out that edge case, Rafi.
>>>
>>> Till, I am trying to get this example working with the BroadcastState
>>> pattern upstream to the window operator[1]. The problem is that introducing
>>> the BroadcastState makes the onEventTime() *never* fire. Is the
>>> BroadcastState somehow eating up the watermark? Do I need to generate the
>>> watermark again in the KeyedBroadcastProcessFunction?
>>>
>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>
>>> Thanks,
>>> Manas
>>>
>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Manas and Rafi,
>>>>
>>>> you are right that when using merging windows as event time session
>>>> windows are, then Flink requires that any state the Trigger keeps is of
>>>> type MergingState. This constraint allows that the state can be merged
>>>> whenever two windows get merged.
>>>>
>>>> Rafi, you are right. With the current implementation it might happen
>>>> that you send a wrong started window message. I think it depends on the
>>>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>>>> your watermark. If you want to be on the safe side, then I would recommend
>>>> to use the ProcessFunction to implement the required logic. The
>>>> ProcessFunction [1] is Flink's low level API and gives you access to state
>>>> and timers. In it, you would need to buffer the elements and to sessionize
>>>> them yourself, though. However, it would give you access to the
>>>> watermark which in turn would allow you to properly handle your described
>>>> edge case.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <ra...@gmail.com>
>>>> wrote:
>>>>
>>>>> I think one "edge" case which is not handled would be that the first
>>>>> event (by event-time) arrives late, then a wrong "started-window" would be
>>>>> reported.
>>>>>
>>>>> Rafi
>>>>>
>>>>>
>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <ma...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Is the reason ValueState cannot be use because session windows are
>>>>>> always formed by merging proto-windows of single elements, therefore a
>>>>>> state store is needed that can handle merging. ValueState does not provide
>>>>>> this functionality, but a ReducingState does?
>>>>>>
>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>> Thanks for your answer! You also answered the next question that I
>>>>>>> was about to ask "Can we share state between a Trigger and a Window?"
>>>>>>> Currently the only (convoluted) way to share state between two operators is
>>>>>>> through the broadcast state pattern, right?
>>>>>>> Also, in your example, why can't we use a
>>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>>>> example but it  I am not able to  call the mergePartitionedState() method
>>>>>>> on a ValueStateDescriptor.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Manas
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <tr...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Manas,
>>>>>>>>
>>>>>>>> you can implement something like this with a bit of trigger magic.
>>>>>>>> What you need to do is to define your own trigger implementation which
>>>>>>>> keeps state to remember whether it has triggered the "started window"
>>>>>>>> message or not. In the stateful window function you would need to do
>>>>>>>> something similar. The first call could trigger the output of "window
>>>>>>>> started" and any subsequent call will trigger the evaluation of the window.
>>>>>>>> It would have been a bit easier if the trigger and the window process
>>>>>>>> function could share its internal state. Unfortunately, this is not
>>>>>>>> possible at the moment.
>>>>>>>>
>>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I want to achieve the following using event time session windows:
>>>>>>>>>
>>>>>>>>>    1. When the window.getStart() and last event timestamp in the
>>>>>>>>>    window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>>>>>    message "Window started @ timestamp".
>>>>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>>>>>>    ended @ timestamp".
>>>>>>>>>
>>>>>>>>>  It is guaranteed that all events are on time and no lateness is
>>>>>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>>>>>> I am able to implement point 1 using a custom trigger, which
>>>>>>>>> checks if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>>>>>> triggers a customProcessWindowFunction().
>>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>>> window.
>>>>>>>>>
>>>>>>>>> Is my approach correct or is there a completely different method
>>>>>>>>> to achieve this?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Manas Kale
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>

Re: Emit message at start and end of event time session window

Posted by Till Rohrmann <tr...@apache.org>.
Great to hear that you solved the problem. Let us know if you run into any
other issues.

Cheers,
Till

On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <ma...@gmail.com> wrote:

> Hi,
> This problem is solved[1]. The issue was that the BroadcastStream did not
> contain any watermark, which prevented watermarks for any downstream
> operators from advancing.
> I appreciate all the help.
> [1]
> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>
> Thanks,
> Manas
>
> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <ma...@gmail.com> wrote:
>
>> Hi Rafi and Till,
>> Thank you for pointing out that edge case, Rafi.
>>
>> Till, I am trying to get this example working with the BroadcastState
>> pattern upstream to the window operator[1]. The problem is that introducing
>> the BroadcastState makes the onEventTime() *never* fire. Is the
>> BroadcastState somehow eating up the watermark? Do I need to generate the
>> watermark again in the KeyedBroadcastProcessFunction?
>>
>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>
>> Thanks,
>> Manas
>>
>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Manas and Rafi,
>>>
>>> you are right that when using merging windows as event time session
>>> windows are, then Flink requires that any state the Trigger keeps is of
>>> type MergingState. This constraint allows that the state can be merged
>>> whenever two windows get merged.
>>>
>>> Rafi, you are right. With the current implementation it might happen
>>> that you send a wrong started window message. I think it depends on the
>>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>>> your watermark. If you want to be on the safe side, then I would recommend
>>> to use the ProcessFunction to implement the required logic. The
>>> ProcessFunction [1] is Flink's low level API and gives you access to state
>>> and timers. In it, you would need to buffer the elements and to sessionize
>>> them yourself, though. However, it would give you access to the
>>> watermark which in turn would allow you to properly handle your described
>>> edge case.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>
>>> Cheers,
>>> Till
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <ra...@gmail.com>
>>> wrote:
>>>
>>>> I think one "edge" case which is not handled would be that the first
>>>> event (by event-time) arrives late, then a wrong "started-window" would be
>>>> reported.
>>>>
>>>> Rafi
>>>>
>>>>
>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <ma...@gmail.com>
>>>> wrote:
>>>>
>>>>> Is the reason ValueState cannot be use because session windows are
>>>>> always formed by merging proto-windows of single elements, therefore a
>>>>> state store is needed that can handle merging. ValueState does not provide
>>>>> this functionality, but a ReducingState does?
>>>>>
>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>> Thanks for your answer! You also answered the next question that I
>>>>>> was about to ask "Can we share state between a Trigger and a Window?"
>>>>>> Currently the only (convoluted) way to share state between two operators is
>>>>>> through the broadcast state pattern, right?
>>>>>> Also, in your example, why can't we use a
>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>>> example but it  I am not able to  call the mergePartitionedState() method
>>>>>> on a ValueStateDescriptor.
>>>>>>
>>>>>> Regards,
>>>>>> Manas
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Manas,
>>>>>>>
>>>>>>> you can implement something like this with a bit of trigger magic.
>>>>>>> What you need to do is to define your own trigger implementation which
>>>>>>> keeps state to remember whether it has triggered the "started window"
>>>>>>> message or not. In the stateful window function you would need to do
>>>>>>> something similar. The first call could trigger the output of "window
>>>>>>> started" and any subsequent call will trigger the evaluation of the window.
>>>>>>> It would have been a bit easier if the trigger and the window process
>>>>>>> function could share its internal state. Unfortunately, this is not
>>>>>>> possible at the moment.
>>>>>>>
>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>
>>>>>>> [1]
>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I want to achieve the following using event time session windows:
>>>>>>>>
>>>>>>>>    1. When the window.getStart() and last event timestamp in the
>>>>>>>>    window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>>>>    message "Window started @ timestamp".
>>>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>>>>>    ended @ timestamp".
>>>>>>>>
>>>>>>>>  It is guaranteed that all events are on time and no lateness is
>>>>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>>>>> I am able to implement point 1 using a custom trigger, which checks
>>>>>>>> if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers
>>>>>>>> a customProcessWindowFunction().
>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>> window.
>>>>>>>>
>>>>>>>> Is my approach correct or is there a completely different method to
>>>>>>>> achieve this?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Manas Kale
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>

Re: Emit message at start and end of event time session window

Posted by Manas Kale <ma...@gmail.com>.
Hi,
This problem is solved[1]. The issue was that the BroadcastStream did not
contain any watermark, which prevented watermarks for any downstream
operators from advancing.
I appreciate all the help.
[1]
https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern

Thanks,
Manas

On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <ma...@gmail.com> wrote:

> Hi Rafi and Till,
> Thank you for pointing out that edge case, Rafi.
>
> Till, I am trying to get this example working with the BroadcastState
> pattern upstream to the window operator[1]. The problem is that introducing
> the BroadcastState makes the onEventTime() *never* fire. Is the
> BroadcastState somehow eating up the watermark? Do I need to generate the
> watermark again in the KeyedBroadcastProcessFunction?
>
> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>
> Thanks,
> Manas
>
> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Manas and Rafi,
>>
>> you are right that when using merging windows as event time session
>> windows are, then Flink requires that any state the Trigger keeps is of
>> type MergingState. This constraint allows that the state can be merged
>> whenever two windows get merged.
>>
>> Rafi, you are right. With the current implementation it might happen that
>> you send a wrong started window message. I think it depends on the
>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>> your watermark. If you want to be on the safe side, then I would recommend
>> to use the ProcessFunction to implement the required logic. The
>> ProcessFunction [1] is Flink's low level API and gives you access to state
>> and timers. In it, you would need to buffer the elements and to sessionize
>> them yourself, though. However, it would give you access to the
>> watermark which in turn would allow you to properly handle your described
>> edge case.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> Cheers,
>> Till
>>
>> Cheers,
>> Till
>>
>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <ra...@gmail.com> wrote:
>>
>>> I think one "edge" case which is not handled would be that the first
>>> event (by event-time) arrives late, then a wrong "started-window" would be
>>> reported.
>>>
>>> Rafi
>>>
>>>
>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <ma...@gmail.com>
>>> wrote:
>>>
>>>> Is the reason ValueState cannot be use because session windows are
>>>> always formed by merging proto-windows of single elements, therefore a
>>>> state store is needed that can handle merging. ValueState does not provide
>>>> this functionality, but a ReducingState does?
>>>>
>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>> Thanks for your answer! You also answered the next question that I was
>>>>> about to ask "Can we share state between a Trigger and a Window?" Currently
>>>>> the only (convoluted) way to share state between two operators is through
>>>>> the broadcast state pattern, right?
>>>>> Also, in your example, why can't we use a
>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>> example but it  I am not able to  call the mergePartitionedState() method
>>>>> on a ValueStateDescriptor.
>>>>>
>>>>> Regards,
>>>>> Manas
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Manas,
>>>>>>
>>>>>> you can implement something like this with a bit of trigger magic.
>>>>>> What you need to do is to define your own trigger implementation which
>>>>>> keeps state to remember whether it has triggered the "started window"
>>>>>> message or not. In the stateful window function you would need to do
>>>>>> something similar. The first call could trigger the output of "window
>>>>>> started" and any subsequent call will trigger the evaluation of the window.
>>>>>> It would have been a bit easier if the trigger and the window process
>>>>>> function could share its internal state. Unfortunately, this is not
>>>>>> possible at the moment.
>>>>>>
>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>
>>>>>> [1]
>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I want to achieve the following using event time session windows:
>>>>>>>
>>>>>>>    1. When the window.getStart() and last event timestamp in the
>>>>>>>    window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>>>    message "Window started @ timestamp".
>>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>>>>    ended @ timestamp".
>>>>>>>
>>>>>>>  It is guaranteed that all events are on time and no lateness is
>>>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>>>> I am able to implement point 1 using a custom trigger, which checks
>>>>>>> if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers
>>>>>>> a customProcessWindowFunction().
>>>>>>> However, with this architecture I can't detect the end of the window.
>>>>>>>
>>>>>>> Is my approach correct or is there a completely different method to
>>>>>>> achieve this?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Manas Kale
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>

Re: Emit message at start and end of event time session window

Posted by Manas Kale <ma...@gmail.com>.
Hi Rafi and Till,
Thank you for pointing out that edge case, Rafi.

Till, I am trying to get this example working with the BroadcastState
pattern upstream to the window operator[1]. The problem is that introducing
the BroadcastState makes the onEventTime() *never* fire. Is the
BroadcastState somehow eating up the watermark? Do I need to generate the
watermark again in the KeyedBroadcastProcessFunction?

[1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49

Thanks,
Manas

On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Manas and Rafi,
>
> you are right that when using merging windows as event time session
> windows are, then Flink requires that any state the Trigger keeps is of
> type MergingState. This constraint allows that the state can be merged
> whenever two windows get merged.
>
> Rafi, you are right. With the current implementation it might happen that
> you send a wrong started window message. I think it depends on the
> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
> your watermark. If you want to be on the safe side, then I would recommend
> to use the ProcessFunction to implement the required logic. The
> ProcessFunction [1] is Flink's low level API and gives you access to state
> and timers. In it, you would need to buffer the elements and to sessionize
> them yourself, though. However, it would give you access to the
> watermark which in turn would allow you to properly handle your described
> edge case.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>
> Cheers,
> Till
>
> Cheers,
> Till
>
> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <ra...@gmail.com> wrote:
>
>> I think one "edge" case which is not handled would be that the first
>> event (by event-time) arrives late, then a wrong "started-window" would be
>> reported.
>>
>> Rafi
>>
>>
>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <ma...@gmail.com>
>> wrote:
>>
>>> Is the reason ValueState cannot be use because session windows are
>>> always formed by merging proto-windows of single elements, therefore a
>>> state store is needed that can handle merging. ValueState does not provide
>>> this functionality, but a ReducingState does?
>>>
>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com>
>>> wrote:
>>>
>>>> Hi Till,
>>>> Thanks for your answer! You also answered the next question that I was
>>>> about to ask "Can we share state between a Trigger and a Window?" Currently
>>>> the only (convoluted) way to share state between two operators is through
>>>> the broadcast state pattern, right?
>>>> Also, in your example, why can't we use a ValueStateDescriptor<Boolean>
>>>> in the Trigger? I tried using it in my own example but it  I am not able
>>>> to  call the mergePartitionedState() method on a ValueStateDescriptor.
>>>>
>>>> Regards,
>>>> Manas
>>>>
>>>>
>>>>
>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Manas,
>>>>>
>>>>> you can implement something like this with a bit of trigger magic.
>>>>> What you need to do is to define your own trigger implementation which
>>>>> keeps state to remember whether it has triggered the "started window"
>>>>> message or not. In the stateful window function you would need to do
>>>>> something similar. The first call could trigger the output of "window
>>>>> started" and any subsequent call will trigger the evaluation of the window.
>>>>> It would have been a bit easier if the trigger and the window process
>>>>> function could share its internal state. Unfortunately, this is not
>>>>> possible at the moment.
>>>>>
>>>>> I've drafted a potential solution which you can find here [1].
>>>>>
>>>>> [1]
>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I want to achieve the following using event time session windows:
>>>>>>
>>>>>>    1. When the window.getStart() and last event timestamp in the
>>>>>>    window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>>    message "Window started @ timestamp".
>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>>>    ended @ timestamp".
>>>>>>
>>>>>>  It is guaranteed that all events are on time and no lateness is
>>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>>> I am able to implement point 1 using a custom trigger, which checks
>>>>>> if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers
>>>>>> a customProcessWindowFunction().
>>>>>> However, with this architecture I can't detect the end of the window.
>>>>>>
>>>>>> Is my approach correct or is there a completely different method to
>>>>>> achieve this?
>>>>>>
>>>>>> Thanks,
>>>>>> Manas Kale
>>>>>>
>>>>>>
>>>>>>
>>>>>>

Re: Emit message at start and end of event time session window

Posted by Till Rohrmann <tr...@apache.org>.
Hi Manas and Rafi,

you are right that when using merging windows as event time session windows
are, then Flink requires that any state the Trigger keeps is of type
MergingState. This constraint allows that the state can be merged whenever
two windows get merged.

Rafi, you are right. With the current implementation it might happen that
you send a wrong started window message. I think it depends on the
MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
your watermark. If you want to be on the safe side, then I would recommend
to use the ProcessFunction to implement the required logic. The
ProcessFunction [1] is Flink's low level API and gives you access to state
and timers. In it, you would need to buffer the elements and to sessionize
them yourself, though. However, it would give you access to the
watermark which in turn would allow you to properly handle your described
edge case.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

Cheers,
Till

Cheers,
Till

On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <ra...@gmail.com> wrote:

> I think one "edge" case which is not handled would be that the first event
> (by event-time) arrives late, then a wrong "started-window" would be
> reported.
>
> Rafi
>
>
> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <ma...@gmail.com> wrote:
>
>> Is the reason ValueState cannot be use because session windows are always
>> formed by merging proto-windows of single elements, therefore a state store
>> is needed that can handle merging. ValueState does not provide this
>> functionality, but a ReducingState does?
>>
>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com> wrote:
>>
>>> Hi Till,
>>> Thanks for your answer! You also answered the next question that I was
>>> about to ask "Can we share state between a Trigger and a Window?" Currently
>>> the only (convoluted) way to share state between two operators is through
>>> the broadcast state pattern, right?
>>> Also, in your example, why can't we use a ValueStateDescriptor<Boolean>
>>> in the Trigger? I tried using it in my own example but it  I am not able
>>> to  call the mergePartitionedState() method on a ValueStateDescriptor.
>>>
>>> Regards,
>>> Manas
>>>
>>>
>>>
>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> you can implement something like this with a bit of trigger magic. What
>>>> you need to do is to define your own trigger implementation which keeps
>>>> state to remember whether it has triggered the "started window" message or
>>>> not. In the stateful window function you would need to do something
>>>> similar. The first call could trigger the output of "window started" and
>>>> any subsequent call will trigger the evaluation of the window. It would
>>>> have been a bit easier if the trigger and the window process function could
>>>> share its internal state. Unfortunately, this is not possible at the moment.
>>>>
>>>> I've drafted a potential solution which you can find here [1].
>>>>
>>>> [1]
>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I want to achieve the following using event time session windows:
>>>>>
>>>>>    1. When the window.getStart() and last event timestamp in the
>>>>>    window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>    message "Window started @ timestamp".
>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>>    ended @ timestamp".
>>>>>
>>>>>  It is guaranteed that all events are on time and no lateness is
>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>> I am able to implement point 1 using a custom trigger, which checks
>>>>> if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers
>>>>> a customProcessWindowFunction().
>>>>> However, with this architecture I can't detect the end of the window.
>>>>>
>>>>> Is my approach correct or is there a completely different method to
>>>>> achieve this?
>>>>>
>>>>> Thanks,
>>>>> Manas Kale
>>>>>
>>>>>
>>>>>
>>>>>

Re: Emit message at start and end of event time session window

Posted by Rafi Aroch <ra...@gmail.com>.
I think one "edge" case which is not handled would be that the first event
(by event-time) arrives late, then a wrong "started-window" would be
reported.

Rafi


On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <ma...@gmail.com> wrote:

> Is the reason ValueState cannot be use because session windows are always
> formed by merging proto-windows of single elements, therefore a state store
> is needed that can handle merging. ValueState does not provide this
> functionality, but a ReducingState does?
>
> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com> wrote:
>
>> Hi Till,
>> Thanks for your answer! You also answered the next question that I was
>> about to ask "Can we share state between a Trigger and a Window?" Currently
>> the only (convoluted) way to share state between two operators is through
>> the broadcast state pattern, right?
>> Also, in your example, why can't we use a ValueStateDescriptor<Boolean>
>> in the Trigger? I tried using it in my own example but it  I am not able
>> to  call the mergePartitionedState() method on a ValueStateDescriptor.
>>
>> Regards,
>> Manas
>>
>>
>>
>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Manas,
>>>
>>> you can implement something like this with a bit of trigger magic. What
>>> you need to do is to define your own trigger implementation which keeps
>>> state to remember whether it has triggered the "started window" message or
>>> not. In the stateful window function you would need to do something
>>> similar. The first call could trigger the output of "window started" and
>>> any subsequent call will trigger the evaluation of the window. It would
>>> have been a bit easier if the trigger and the window process function could
>>> share its internal state. Unfortunately, this is not possible at the moment.
>>>
>>> I've drafted a potential solution which you can find here [1].
>>>
>>> [1]
>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I want to achieve the following using event time session windows:
>>>>
>>>>    1. When the window.getStart() and last event timestamp in the
>>>>    window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>    message "Window started @ timestamp".
>>>>    2. When the session window ends, i.e. the watermark passes
>>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>    ended @ timestamp".
>>>>
>>>>  It is guaranteed that all events are on time and no lateness is
>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>> I am able to implement point 1 using a custom trigger, which checks if
>>>> (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a
>>>> customProcessWindowFunction().
>>>> However, with this architecture I can't detect the end of the window.
>>>>
>>>> Is my approach correct or is there a completely different method to
>>>> achieve this?
>>>>
>>>> Thanks,
>>>> Manas Kale
>>>>
>>>>
>>>>
>>>>

Re: Emit message at start and end of event time session window

Posted by Manas Kale <ma...@gmail.com>.
Is the reason ValueState cannot be use because session windows are always
formed by merging proto-windows of single elements, therefore a state store
is needed that can handle merging. ValueState does not provide this
functionality, but a ReducingState does?

On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <ma...@gmail.com> wrote:

> Hi Till,
> Thanks for your answer! You also answered the next question that I was
> about to ask "Can we share state between a Trigger and a Window?" Currently
> the only (convoluted) way to share state between two operators is through
> the broadcast state pattern, right?
> Also, in your example, why can't we use a ValueStateDescriptor<Boolean> in
> the Trigger? I tried using it in my own example but it  I am not able to
> call the mergePartitionedState() method on a ValueStateDescriptor.
>
> Regards,
> Manas
>
>
>
> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Manas,
>>
>> you can implement something like this with a bit of trigger magic. What
>> you need to do is to define your own trigger implementation which keeps
>> state to remember whether it has triggered the "started window" message or
>> not. In the stateful window function you would need to do something
>> similar. The first call could trigger the output of "window started" and
>> any subsequent call will trigger the evaluation of the window. It would
>> have been a bit easier if the trigger and the window process function could
>> share its internal state. Unfortunately, this is not possible at the moment.
>>
>> I've drafted a potential solution which you can find here [1].
>>
>> [1] https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com> wrote:
>>
>>> Hi,
>>> I want to achieve the following using event time session windows:
>>>
>>>    1. When the window.getStart() and last event timestamp in the window
>>>    is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a message
>>>    "Window started @ timestamp".
>>>    2. When the session window ends, i.e. the watermark passes
>>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>    ended @ timestamp".
>>>
>>>  It is guaranteed that all events are on time and no lateness is
>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>> I am able to implement point 1 using a custom trigger, which checks if
>>> (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a
>>> customProcessWindowFunction().
>>> However, with this architecture I can't detect the end of the window.
>>>
>>> Is my approach correct or is there a completely different method to
>>> achieve this?
>>>
>>> Thanks,
>>> Manas Kale
>>>
>>>
>>>
>>>

Re: Emit message at start and end of event time session window

Posted by Manas Kale <ma...@gmail.com>.
Hi Till,
Thanks for your answer! You also answered the next question that I was
about to ask "Can we share state between a Trigger and a Window?" Currently
the only (convoluted) way to share state between two operators is through
the broadcast state pattern, right?
Also, in your example, why can't we use a ValueStateDescriptor<Boolean> in
the Trigger? I tried using it in my own example but it  I am not able to
call the mergePartitionedState() method on a ValueStateDescriptor.

Regards,
Manas



On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Manas,
>
> you can implement something like this with a bit of trigger magic. What
> you need to do is to define your own trigger implementation which keeps
> state to remember whether it has triggered the "started window" message or
> not. In the stateful window function you would need to do something
> similar. The first call could trigger the output of "window started" and
> any subsequent call will trigger the evaluation of the window. It would
> have been a bit easier if the trigger and the window process function could
> share its internal state. Unfortunately, this is not possible at the moment.
>
> I've drafted a potential solution which you can find here [1].
>
> [1] https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>
> Cheers,
> Till
>
> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com> wrote:
>
>> Hi,
>> I want to achieve the following using event time session windows:
>>
>>    1. When the window.getStart() and last event timestamp in the window
>>    is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a message
>>    "Window started @ timestamp".
>>    2. When the session window ends, i.e. the watermark passes
>>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>    ended @ timestamp".
>>
>>  It is guaranteed that all events are on time and no lateness is allowed.
>> I am having difficulty implementing both 1 and 2 simultaneously.
>> I am able to implement point 1 using a custom trigger, which checks if
>> (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a
>> customProcessWindowFunction().
>> However, with this architecture I can't detect the end of the window.
>>
>> Is my approach correct or is there a completely different method to
>> achieve this?
>>
>> Thanks,
>> Manas Kale
>>
>>
>>
>>

Re: Emit message at start and end of event time session window

Posted by Till Rohrmann <tr...@apache.org>.
Hi Manas,

you can implement something like this with a bit of trigger magic. What you
need to do is to define your own trigger implementation which keeps state
to remember whether it has triggered the "started window" message or not.
In the stateful window function you would need to do something similar. The
first call could trigger the output of "window started" and any subsequent
call will trigger the evaluation of the window. It would have been a bit
easier if the trigger and the window process function could share its
internal state. Unfortunately, this is not possible at the moment.

I've drafted a potential solution which you can find here [1].

[1] https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef

Cheers,
Till

On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <ma...@gmail.com> wrote:

> Hi,
> I want to achieve the following using event time session windows:
>
>    1. When the window.getStart() and last event timestamp in the window
>    is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a message
>    "Window started @ timestamp".
>    2. When the session window ends, i.e. the watermark passes
>    lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>    ended @ timestamp".
>
>  It is guaranteed that all events are on time and no lateness is allowed.
> I am having difficulty implementing both 1 and 2 simultaneously.
> I am able to implement point 1 using a custom trigger, which checks if
> (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a
> customProcessWindowFunction().
> However, with this architecture I can't detect the end of the window.
>
> Is my approach correct or is there a completely different method to
> achieve this?
>
> Thanks,
> Manas Kale
>
>
>
>