Posted to user@flink.apache.org by KristoffSC <kr...@gmail.com> on 2020/08/10 14:24:12 UTC

Using managed keyed state with Async I/O

Hi guys,
I'm using Flink 1.9.2.

I have a question about a use case where I would like to use Flink's managed
keyed state with Async IO [1].


Let's take the example below from [1] as a baseline, and let's assume that
we are executing it on a keyed stream.

// Inside asyncInvoke(String key, ResultFuture<Tuple2<String, String>> resultFuture):
final Future<String> result = client.query(key);

CompletableFuture.supplyAsync(new Supplier<String>() {

    @Override
    public String get() {
        try {
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            // Normally handled explicitly.
            return null;
        }
    }
}).thenAccept((String dbResult) -> {
    resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});


Imagine that instead of passing the key to client.query(..), we pass some
value taken from Flink's managed keyed state. Later, the supplier's get
method returns a value that should be stored back in that state. In other
words, we use previous results as inputs for the next computations.

Is this achievable with Flink's Async IO? I can have many pending requests
on client.query, which can finish in any order. AsyncDataStream.orderedWait
will not help here, since it only affects the order in which Flink
"releases" the messages from its internal queue for async operators.


What is more, this scenario can result in multiple concurrent writes/reads
to/from Flink's managed state for the same key. Is this thread-safe?
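To make the read-modify-write concern concrete, here is a minimal plain-JDK sketch. It uses no Flink classes: a HashMap stands in for keyed state, and the "+1" computation is a hypothetical placeholder for client.query. Two elements with the same key both read the state before either async result has been written back, so one update is lost even though every write happens on the same thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustration only: a HashMap stands in for Flink keyed state.
class LostUpdateDemo {
    static int run() {
        Map<String, Integer> state = new HashMap<>();
        state.put("a", 0);

        // Two elements with key "a" arrive before either async call has finished,
        // so both requests are built from the same, soon-to-be-stale value.
        int seenByFirst = state.get("a");
        int seenBySecond = state.get("a");
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> seenByFirst + 1);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> seenBySecond + 1);

        // Both results are written back on this thread. Whichever lands last wins,
        // and the final value is 1 rather than 2: one increment is lost.
        state.put("a", first.join());
        state.put("a", second.join());
        return state.get("a");
    }
}
```

So thread safety alone would not be enough here: a later request must not be built from state that an earlier, still pending request is about to overwrite.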


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Using managed keyed state with Async I/O

Posted by Arvid Heise <ar...@ververica.com>.
Hi Kristoff,

Unfortunately, the answer to both of your big questions is no. I see two
options in general:

1) A process function (as you proposed). In processElement, you'd read the
state and invoke your async operation. You enqueue the result in a result
queue and emit it on the next call of processElement. To deal with rare
keys, you'd probably also want to register a timer that flushes the outputs.
In the timer callback or the next processElement, you can also access the
keyed state. However, you also need to ensure that these pending results are
snapshotted, so that they are not lost on a crash. I'd expect that you can
combine a ProcessFunction with CheckpointedFunction, but I haven't done that
myself yet.

2) Implement your own operator, starting by copying or subclassing the
existing AsyncWaitOperator [1]. One thing to watch out for is that the state
and the output collector must only be accessed from the mailbox thread (the
main task thread). You can use the mailboxExecutor to run a piece of code in
the mailbox.
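A rough plain-JDK analogue of the mailbox idea in option 2 may help. This is not Flink's MailboxExecutor API; the Mailbox class and its drainUntil method are hypothetical names. Callbacks from any thread only enqueue work, and the task thread runs the queued work, so state is only ever touched on that one thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical plain-JDK analogue of a mailbox; this is NOT Flink's MailboxExecutor API.
class Mailbox implements Executor {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    // May be called from any thread (e.g. an async callback): it only enqueues the action.
    @Override
    public void execute(Runnable action) {
        mails.add(action);
    }

    // Called from the task thread: runs queued actions, blocking for new ones,
    // until `done` has completed.
    void drainUntil(CompletableFuture<?> done) {
        while (!done.isDone()) {
            try {
                mails.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

An async result would then be handled with something like future.thenAcceptAsync(result -> state.put(key, result), mailbox), so the state update is queued by the callback thread and executed by whichever thread drains the mailbox.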

Even if you go with 1), have a look at the AsyncWaitOperator, as it should
serve as a good template.
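To make option 1 a bit more concrete, here is a rough plain-JDK simulation; it uses no Flink classes. A HashMap stands in for keyed ValueState, a List for the Collector, and an explicit onTimer(key) call for a timer registered on that key; query() is a hypothetical async client. The async callbacks only enqueue results, and the results are applied to state and emitted on the calling (task) thread, either on the next element for that key or when its timer fires. Checkpointing the pending results, which the reply above says is required, is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Plain-JDK simulation of option 1; all names are hypothetical stand-ins.
class PendingResultsSketch {
    // Stand-in for keyed ValueState<Integer>; only touched by the task thread.
    private final Map<String, Integer> state = new HashMap<>();
    // Results completed by callback threads, grouped by key; must be thread-safe.
    private final Map<String, Queue<Integer>> pending = new ConcurrentHashMap<>();
    // Stand-in for the Collector; only touched by the task thread.
    final List<String> emitted = new ArrayList<>();
    // Lets the caller wait for in-flight calls (in Flink, a timer would poll instead).
    final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    // Hypothetical async client call: combines the previous state value with the input.
    private static CompletableFuture<Integer> query(int previous, int input) {
        return CompletableFuture.supplyAsync(() -> previous + input);
    }

    void processElement(String key, int input) {
        flush(key); // apply results that finished for this key since the last call
        int previous = state.getOrDefault(key, 0); // read state on the task thread
        inFlight.add(query(previous, input).thenAccept(result ->
                // Callback thread: only enqueue; never touch state or the collector here.
                pending.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(result)));
    }

    // What a timer registered for this key would do when it fires.
    void onTimer(String key) {
        flush(key);
    }

    private void flush(String key) {
        Queue<Integer> queue = pending.get(key);
        if (queue == null) {
            return;
        }
        Integer result;
        while ((result = queue.poll()) != null) {
            state.put(key, result);          // state update on the task thread
            emitted.add(key + "=" + result); // emission on the task thread
        }
    }

    Integer stateOf(String key) {
        return state.get(key);
    }
}
```

This keeps all state access on a single thread, but on its own it does not stop a second element for the same key from reading state before the first element's result has been applied.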

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java

(In reply to KristoffSC's message of Fri, Aug 14, 2020 at 12:14 PM, which
appears as the next post in this thread.)


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Using managed keyed state with Async I/O

Posted by KristoffSC <kr...@gmail.com>.
Thanks Arvid,
I like your proposals. In my case, I wanted to use the state value to decide
whether I should make the async call to the external system. The result of
this call would then be stored in the state. So I would have this:

Process1 (calculate the value or take it from state) -> AsyncCall to the
external system to persist/validate the value -> Process2 (feedback loop via
a messaging queue back to Process1).

Apart from the fact that Process1 would have to consume two streams, which
is OK, I would also have a delay. I wanted to avoid unnecessary calls to the
external system by keeping the cached/validated value in state.

And this would work without the delay if I could use state in async
operators.


I'm thinking about building my own semi-async operator. My idea is to have a
normal KeyedProcessFunction that wraps a list of single-thread executors.

In the processElement method, I will use the key to calculate an index into
that array, so that messages with the same key always go to the same
executor. I do want to keep the message order.

I will submit a task like this:
executor.submit(() -> {
    MyResult result = rservice.process(message, mapState.get(key));
    mapState.put(key, result);
    out.collect(newMessage);
});
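The key-to-executor routing itself can be sketched with plain JDK executors; KeyRouter, indexFor and executorFor are hypothetical names. Math.floorMod keeps the index non-negative for negative hash codes, and because each executor has a single thread, tasks for the same key run in submission order. This only covers the routing: as the reply earlier in this thread explains, Flink state and the output collector must still not be touched from these executor threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: always routes the same key to the same single-thread executor.
class KeyRouter implements AutoCloseable {
    private final ExecutorService[] executors;

    KeyRouter(int size) {
        executors = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            executors[i] = Executors.newSingleThreadExecutor();
        }
    }

    // floorMod keeps the index in [0, size) even for negative hash codes.
    int indexFor(Object key) {
        return Math.floorMod(key.hashCode(), executors.length);
    }

    ExecutorService executorFor(Object key) {
        return executors[indexFor(key)];
    }

    @Override
    public void close() {
        for (ExecutorService executor : executors) {
            executor.shutdown(); // lets already submitted tasks finish
        }
    }
}
```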



Big questions:
1. In my solution, out.collect(newMessage) will be called from several
threads (each with a different message). Is that thread-safe?
2. Is using the MapState in a multithreaded environment, as I would here,
thread-safe?
Alternatively, I could have an associated list of MapStates, one for each
single-thread executor, so that each one is used by only one thread.

With this setup, I will not block my pipeline and I will be able to use
state. I agree that the size of the executor list will be a limiting factor.


Is this setup possible with Flink?


By the way, I will use the RocksDBStateBackend.


Re: Using managed keyed state with Async I/O

Posted by Arvid Heise <ar...@ververica.com>.
Hi KristoffSC,

You are right that state is not shared across operators; I forgot about
that. So the approach would only work as-is if the state can be cleanly
separated into two independent parts. For example, suppose you need state to
find the database key, and afterwards you store the full entry in Flink
state. Then the map before async IO could fetch the database key from its
own state, and the map after async IO could keep the full record in its own
state.
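A minimal plain-JDK sketch of that split, with hypothetical names throughout (Lookup, Enriched, before, asyncLookup, after): the first stage owns a map from record key to database key, the async stage only carries data forward, and the last stage owns a separate map for the full record. Neither stage reads the other's state, which is the condition under which the map -> async IO -> map pipeline works. In Flink the two maps would be keyed state in two different operators, and the record key would travel inside the message (for example as a Tuple2) so that the stream can be keyed again before the second map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Simulation only: two HashMaps stand in for the keyed state of two separate operators.
class SplitStateSketch {
    // Message from the first map to the async stage: record key plus database key.
    static final class Lookup {
        final String key;
        final String dbKey;
        Lookup(String key, String dbKey) { this.key = key; this.dbKey = dbKey; }
    }

    // Message from the async stage to the last map: record key plus fetched entry.
    static final class Enriched {
        final String key;
        final String entry;
        Enriched(String key, String entry) { this.key = key; this.entry = entry; }
    }

    // State owned only by the first map: record key -> database key.
    final Map<String, String> dbKeyState = new HashMap<>();
    // State owned only by the last map: record key -> full entry.
    final Map<String, String> entryState = new HashMap<>();

    // First map: reads only its own state; falls back to the record key (an assumption).
    Lookup before(String key) {
        return new Lookup(key, dbKeyState.getOrDefault(key, key));
    }

    // Async stage: a hypothetical database call that touches no state at all.
    static CompletableFuture<Enriched> asyncLookup(Lookup in) {
        return CompletableFuture.supplyAsync(() -> new Enriched(in.key, "row(" + in.dbKey + ")"));
    }

    // Last map: writes only its own state.
    void after(Enriched in) {
        entryState.put(in.key, in.entry);
    }
}
```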

Another approach is to feed results back from async IO to the first map.
There is usually a tradeoff between performance and complexity here: a Kafka
topic for the feedback is simple but slower, while custom TCP socket magic is
faster but more complex. I'd rather recommend having a look at Stateful
Functions (statefun) [1], though, which implements this feedback efficiently
and provides a good abstraction for everything state-related. Unfortunately,
mixing Flink jobs and statefun applications is still not easily possible;
I'd expect that to become possible in the next major release. But if you can
express everything in statefun, it's the best choice.

As for your other question: it shouldn't make any difference, because the
function is serialized in main() and deserialized on each JM/TM
(JobManager/TaskManager), resulting in many copies anyway. The only
difference is that you have one fewer copy in your main(). Since Flink state
is only accessed on the TaskManagers, the function instances there are
separate copies in both setups.
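The point about copies can be illustrated with plain Java serialization. This is a simplification: MyProcessFunction here is a hypothetical stand-in with an ordinary field, and real job submission involves more than this. Serializing one instance and deserializing it twice yields two independent objects, so reusing pf in KristoffSC's Setup 1 does not make the two operators share anything.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Plain-JDK illustration: one serialized instance yields independent copies.
class SerializedCopies {
    // Hypothetical stand-in for a user function with an ordinary field.
    static final class MyProcessFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        int calls;
    }

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // stream is closed (and flushed) at this point
    }

    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```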

[1] https://github.com/apache/flink-statefun

(In reply to KristoffSC's message of Thu, Aug 13, 2020 at 2:53 PM, which
appears as the next post in this thread.)

Re: Using managed keyed state with Async I/O

Posted by KristoffSC <kr...@gmail.com>.
Hi Arvid,
thank you for the response.
Yeah, I tried to run my job shortly after posting my message and got "State
is not supported in rich async function" ;)

I came up with a solution that would solve my initial problem (concurrent,
async processing of messages with the same key), but unfortunately state is
not supported there.

Thank you for the proposal:
source -> keyby -> map (retrieve state) -> async IO (use state) -> map (update state)

However, I'm a little surprised. I thought that keyed state cannot be shared
between operators, and here you are suggesting exactly that. Is it possible,
then?


While I'm at it, I have an additional question: is there any difference,
from Flink's perspective, between these two approaches?

MyProcessFunction pf = new MyProcessFunction();
(MyProcessFunction is a stateless object, but it uses Flink keyed state.)

Setup 1:
source -> keyBy(key) -> process(pf) -> map() -> process(pf) -> sink

Setup 2:
source -> keyBy(key) -> process(new MyProcessFunction()) -> map() -> process(new MyProcessFunction()) -> sink


Re: Using managed keyed state with Async I/O

Posted by Arvid Heise <ar...@ververica.com>.
Hi KristoffSC,

AFAIK, async I/O does not support state operations at all, precisely because
of the issues you mentioned (RichAsyncFunction fails if you access state).

I'd probably solve it by having a map or process function before and after
the async I/O operator to handle the state operations. If you enable object
reuse (ExecutionConfig#enableObjectReuse()), performance should be pretty
much the same as if async I/O supported state, but the threading model
becomes much simpler.

So, the pipeline is source -> keyby -> map (retrieve state) -> async IO
(use state) -> map (update state). You might need to return Tuple<Key, State>
from the map and the async I/O function so that the subsequent operators have
the full context information.

(In reply to KristoffSC's message of Mon, Aug 10, 2020 at 4:24 PM, which is
the first post in this thread.)