You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Krzysztof Zarzycki <k....@gmail.com> on 2020/03/30 22:59:47 UTC

Complex graph-based sessionization (potential use for stateful functions)

Hi!  Interesting problem to solve ahead :)
I need to implement a streaming sessionization algorithm (split stream of
events into groups of correlated events). It's pretty non-standard as we
DON'T have a key like user id which separates the stream into substreams
which we just need to chunk based on time.
Instead and simplifying a lot, our events bear tuples, that I compare to
graph edges, e.g.:
event 1: A -> B
event 2: B -> C
event 3: D -> E
event 4: D -> F
event 5: G -> F
I need to group them into subgroups reachable by following these edges from
some specific nodes. E.g. here:
{ A->B, B->C}
{ D->E, D->F}
{ G->F }
(note: I need to group the events, which are represented by edges here, not
the nodes).
As far as I understand, to solve this problem I need to leverage feedback
loops/iterations feature in Flink (Generally I believe I need to apply a
Bulk Synchronous Processing approach).

Does anyone have seen this kind of sessionization implemented in the wild?
Would you suggest implementing such an algorithm using *stateful functions*?
(AFAIK, they use feedback loops underneath). Can you suggest how would
these be used here?
I know there are some problems with checkpointing when using iterations,
does it mean the implementation may experience data loss on stops?

Side comment: I'm not sure which graph algorithm derivative needs to be
applied here, but the candidate is transitive closure.

Thanks for joining the discussion!
Krzysztof

Re: Complex graph-based sessionization (potential use for stateful functions)

Posted by "m@xi" <ma...@gmail.com>.
Hi Igal,

Thanks a lot for your answer. I believe you are one of the core developers
behind the interesting statefun.

Your suggestion is really nice and as you say, one way is to tailor the
graph processing to the philosophy of SF.

Though, if one vertex is a stateful function, then heavy hitter nodes in
massive graph will become a bottleneck since their functions will do way
more computation than other nodes, which have only a few edges.

Nevertheless, can the statefun messages be everything? For instance, "its
adjacency list" parts of the a vertex/function could be sent as well??

Another way is SF and Flink to way together. Can SF be responsible solely
for the messaging of state in a Flink job??

Thanks you in advance.

Best,
Makis





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Complex graph-based sessionization (potential use for stateful functions)

Posted by Igal Shilman <ig...@ververica.com>.
Hi All,
One way to try to think about it with StateFun, is to represent the Graph
vertices as stateful functions instances. Unlike other frameworks an
instance of a function does not take any resources while idle, and
potentially you can have many millions of those.
A state for each vertex might be a list of adjacent vertices, and
potentially a timer so that they won’t linger for to long.
You would still have to think of what kind of graph algorithm to apply here.

I hope it helps,
Igal.

On Thursday, April 9, 2020, Robert Metzger <rm...@apache.org> wrote:

> Hey Max,
>
> 1) Stateful functions has been released now: https://mvnrepository.
> com/artifact/org.apache.flink/statefun-flink-core
> See also: https://flink.apache.org/news/2020/04/07/release-
> statefun-2.0.0.html
> Getting Started: https://ci.apache.org/projects/flink/flink-
> statefun-docs-release-2.0/getting-started/java_walkthrough.html
>
> Please give us honest feedback on the "onboarding experience" with
> stateful functions. We are very eager to make the experience as smooth as
> possible :)
>
> 2) What do you consider large state? The Flink runtime itself can handle
> large events / messages (1GB should work). I'm not sure about statefun, but
> I will try to get an answer for you :)
>
> Best,
> Robert
>
>
> On Tue, Apr 7, 2020 at 9:31 AM m@xi <ma...@gmail.com> wrote:
>
>> Hello Robert
>>
>> Thanks to your reply I discovered the Stateful Functions which I believe
>> is
>> a quite powerful tool. I have some questions:
>>
>> 1) As you said, "the community is currently in the process of releasing
>> the
>> first Apache release of StateFun and it should hopefully be out by the end
>> of this week". Does this mean that it will become available in Maven
>> Repository?
>>
>> Because I can't find it while searching in
>> https://mvnrepository.com/artifact/org.apache.flink?sort=newest
>> or
>> use the API in my intellij project when I import the dependencies in my
>> POM
>> file.
>>
>> I though of dowloading the code from
>> https://ci.apache.org/projects/flink/flink-statefun-docs-master/,
>> compiling
>> it with *mvn clean package* and then import the produced jar file to my
>> intellij project as an External Library. Is this what you might recommend
>> for now?
>>
>> 2) I came across this tutorial by Stefan on stateful functions
>> https://www.youtube.com/watch?v=fCeHCMJXXM0 where he mentions that
>> arbitrary
>> communication between nodes/functions/actors is essentially made possible
>> by
>> introducing feedback loops to the DAG Flink topology (well now it has
>> circles I guess :P) to simulate the arbitrary messasing defined in the
>> StateFun topology.
>>
>> So the message passing is done by "feedback and tuple rerouting" and not
>> with MPI. Do you think (or have tested) if one may *efficiently*
>> send/receive (potentially large) state, like graph state which is the use
>> case of this post?  Or it is more suitable for sending control messages
>> between actors/functions?
>>
>> Thanks a lot in advance.
>>
>> Best,
>> Max
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/
>>
>

Re: Fwd: Complex graph-based sessionization (potential use for stateful functions)

Posted by "m@xi" <ma...@gmail.com>.
Hi Robert,

Thanks a lot for your reply.

1) Now statefun packages are in the MVN repository, so probably they needed
some time to really be included there after your official release.

2) Alinged to the topic of the thread, I am referring to state of massive
graph streams. 
    To enable distributed graph processing first one needs to partition (in
a specific or random way) the incoming edges to 
    different processing units which subsequently should:
        (a) maintain (initialize and update) the part of the graph's state
they are responsible for as the graph stream evolves 
        (b) perform computation for answering the distributed graph
processing task.

The (a) part can be done easily with Flink, but (b) probably dictates
accessing partial graph state of one machine from another. 

Based on my experience, to do it in Flink one has to implement the
communication among processing units in such a way that is specific to the
graph processing task. So, it would be great to decouple (a) from (b) i.e.,
if Stateful Functions can allow accessing/sending graph state using their
primitives for messaging.

Therefore, my question are the following:

i)   is it possible to have Flink jobs that invoke the Stateful Functions
just for state accessing/retrieving/migration??
ii)  if yes, how efficient would it be?
iii) if not, can you sketch *proper* way(s) of  for state
accessing/retrieving/migration in plain Flink??

Thanks in advance.

Best,
Makis



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Fwd: Complex graph-based sessionization (potential use for stateful functions)

Posted by Robert Metzger <rm...@apache.org>.
Hey Max,

1) Stateful functions has been released now:
https://mvnrepository.com/artifact/org.apache.flink/statefun-flink-core
See also:
https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
Getting Started:
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/getting-started/java_walkthrough.html

Please give us honest feedback on the "onboarding experience" with stateful
functions. We are very eager to make the experience as smooth as possible
:)

2) What do you consider large state? The Flink runtime itself can handle
large events / messages (1GB should work). I'm not sure about statefun, but
I will try to get an answer for you :)

Best,
Robert


On Tue, Apr 7, 2020 at 9:31 AM m@xi <ma...@gmail.com> wrote:

> Hello Robert
>
> Thanks to your reply I discovered the Stateful Functions which I believe is
> a quite powerful tool. I have some questions:
>
> 1) As you said, "the community is currently in the process of releasing the
> first Apache release of StateFun and it should hopefully be out by the end
> of this week". Does this mean that it will become available in Maven
> Repository?
>
> Because I can't find it while searching in
> https://mvnrepository.com/artifact/org.apache.flink?sort=newest
> or
> use the API in my intellij project when I import the dependencies in my POM
> file.
>
> I though of dowloading the code from
> https://ci.apache.org/projects/flink/flink-statefun-docs-master/,
> compiling
> it with *mvn clean package* and then import the produced jar file to my
> intellij project as an External Library. Is this what you might recommend
> for now?
>
> 2) I came across this tutorial by Stefan on stateful functions
> https://www.youtube.com/watch?v=fCeHCMJXXM0 where he mentions that
> arbitrary
> communication between nodes/functions/actors is essentially made possible
> by
> introducing feedback loops to the DAG Flink topology (well now it has
> circles I guess :P) to simulate the arbitrary messasing defined in the
> StateFun topology.
>
> So the message passing is done by "feedback and tuple rerouting" and not
> with MPI. Do you think (or have tested) if one may *efficiently*
> send/receive (potentially large) state, like graph state which is the use
> case of this post?  Or it is more suitable for sending control messages
> between actors/functions?
>
> Thanks a lot in advance.
>
> Best,
> Max
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Fwd: Complex graph-based sessionization (potential use for stateful functions)

Posted by "m@xi" <ma...@gmail.com>.
Hello Robert

Thanks to your reply I discovered the Stateful Functions which I believe is
a quite powerful tool. I have some questions:

1) As you said, "the community is currently in the process of releasing the
first Apache release of StateFun and it should hopefully be out by the end
of this week". Does this mean that it will become available in Maven
Repository? 

Because I can't find it while searching in
https://mvnrepository.com/artifact/org.apache.flink?sort=newest 
or 
use the API in my intellij project when I import the dependencies in my POM
file.

I though of dowloading the code from
https://ci.apache.org/projects/flink/flink-statefun-docs-master/, compiling
it with *mvn clean package* and then import the produced jar file to my
intellij project as an External Library. Is this what you might recommend
for now?

2) I came across this tutorial by Stefan on stateful functions
https://www.youtube.com/watch?v=fCeHCMJXXM0 where he mentions that arbitrary
communication between nodes/functions/actors is essentially made possible by
introducing feedback loops to the DAG Flink topology (well now it has
circles I guess :P) to simulate the arbitrary messasing defined in the
StateFun topology.

So the message passing is done by "feedback and tuple rerouting" and not
with MPI. Do you think (or have tested) if one may *efficiently*
send/receive (potentially large) state, like graph state which is the use
case of this post?  Or it is more suitable for sending control messages
between actors/functions?

Thanks a lot in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Fwd: Complex graph-based sessionization (potential use for stateful functions)

Posted by Robert Metzger <rm...@apache.org>.
Forwarding Seth's answer to the list

---------- Forwarded message ---------
From: Seth Wiesman <se...@ververica.com>
Date: Tue, Mar 31, 2020 at 4:47 PM
Subject: Re: Complex graph-based sessionization (potential use for stateful
functions)
To: Krzysztof Zarzycki <k....@gmail.com>
Cc: user <us...@flink.apache.org>, <rm...@apache.org>


Hi Krzysztof,

This is a great use case for Stateful Functions. I have actually been
considering adding a graph algorithm example to the statefun repo for some
time now.

StateFun does use iteration under the hood and provides exactly-once
guarantees. In-flight records will never be lost in the case of failure.
From a user code perspective, the api offers arbitrary message passing
between different functions (stateful virtual actors).

For a rough sketch of what this would look like; you could create a
function called `Vertex` that represents a single vertice on the graph. Its
state would the edges, all vertices reachable from that point, We now have
a distributed, fault-tolerant, adjacency list. You can implement whatever
graph algorithm you like on top of this structure. Walking the graph would
just be starting from a point, and messaging the vertices stored in state.

Just in case you are not aware, the community is currently in the process
of releasing the first Apache release of StateFun and it should hopefully
be out by the end of this week. Just to say the API is stable and you can
start developing on top of it.


On Mon, Mar 30, 2020 at 6:00 PM Krzysztof Zarzycki <k....@gmail.com>
wrote:

> Hi!  Interesting problem to solve ahead :)
> I need to implement a streaming sessionization algorithm (split stream of
> events into groups of correlated events). It's pretty non-standard as we
> DON'T have a key like user id which separates the stream into substreams
> which we just need to chunk based on time.
> Instead and simplifying a lot, our events bear tuples, that I compare to
> graph edges, e.g.:
> event 1: A -> B
> event 2: B -> C
> event 3: D -> E
> event 4: D -> F
> event 5: G -> F
> I need to group them into subgroups reachable by following these edges
> from some specific nodes. E.g. here:
> { A->B, B->C}
> { D->E, D->F}
> { G->F }
> (note: I need to group the events, which are represented by edges here,
> not the nodes).
> As far as I understand, to solve this problem I need to leverage feedback
> loops/iterations feature in Flink (Generally I believe I need to apply a
> Bulk Synchronous Processing approach).
>
> Does anyone have seen this kind of sessionization implemented in the wild?
> Would you suggest implementing such an algorithm using *stateful
> functions*? (AFAIK, they use feedback loops underneath). Can you suggest
> how would these be used here?
> I know there are some problems with checkpointing when using iterations,
> does it mean the implementation may experience data loss on stops?
>
> Side comment: I'm not sure which graph algorithm derivative needs to be
> applied here, but the candidate is transitive closure.
>
> Thanks for joining the discussion!
> Krzysztof
>
>

-- 

Seth Wiesman | Solutions Architect

+1 314 387 1463

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time