Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2021/03/12 04:09:50 UTC

clear() in a ProcessWindowFunction

Hello folks,

The suggestion is to use windowState() for state scoped per key and per
window, and to clear that state explicitly. It also seems that
getRuntimeContext().getState() returns global state, i.e. state shared
among all windows with the same key. I would of course like state scoped to
a key per window, and wanted to use windowState(). The caveat is that my
window is a session window, and when I try to use clear() I get this
exception (session windows are merging windows):

Caused by: java.lang.UnsupportedOperationException: Per-window state is not
allowed when using merging windows.


The questions are:

* How do I keep state per *session* window / per key and still be able to
clear it?
* Does getRuntimeContext().getState() give me the clear() semantics for free,
along with state per window per key, meaning I have misunderstood
getRuntimeContext().getState()?

Regards.

Re: clear() in a ProcessWindowFunction

Posted by Vishal Santoshi <vi...@gmail.com>.
Thank you for the confirmation. My simulations confirm it too.

On Fri, Apr 9, 2021 at 11:14 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi Vishal,
>
> Sorry for the late reply,
> Please find my answers below.
> By "state" I assume you mean the state obtained via getRuntimeContext()
> (access to per-window state is not allowed with merging windows).
>
> > The state is scoped to the key (created per key in the
> > ProcessWindowFunction with a TTL).
> Yes.
>
> > The state will remain alive irrespective of whether the window is closed
> > or not (a TTL timer does the collection).
> Right, but you need to configure TTL on the state descriptor used to
> access the state [1].
>
> > Execution on a key is sequential: if 2 events arrive for the 2 sessions,
> > they are processed sequentially (in either order, but without the need
> > for synchronization).
> Right.
>
> > State mutated by an event in session A will be visible to session B if
> > an event for session B arrives subsequently. There is no need to
> > synchronize access to the state, as it is for the same key.
> Right.
>
> Your understanding of the merging of window contents is also correct.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>
> Regards,
> Roman

Re: clear() in a ProcessWindowFunction

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Vishal,

Sorry for the late reply,
Please find my answers below.
By "state" I assume you mean the state obtained via getRuntimeContext()
(access to per-window state is not allowed with merging windows).

> The state is scoped to the key (created per key in the ProcessWindowFunction with a TTL).
Yes.

> The state will remain alive irrespective of whether the window is closed or not (a TTL timer does the collection).
Right, but you need to configure TTL on the state descriptor used to access the state [1].

> Execution on a key is sequential: if 2 events arrive for the 2 sessions, they are processed sequentially (in either order, but without the need for synchronization).
Right.

> State mutated by an event in session A will be visible to session B if an event for session B arrives subsequently. There is no need to synchronize access to the state, as it is for the same key.
Right.

Your understanding of the merging of window contents is also correct.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman
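
For reference, a minimal sketch of what "configure TTL" can look like for the
session_uid descriptor discussed in this thread, assuming the Flink 1.12-era
StateTtlConfig API described at [1]. The class name, the method name and the
six-hour TTL are illustrative assumptions, not something taken from the thread:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

/** Hypothetical helper; only the "session_uid" name comes from the thread. */
final class SessionIdStateDescriptors {

    /** Builds the "session_uid" descriptor with a TTL attached. */
    static ValueStateDescriptor<String> sessionIdWithTtl() {
        StateTtlConfig ttl = StateTtlConfig
                .newBuilder(Time.hours(6)) // assumed value, see the note below
                // The TTL clock starts when the value is created or overwritten.
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // Expired values are never handed back to the function.
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("session_uid", String.class);
        descriptor.enableTimeToLive(ttl);
        return descriptor;
    }
}
```

The descriptor would then be passed to getRuntimeContext().getState(...) as
before. Because the ID in the posted code is written only once per key, the TTL
in this sketch counts from the first event for the key, so it would need to
outlast the longest session you expect.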


On Wed, Mar 31, 2021 at 5:45 PM Vishal Santoshi
<vi...@gmail.com> wrote:
>
> I had a query. Say I have a single key with 2 live sessions (A and B) with a configured lateness.
>
> Do these invariants hold?
>
> * The state is scoped to the key (created per key in the ProcessWindowFunction with a TTL).
> * The state will remain alive irrespective of whether the window is closed or not (a TTL timer does the collection).
> * Execution on a key is sequential: if 2 events arrive for the 2 sessions, they are processed sequentially (in either order, but without the need for synchronization).
> * State mutated by an event in session A will be visible to session B if an event for session B arrives subsequently. There is no need to synchronize access to the state, as it is for the same key.
>
> What I am not sure about is what happens when session A merges with session B. I would assume that Flink just defines a new start and end for the merged window, GCs the old ones (or at least one of them), and assigns the event to that new window. What one does with the custom state in the ProcessWindowFunction (there is a CountTrigger of 1) is really just what is done in the process method above; that is, this state is one degree removed from whatever Flink does internally with its merges, given that the state is scoped to the key.

Re: clear() in a ProcessWindowFunction

Posted by Vishal Santoshi <vi...@gmail.com>.
I had a query. Say I have a single key with 2 live sessions (A and B) with a
configured lateness.

Do these invariants hold?

* The state is scoped to the key (created per key in the
ProcessWindowFunction with a TTL).
* The state will remain alive irrespective of whether the window is closed
or not (a TTL timer does the collection).
* Execution on a key is sequential: if 2 events arrive for the 2 sessions,
they are processed sequentially (in either order, but without the need
for synchronization).
* State mutated by an event in session A will be visible to session B
if an event for session B arrives subsequently. There is no need to
synchronize access to the state, as it is for the same key.

What I am not sure about is what happens when session A merges with session
B. I would assume that Flink just defines a new start and end for the merged
window, GCs the old ones (or at least one of them), and assigns the event to
that new window. What one does with the custom state in the
ProcessWindowFunction (there is a CountTrigger of 1) is really just what is
done in the process method above; that is, this state is one degree removed
from whatever Flink does internally with its merges, given that the state
is scoped to the key.
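
The merge behaviour assumed above can be modelled in a few lines of plain Java.
This is only a toy sketch, not Flink's implementation: the class and method
names are made up, windows are half-open intervals, and each event at time t is
assumed to propose a window [t, t + gap) that is merged with any window it
touches or overlaps.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of session-window merging for a single key; not Flink code. */
final class SessionMergeSketch {

    /** A half-open interval [start, end) standing in for a session window. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Windows that overlap or merely touch are treated as one session. */
        boolean intersects(Window other) {
            return start <= other.end && other.start <= end;
        }

        /** The smallest window covering both this window and the other one. */
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Builds the sessions that result from the given event times and gap. */
    static List<Window> sessions(List<Long> eventTimes, long gap) {
        List<Window> proposed = new ArrayList<>();
        for (long t : eventTimes) {
            proposed.add(new Window(t, t + gap));
        }
        proposed.sort((a, b) -> Long.compare(a.start, b.start));

        List<Window> merged = new ArrayList<>();
        for (Window w : proposed) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                // Only the bounds change: a new start and end replace the old ones.
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Sessions A and B stay apart while no event bridges the gap ...
        System.out.println(sessions(List.of(0L, 12L), 5));          // [[0, 5), [12, 17)]
        // ... and collapse into one window once events at t=4 and t=8 arrive.
        System.out.println(sessions(List.of(0L, 12L, 4L, 8L), 5));  // [[0, 17)]
    }
}
```

Nothing in this model touches key-scoped state, which is consistent with the
assumption that state obtained via getRuntimeContext() is one step removed from
the merge itself.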







On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi <vi...@gmail.com>
wrote:

> Yep, makes sense.

Re: clear() in a ProcessWindowFunction

Posted by Vishal Santoshi <vi...@gmail.com>.
Yep, makes sense.

On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan <ro...@apache.org> wrote:

> > Want to confirm that the keys are GCed (along with state) once the
> > windows close (plus lateness)?
> Window state is cleared (as well as the window itself), but global
> state is not (unless you use TTL) [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>
> Regards,
> Roman

Re: clear() in a ProcessWindowFunction

Posted by Roman Khachatryan <ro...@apache.org>.
> Want to confirm that the keys are GCed (along with state) once the windows close (plus lateness)?
Window state is cleared (as well as the window itself), but global
state is not (unless you use TTL) [1].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman
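
To make the difference concrete, here is a toy model in plain Java of state
that is scoped to the key only and expires after a TTL. It is a sketch under
assumptions, not Flink code: the class name, the idFor method and the explicit
"now" parameter are invented for illustration, and the TTL is assumed to count
from the moment the ID was written.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Toy model of key-scoped state with a TTL; not Flink's state backend. */
final class KeyScopedIdSketch {

    /** One stored value plus the time it was written. */
    private static final class Entry {
        final String id;
        final long writtenAt;

        Entry(String id, long writtenAt) {
            this.id = id;
            this.writtenAt = writtenAt;
        }
    }

    private final Map<String, Entry> byKey = new HashMap<>();
    private final long ttlMillis;

    KeyScopedIdSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /**
     * Returns the ID stored for the key, creating a new one if none exists
     * or the old one has expired. The window is not part of the lookup, so
     * every session of the same key sees the same ID until the TTL elapses.
     */
    String idFor(String key, long now) {
        Entry e = byKey.get(key);
        if (e == null || now - e.writtenAt >= ttlMillis) {
            e = new Entry(UUID.randomUUID().toString(), now);
            byKey.put(key, e);
        }
        return e.id;
    }

    public static void main(String[] args) {
        KeyScopedIdSketch state = new KeyScopedIdSketch(1_000);
        String inSessionA = state.idFor("user-1", 0);
        String inSessionB = state.idFor("user-1", 500);    // another session, same key
        String afterTtl = state.idFor("user-1", 2_000);    // the old entry has expired
        System.out.println(inSessionA.equals(inSessionB)); // true
        System.out.println(inSessionA.equals(afterTtl));   // false
    }
}
```

In this model nothing is removed when a window closes; only the TTL check
replaces the entry, which mirrors the statement that global state is not
cleared unless TTL is used.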

On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
<vi...@gmail.com> wrote:
>
> Sometimes writing it down makes you think. I now realize that this is not the right approach, given that merging windows will have their own states, and how the merge happens is really at the key level.
>
>> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan <ro...@apache.org> wrote:
>>>
>>> Hi Vishal,
>>>
>>> There is no leak in the code you provided (except that the number of
>>> keys can grow).
>>> But as you figured out the state is scoped to key, not to window+key.
>>>
>>> Could you explain what you are trying to achieve and why do you need to combine
>>> sliding windows with state scoped to window+key?
>>>
>>> Regards,
>>> Roman
>>>
>>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>>> <vi...@gmail.com> wrote:
>>> >
>>> > Essentially, Does this code leak state
>>> >
>>> > private static class SessionIdProcessWindowFunction<KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
>>> >         extends ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
>>> >
>>> >     private static final long serialVersionUID = 1L;
>>> >     private static final ValueStateDescriptor<String> sessionId =
>>> >             new ValueStateDescriptor<String>("session_uid", String.class);
>>> >
>>> >     @Override
>>> >     public void process(KEY key,
>>> >             ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
>>> >             Iterable<KeyedSession<KEY, VALUE>> elements,
>>> >             Collector<KeyedSessionWithSessionID<KEY, VALUE>> out) throws Exception {
>>> >         // I need this scoped to key/window
>>> >         if (getRuntimeContext().getState(sessionId).value() == null) {
>>> >             UUID uuid = UUID.randomUUID();
>>> >             getRuntimeContext().getState(sessionId).update(uuid.toString());
>>> >         }
>>> >         String uuid = getRuntimeContext().getState(sessionId).value();
>>> >         out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
>>> >     }
>>> > }

Re: clear() in a ProcessWindowFunction

Posted by Vishal Santoshi <vi...@gmail.com>.
Sometimes writing it down makes you think. I now realize that this is not
the right approach, given that merging windows will have their own
states, and how the merge happens is really at the key level.



On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <vi...@gmail.com>
wrote:

> I intend to augment every event in a session with a unique ID. To keep
> the session lean, there is a PurgingTrigger on this aggregate that fires
> on a count of 1.
>
> >> (except that the number of keys can grow).
>
> Want to confirm that the keys are GCed (along with state) once the
> windows close (plus lateness)?

Re: clear() in a ProcessWindowFunction

Posted by Vishal Santoshi <vi...@gmail.com>.
I intend to augment every event in a session with a unique ID. To keep
the session lean, there is a PurgingTrigger on this aggregate that fires
on a count of 1.

>> (except that the number of keys can grow).

I want to confirm: are the keys garbage-collected (along with their state)
once the window has closed and the allowed lateness has passed?




Re: clear() in a ProcessWindowFunction

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Vishal,

There is no leak in the code you provided (except that the number of
keys can grow).
But, as you figured out, the state is scoped to the key, not to window+key.

Could you explain what you are trying to achieve and why you need to combine
session windows with state scoped to window+key?

Regards,
Roman
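
To make the scoping point concrete, here is a minimal plain-Java model of
the two scopes. It deliberately uses no Flink classes: the class name, the
two maps and the method names are all hypothetical stand-ins, with one map
playing the role of keyed state obtained via getRuntimeContext() and the
other playing the role of per-window state. It is only a sketch of the
semantics, not of how Flink stores state internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Plain-Java model (no Flink classes) of key-scoped vs. (key, window)-scoped state. */
class ScopedStateModel {

    // Stand-in for keyed state: exactly one slot per key.
    private final Map<String, String> perKey = new HashMap<>();

    // Stand-in for per-window state: one slot per (key, window) pair.
    private final Map<String, String> perKeyAndWindow = new HashMap<>();

    /** Returns the id stored for the key, creating it on first access. */
    String idFromKeyedState(String key) {
        return perKey.computeIfAbsent(key, k -> UUID.randomUUID().toString());
    }

    /** Returns the id stored for the (key, window) pair, creating it on first access. */
    String idFromWindowState(String key, long windowStart) {
        String slot = key + "@" + windowStart;
        return perKeyAndWindow.computeIfAbsent(slot, k -> UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        ScopedStateModel model = new ScopedStateModel();

        // Two different sessions of the same key read the same key-scoped slot.
        String first = model.idFromKeyedState("user-1");
        String second = model.idFromKeyedState("user-1");
        System.out.println("key scope, ids equal across sessions: " + first.equals(second));

        // With (key, window) scope, each session gets its own id.
        String w1 = model.idFromWindowState("user-1", 0L);
        String w2 = model.idFromWindowState("user-1", 60_000L);
        System.out.println("window scope, ids equal across sessions: " + w1.equals(w2));
    }
}
```

In this model, a second session for the same key sees the id written by the
first one unless something clears or expires the key-scoped slot in between,
which is the behaviour described above for state scoped to the key.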


Re: clear() in a ProcessWindowFunction

Posted by Vishal Santoshi <vi...@gmail.com>.
Essentially, does this code leak state?

// Imports needed in the enclosing file:
// import java.util.UUID;
// import org.apache.flink.api.common.state.ValueState;
// import org.apache.flink.api.common.state.ValueStateDescriptor;
// import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
// import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
// import org.apache.flink.util.Collector;

private static class SessionIdProcessWindowFunction<KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
        extends ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {

    private static final long serialVersionUID = 1L;

    private static final ValueStateDescriptor<String> sessionId =
            new ValueStateDescriptor<>("session_uid", String.class);

    @Override
    public void process(KEY key,
            Context context,
            Iterable<KeyedSession<KEY, VALUE>> elements,
            Collector<KeyedSessionWithSessionID<KEY, VALUE>> out) throws Exception {
        // I need this scoped to key/window
        ValueState<String> state = getRuntimeContext().getState(sessionId);
        if (state.value() == null) {
            // First firing for this key: generate and remember the session id.
            state.update(UUID.randomUUID().toString());
        }
        out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), state.value()));
    }
}
