Posted to user@flink.apache.org by aitozi <gj...@gmail.com> on 2017/12/26 03:58:23 UTC

MergingWindow

Hi,

I can't understand the purpose of this code snippet in
MergingWindowSet.java. Can anyone explain it to me?


if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
    mergeFunction.merge(mergeResult,
            mergedWindows,
            this.mapping.get(mergeResult),
            mergedStateWindows);
}



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: MergingWindow

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, this is a very good description!

To see this in action, you can comment out the check in MergingWindowSet and run MergingWindowSetTest. You will see test failures and can trace which situations lead to problematic behaviour without that check.

> On 29. Dec 2017, at 15:07, jincheng sun <su...@gmail.com> wrote:


Re: MergingWindow

Posted by jincheng sun <su...@gmail.com>.
Hi aitozi,

`MergingWindowSet` is a utility that keeps track of merging windows when a
`MergingWindowAssigner` is used in a `WindowOperator`.

In Flink, `MergingWindowAssigner` is only used for session windows. Its
implementations are `EventTimeSessionWindows` and
`ProcessingTimeSessionWindows`. As we know, session windows are split by
the time gap between elements, so when out-of-order elements arrive,
existing windows may need to be merged.


In the `processElement` method of `WindowOperator`, adding a new window
might result in a merge. The merge logic is in the `addWindow` method of
`MergingWindowSet`, and the snippet you mentioned is in that method. To
understand the snippet, we first need to understand the bookkeeping behind
merging windows. Let me give an example. Suppose we have two windows, WinA
and WinB, and when WinC is added we find that WinA, WinB and WinC should
be merged. At this point WinC is the new window, while WinA and WinB are
already in `MergingWindowSet.mapping`. The `mapping` is a `Map<W, W>` from
each window to the window that keeps its state. When we incrementally
merge windows starting from some window, we keep that starting window as
the state window to prevent costly state juggling (see the attached
merge.png).
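
The WinA/WinB/WinC example can be sketched in a few lines of Java. This is an illustration only, not Flink's `MergingWindowSet`: the class `MappingSketch`, the window type `Win` and its `[start, end)` bounds are hypothetical names, and keeping the state window of the first merged window is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustration only; a simplified stand-in, not Flink's MergingWindowSet. */
class MappingSketch {

    /** Hypothetical window type covering [start, end). */
    static final class Win {
        final long start;
        final long end;

        Win(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Win other) {
            return start <= other.end && end >= other.start;
        }

        Win cover(Win other) {
            return new Win(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Win && ((Win) o).start == start && ((Win) o).end == end;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(start) * 31 + Long.hashCode(end);
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ")";
        }
    }

    /** In-flight window -> window whose namespace holds the state. */
    final Map<Win, Win> mapping = new LinkedHashMap<>();

    /** Adds a window, merging it with every existing window it touches. */
    Win addWindow(Win newWindow) {
        List<Win> merged = new ArrayList<>();
        Win result = newWindow;
        for (Win existing : mapping.keySet()) {
            if (existing.intersects(newWindow)) {
                merged.add(existing);
                result = result.cover(existing);
            }
        }
        if (merged.isEmpty()) {
            // Nothing to merge: the new window becomes its own state window.
            mapping.put(newWindow, newWindow);
            return newWindow;
        }
        // Assumption: reuse the state window of the first merged window.
        Win stateWindow = mapping.get(merged.get(0));
        for (Win w : merged) {
            mapping.remove(w);
        }
        mapping.put(result, stateWindow);
        return result;
    }

    public static void main(String[] args) {
        MappingSketch set = new MappingSketch();
        set.addWindow(new Win(0, 10));   // WinA
        set.addWindow(new Win(20, 30));  // WinB
        Win merged = set.addWindow(new Win(8, 22)); // WinC bridges WinA and WinB
        System.out.println(merged + " -> " + set.mapping.get(merged)); // [0,30) -> [0,10)
    }
}
```

After the merge, the only entry left in `mapping` points from the merged window to WinA, so the state already stored under WinA does not have to be moved.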


​
Now that we know how merging works, let's look at the snippet you
mentioned:

> if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
>     mergeFunction.merge(mergeResult,
>             mergedWindows,
>             this.mapping.get(mergeResult),
>             mergedStateWindows);
> }


This check guarantees that we don't merge the new window itself, because it
never had any state associated with it. In other words, the merge function
is skipped when we are only merging one pre-existing window into itself
without extending that pre-existing window.
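
The condition can be isolated and exercised on its own. The sketch below is not Flink code: `MergeGuardSketch` and `shouldCallMerge` are hypothetical names, windows are represented as plain strings, and it assumes the new window has already been taken out of `mergedWindows` before the check runs.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

/** Illustration only; isolates the condition from the snippet above. */
class MergeGuardSketch {

    /** Returns true when the merge callback would be invoked. */
    static <W> boolean shouldCallMerge(Collection<W> mergedWindows, W mergeResult) {
        return !(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1);
    }

    public static void main(String[] args) {
        // New window lies inside an existing one: the result is that same
        // window, nothing changed, so the callback is skipped.
        System.out.println(shouldCallMerge(
                Collections.singletonList("[0,10)"), "[0,10)")); // false

        // New window extends an existing one: the result is a different
        // window, so the callback runs.
        System.out.println(shouldCallMerge(
                Collections.singletonList("[0,10)"), "[0,12)")); // true

        // New window bridges two existing ones: their state must be combined.
        System.out.println(shouldCallMerge(
                Arrays.asList("[0,10)", "[20,30)"), "[0,30)")); // true
    }
}
```

Only the first case is filtered out: a single pre-existing window that "merges" into itself.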

I am not sure whether this explanation is clear, but I hope it helps. :)
Feedback from anybody is welcome. :)

Best, Jincheng

2017-12-27 16:58 GMT+08:00 Ufuk Celebi <uc...@apache.org>:


Re: MergingWindow

Posted by Ufuk Celebi <uc...@apache.org>.
Please check your email before sending it the next time as three
emails for the same message is a little spammy ;-)

This is internal code that is used to implement session windows as far
as I can tell. The idea is to not merge the new window as it never had
any state associated with it. The general idea of merging windows is
to keep one of the original windows as the state window, i.e. the
window that is used as namespace to store the window elements.
Elements from the state windows of merged windows must be merged into
this one state window.
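
A rough sketch of the "state window as namespace" idea follows. It is not Flink's state backend API: `StateWindowSketch`, `stateByNamespace` and `mergeInto` are hypothetical names, and a plain map of lists stands in for keyed window state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only; a plain map stands in for namespaced window state. */
class StateWindowSketch {

    /** Hypothetical store: state window (namespace) -> buffered elements. */
    final Map<String, List<Integer>> stateByNamespace = new HashMap<>();

    /** Buffers an element under the given state window. */
    void add(String stateWindow, int element) {
        stateByNamespace.computeIfAbsent(stateWindow, k -> new ArrayList<>()).add(element);
    }

    /** Moves the elements of the merged state windows into the kept state window. */
    void mergeInto(String stateWindow, List<String> mergedStateWindows) {
        List<Integer> target =
                stateByNamespace.computeIfAbsent(stateWindow, k -> new ArrayList<>());
        for (String other : mergedStateWindows) {
            if (other.equals(stateWindow)) {
                continue; // the kept state window is never merged into itself
            }
            List<Integer> moved = stateByNamespace.remove(other);
            if (moved != null) {
                target.addAll(moved);
            }
        }
    }
}
```

For example, with elements 1 and 2 stored under "WinA" and element 3 under "WinB", calling `mergeInto("WinA", Arrays.asList("WinB"))` leaves a single namespace "WinA" holding all three elements.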

For more details, this should be directed to the dev mailing list.

– Ufuk

On Tue, Dec 26, 2017 at 4:58 AM, aitozi <gj...@gmail.com> wrote: