Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2020/10/17 22:16:12 UTC

Remote Stateful Function Scalability

After reading the Stateful Functions documentation, I am left wondering how
remote stateful functions scale.

The documentation mentions that the use of remote functions allows the
state and compute tiers to scale independently. But the documentation seems
to imply that only a single instance of a function type can execute at a
time per worker ("*When an application starts, each parallel worker of the
framework will create one physical object per function type. This object
will be used to execute all logical instances of that type that are run by
that particular worker.*") That would seem to tie and limit the parallelism
of the compute layer to that of the storage layer even when using remote
functions.

Can a worker execute multiple concurrent remote stateful functions of
different types?

Can a worker execute multiple concurrent remote stateful functions of the
same type with different keys?

If a worker can execute multiple concurrent remote stateful functions of
the same type with different keys, does it ensure their output is ordered
like its inputs?

Re: Remote Stateful Function Scalability

Posted by Seth Wiesman <sj...@gmail.com>.
As a note, I wrote that concepts section before remote functions were
implemented. I've made a note to myself to go through and update it.

Seth

On Sat, Oct 17, 2020 at 9:29 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Elias,
>
> On Sun, Oct 18, 2020 at 6:16 AM Elias Levy <fe...@gmail.com>
> wrote:
>
>> After reading the Stateful Functions documentation, I am left wondering
>> how remote stateful functions scale.
>>
>> The documentation mentions that the use of remote functions allows the
>> state and compute tiers to scale independently. But the documentation seems
>> to imply that only a single instance of a function type can execute at a
>> time per worker ("*When an application starts, each parallel worker of
>> the framework will create one physical object per function type. This
>> object will be used to execute all logical instances of that type that are
>> run by that particular worker.*") That would seem to tie and limit the
>> parallelism of the compute layer to that of the storage layer even when
>> using remote functions.
>>
>
> Your observation is correct only for embedded functions, not for remote
> functions.
> For remote functions, the physical object that each StateFun worker creates
> per function type acts as an asynchronous invocation dispatcher to that
> type's remote function service.
>
> Here is a brief summary of what the dispatcher does:
> The dispatcher *only ensures sequential invocation per logical address*
> (function type + logical instance ID / key).
> Invocations for different logical addresses (different types / different
> keys) can happen concurrently.
>
> If an invocation request for a logical address is in-flight, and other
> messages targeted for that address arrive, they are buffered in a backlog
> (state) until the pending request completes.
> Upon completion, the backlog is flushed and all buffered messages are sent
> to the remote function as a single batch invocation request.
> Backpressure is applied once the backlog size reaches a threshold.
>
> All in all, in vanilla Flink terms, this works similarly to Flink's Async
> I/O operator in unordered mode, i.e. without the stream order preserved.
>
> So, to conclude by answering your specific questions:
>
>
>>
>> Can a worker execute multiple concurrent remote stateful functions of
>> different types?
>>
>
> Yes.
>
>
>>
>> Can a worker execute multiple concurrent remote stateful functions of the
>> same type with different keys?
>>
>
> Yes.
>
>
>>
>> If a worker can execute multiple concurrent remote stateful functions of
>> the same type with different keys, does it ensure their output is ordered
>> like its inputs?
>>
>
> No, currently StateFun handles outgoing messages (i.e. messages going to
> other functions / egresses) based only on the order in which the concurrent
> invocation requests complete.
> However, I believe that it should be possible to support an ordered mode
> here at the cost of extra latency (early completions need to be buffered,
> checkpointing overhead, etc.).
>
> Hope this helps clarify some things!
>
> Cheers,
> Gordon
>

Re: Remote Stateful Function Scalability

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Elias,

On Sun, Oct 18, 2020 at 6:16 AM Elias Levy <fe...@gmail.com>
wrote:

> After reading the Stateful Functions documentation, I am left wondering
> how remote stateful functions scale.
>
> The documentation mentions that the use of remote functions allows the
> state and compute tiers to scale independently. But the documentation seems
> to imply that only a single instance of a function type can execute at a
> time per worker ("*When an application starts, each parallel worker of
> the framework will create one physical object per function type. This
> object will be used to execute all logical instances of that type that are
> run by that particular worker.*") That would seem to tie and limit the
> parallelism of the compute layer to that of the storage layer even when
> using remote functions.
>

Your observation is correct only for embedded functions, not for remote
functions.
For remote functions, the physical object that each StateFun worker creates
per function type acts as an asynchronous invocation dispatcher to that
type's remote function service.

Here is a brief summary of what the dispatcher does:
The dispatcher *only ensures sequential invocation per logical address*
(function type + logical instance ID / key).
Invocations for different logical addresses (different types / different
keys) can happen concurrently.

If an invocation request for a logical address is in-flight, and other
messages targeted for that address arrive, they are buffered in a backlog
(state) until the pending request completes.
Upon completion, the backlog is flushed and all buffered messages are sent
to the remote function as a single batch invocation request.
Backpressure is applied once the backlog size reaches a threshold.
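
To make the dispatcher behaviour above concrete, here is a minimal toy model in Python. It is only a sketch of the rules described in this message, not StateFun's actual implementation: the class name `Dispatcher`, its methods, the `max_backlog` parameter, and the choice to count the backlog per address are all assumptions made for illustration.

```python
from collections import defaultdict, deque


class Dispatcher:
    """Toy model: sequential per logical address, concurrent across addresses."""

    def __init__(self, max_backlog=3):
        # Hypothetical threshold; counted per address here as an assumption.
        self.max_backlog = max_backlog
        self.in_flight = set()             # addresses with a pending request
        self.backlog = defaultdict(deque)  # address -> buffered messages
        self.sent = []                     # (address, batch) requests issued

    def on_message(self, address, message):
        """Handle an incoming message; return False to signal backpressure."""
        if address not in self.in_flight:
            # No pending request for this address: invoke right away.
            self._send(address, [message])
            return True
        # A request is already in flight for this address: buffer the message.
        self.backlog[address].append(message)
        return len(self.backlog[address]) < self.max_backlog

    def on_complete(self, address):
        """Called when the pending request for `address` has completed."""
        self.in_flight.discard(address)
        pending = self.backlog.pop(address, None)
        if pending:
            # Flush everything buffered so far as a single batch request.
            self._send(address, list(pending))

    def _send(self, address, batch):
        self.in_flight.add(address)
        self.sent.append((address, batch))


if __name__ == "__main__":
    d = Dispatcher(max_backlog=2)
    d.on_message(("greeter", "a"), "m1")  # sent immediately
    d.on_message(("greeter", "b"), "m2")  # different key: sent concurrently
    d.on_message(("greeter", "a"), "m3")  # buffered behind m1
    d.on_message(("greeter", "a"), "m4")  # buffered; threshold reached
    d.on_complete(("greeter", "a"))       # m3 and m4 go out as one batch
    print(d.sent)
```

An address here is a (function type, key) pair, so two keys of the same type never wait on each other; only messages for the same address are serialized and batched.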

All in all, in vanilla Flink terms, this works similarly to Flink's Async
I/O operator in unordered mode, i.e. without the stream order preserved.

So, to conclude by answering your specific questions:


>
> Can a worker execute multiple concurrent remote stateful functions of
> different types?
>

Yes.


>
> Can a worker execute multiple concurrent remote stateful functions of the
> same type with different keys?
>

Yes.


>
> If a worker can execute multiple concurrent remote stateful functions of
> the same type with different keys, does it ensure their output is ordered
> like its inputs?
>

No, currently StateFun handles outgoing messages (i.e. messages going to
other functions / egresses) based only on the order in which the concurrent
invocation requests complete.
However, I believe that it should be possible to support an ordered mode
here at the cost of extra latency (early completions need to be buffered,
checkpointing overhead, etc.).
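
As a rough illustration of what buffering early completions could look like, here is a small Python sketch. It is purely hypothetical: StateFun has no such mode according to this thread, and the `OrderedEmitter` class, its sequence numbers, and its method names are invented for the example. It also ignores the checkpointing side entirely.

```python
class OrderedEmitter:
    """Releases invocation outputs in input order, buffering early completions."""

    def __init__(self):
        self.next_seq = 0   # sequence number of the next result to release
        self.pending = {}   # seq -> outputs of requests that completed early
        self.emitted = []   # outputs released downstream, in input order

    def complete(self, seq, outputs):
        """Record that the request with input sequence number `seq` finished."""
        self.pending[seq] = outputs
        # Release every result that is now contiguous with what was emitted.
        while self.next_seq in self.pending:
            self.emitted.extend(self.pending.pop(self.next_seq))
            self.next_seq += 1


if __name__ == "__main__":
    emitter = OrderedEmitter()
    emitter.complete(1, ["out-b"])  # finished early: held back
    emitter.complete(0, ["out-a"])  # unblocks both results
    print(emitter.emitted)
```

The extra latency mentioned above shows up as the time a result spends in `pending` while it waits for slower requests that were issued before it.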

Hope this helps clarify some things!

Cheers,
Gordon