Posted to user@flink.apache.org by Annemarie Burger <an...@campus.tu-berlin.de> on 2020/05/06 20:13:45 UTC

Window processing in Stateful Functions

Hi,

I want to do windowed processing in each function when using Stateful
Functions. Is this possible? Some pseudocode would be very helpful!

Some more context: I have a stream of edges as input. I want to window
these and store the graph representation (as an edge list, adjacency
list, or CSR) in a distributed way using state. Since doing this for the
entire edge stream would cost far too much memory, I want to keep only the
state of the graph within the window. How could I achieve this?

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Window processing in Stateful Functions

Posted by Igal Shilman <ig...@ververica.com>.
Hi,
One way to keep the state size under control would be:
1) For every incoming edge, store its "insertion time" alongside the edge
in the vertex function's state.
2) In addition, have the vertex function send a delayed message to itself
with a delay equal to the expiration duration, so that it arrives at
insertion time + expiration duration.
3) Once a delayed message arrives, iterate over your edge state and remove
all the edges with "insertion time" + expiration duration <= now().
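A minimal, framework-free Java sketch of the three steps above. It is a model, not StateFun code: the hypothetical class `PerEdgeExpiry` stands in for the vertex function's persisted state, times are passed in explicitly as epoch milliseconds, and instead of actually sending a delayed message, `onEdge` returns the delay that a real function would pass to the SDK's delayed messaging (e.g. `context.sendAfter(delay, context.self(), message)`).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one vertex function's state; not a StateFun API.
class PerEdgeExpiry {
    private final long expirationMillis;
    // Edge state: target vertex id -> insertion time (epoch millis).
    // In StateFun this would live in persisted state, e.g. a PersistedTable.
    private final Map<String, Long> edges = new HashMap<>();

    PerEdgeExpiry(Duration expiration) {
        this.expirationMillis = expiration.toMillis();
    }

    // Steps 1 and 2: record the edge with its insertion time and return the delay
    // for the self-addressed purge message. The message should arrive at
    // insertionTime + expiration, so the delay itself is just the expiration.
    Duration onEdge(String target, long nowMillis) {
        edges.put(target, nowMillis);
        return Duration.ofMillis(expirationMillis);
    }

    // Step 3: when a purge message arrives, drop every edge whose lifetime has
    // elapsed. Returns the number of edges removed.
    int onPurge(long nowMillis) {
        int before = edges.size();
        edges.values().removeIf(inserted -> inserted + expirationMillis <= nowMillis);
        return before - edges.size();
    }

    List<String> liveEdges() {
        return new ArrayList<>(edges.keySet());
    }
}
```

Re-inserting an edge refreshes its insertion time, so a purge message scheduled for the older insertion simply finds nothing to remove for that edge.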

To reduce the number of delayed messages, you can make sure to send only a
single delayed message per fixed expiration interval
(a.k.a. a tumbling window).
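The interval-based variant can be sketched the same way, again as a hypothetical, framework-free Java model. Two assumptions not spelled out in the thread: a boolean flag in state tracks whether a purge message is already in flight (so each vertex has at most one pending), and each purge drops edges older than one interval, rescheduling itself only while edges remain. The trade-off is that an edge may survive for up to almost two intervals, in exchange for one delayed message per interval instead of one per edge.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the "one delayed message per interval" variant; not a StateFun API.
class IntervalPurge {
    private final long intervalMillis;
    private final Map<String, Long> edges = new HashMap<>(); // target -> insertion time (millis)
    // True while a self-addressed purge message is in flight, so at most one is pending.
    private boolean purgeScheduled = false;

    IntervalPurge(Duration interval) {
        this.intervalMillis = interval.toMillis();
    }

    // Records the edge. Returns the delay for a new purge message, or null if
    // one is already pending and no new message needs to be sent.
    Duration onEdge(String target, long nowMillis) {
        edges.put(target, nowMillis);
        if (purgeScheduled) {
            return null;
        }
        purgeScheduled = true;
        return Duration.ofMillis(intervalMillis);
    }

    // Handles the purge message: drops edges older than one interval, then
    // reschedules a single follow-up purge only while edges remain.
    Duration onPurge(long nowMillis) {
        edges.values().removeIf(inserted -> inserted + intervalMillis <= nowMillis);
        if (edges.isEmpty()) {
            purgeScheduled = false;
            return null;
        }
        return Duration.ofMillis(intervalMillis);
    }

    int size() {
        return edges.size();
    }
}
```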

A better way to deal with this would be to wait until [1] is
implemented in StateFun (I don't believe it should take more than a couple of
weeks).
Then you can simply define your state with an expiration, and StateFun would
make sure that the edge state is purged automatically a configured amount of
time after insertion.

I hope this helps,
Good luck!
Igal.


[1] https://issues.apache.org/jira/browse/FLINK-17644

On Fri, May 8, 2020 at 1:00 PM m@xi <ma...@gmail.com> wrote:


Re: Window processing in Stateful Functions

Posted by "m@xi" <ma...@gmail.com>.
Dear Igal,Very insightful answer. Thanks.
Igal Shilman wrote
> An alternative approach would be to implement a 
*
> thumbling window
*
>  per vertex(a stateful function instance)by sending to itself a delayed
> message [2]. When that specific delayedmessage arrives you wouldhave to
> purge the oldest edges by examining the edges in state.

Indeed, the delayed asynchronous messages are a workaround for *tumbling
window* simulation in SF. I believe you assume a message received by a
stateful function contains multiple edges, i.e. which can all be delayed by
a certain amount of time. Therefore, when a function receives a message, it
purges all of its existing edges and incorporates the new (delayed)
ones.Correct?Nevertheless, if you think of it, the delay is essentially the
*window slide*. Now, what about the *window range*? 
Igal Shilman wrote
> Data stream windows are not yet supported in statefun, but it seems
> likethe main motivation hereis to purge old edges?If this is the case
> perhaps we need to integrate state TTL [1] intopersisted
> values/persistedtables.

I was not aware about the TTL, very interesting and handful. Essentially,
the TTL can enforce the *window range* i.e., attach to each tuple received
by a stateful function its lifespan/duration. So, the first TTL attribute
sets the range /StateTtlConfig.newBuilder(Time.seconds(*window
range*))/.Therefore, by *combining TTL and SF Delayed Messaging* we can
*simulate sliding window* processing on a stateful function basis.However,
TTL is a Flink constuct and I am not sure if I got it correctly. You said
Igal Shilman wrote
> If this is the case perhaps 
*
> we need to integrate
*
>  state TTL [1] intopersisted values/persistedtables.

If this is the case, then I believe it would be great to integrate TLL into 
Persisted Values/Tables
<https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/sdk/java.html#persistence> 
.
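A hypothetical, framework-free Java sketch of the "TTL for the range, delayed messages for the slide" combination described above. It assumes TTL behaves like "expire a fixed time after the last write" and models it with an explicit expiry timestamp per edge, since (per this thread) StateFun persisted state did not expose TTL at the time. A self-addressed delayed message every *slide* evicts expired edges and reads the current window. Eviction here is eager at each slide; with Flink's own TTL, cleanup of expired entries is not necessarily immediate.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "TTL for the range + delayed messages for the slide";
// not a StateFun or Flink API.
class SlidingEdgeWindow {
    private final long rangeMillis;
    private final long slideMillis;
    // target vertex id -> expiry time; stands in for TTL'd state (expire after write).
    private final Map<String, Long> edges = new HashMap<>();

    SlidingEdgeWindow(Duration range, Duration slide) {
        this.rangeMillis = range.toMillis();
        this.slideMillis = slide.toMillis();
    }

    // Writing an edge (re)starts its time-to-live, which enforces the window range.
    void onEdge(String target, long nowMillis) {
        edges.put(target, nowMillis + rangeMillis);
    }

    // Called when the self-addressed "slide" message arrives: evict expired edges
    // and return the current window contents, sorted for deterministic output.
    List<String> onSlide(long nowMillis) {
        edges.values().removeIf(expiry -> expiry <= nowMillis);
        List<String> window = new ArrayList<>(edges.keySet());
        Collections.sort(window);
        return window;
    }

    // Delay for the next slide message; a real function would send it to itself.
    Duration nextSlideDelay() {
        return Duration.ofMillis(slideMillis);
    }
}
```

With a 10 s range and a 5 s slide, the window read at time t contains exactly the edges written in the interval (t - 10 s, t].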




Re: Window processing in Stateful Functions

Posted by Igal Shilman <ig...@ververica.com>.
Hi all,

Data stream windows are not yet supported in StateFun, but it seems like
the main motivation here is to purge old edges?
If this is the case, perhaps we need to integrate state TTL [1] into
persisted values/persisted tables.

An alternative approach would be to implement a tumbling window per vertex
(a stateful function instance) by sending a delayed message [2] to itself.
When that specific delayed message arrives, you would have to purge the
oldest edges by examining the edges in state.

I hope it helps,
Igal.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
[2] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/sdk/java.html#sending-delayed-messages



On Wednesday, May 6, 2020, Oytun Tez <oy...@motaword.com> wrote:


Re: Window processing in Stateful Functions

Posted by Oytun Tez <oy...@motaword.com>.
Oops – will follow the thread 🙊


 --

[image: MotaWord]
Oytun Tez
M O T A W O R D | CTO & Co-Founder
oytun@motaword.com

      <https://www.motaword.com/blog>


On Wed, May 6, 2020 at 5:37 PM m@xi <ma...@gmail.com> wrote:


Re: Window processing in Stateful Functions

Posted by "m@xi" <ma...@gmail.com>.
Hello Tez,

With all due respect, I doubt your answer is related to the question.

*Just to rephrase a bit*: Assuming we use SF for our application, how can we
apply window logic when a function does some processing? *Is there a proper
way?*

@*Igal*: we would very much appreciate your answer. :)

Best,
Max






Re: Window processing in Stateful Functions

Posted by Oytun Tez <oy...@motaword.com>.
I think this is also related to my question about CEP in StateFun.

@Annemarie Burger <an...@campus.tu-berlin.de>, I was looking
into using the Siddhi library within the function's context.





On Wed, May 6, 2020 at 4:13 PM Annemarie Burger <
annemarie.burger@campus.tu-berlin.de> wrote:
