Posted to user@flink.apache.org by Filip Karnicki <fi...@gmail.com> on 2022/11/02 09:13:56 UTC

Re: question about Async IO

Hi Galen

I was thinking about the same thing recently and found that async I/O
does not have access to keyed state, because of this comment in the
Flink source:

"State related apis in [[org.apache.flink.api.common.functions.RuntimeContext]]
are not supported yet because the key may get changed while accessing states
in the working thread."

I don't think the key can change at an arbitrary point here, because of this passage:

"A common confusion that we want to explicitly point out here is that the
AsyncFunction is not called in a multi-threaded fashion. There exists only
one instance of the AsyncFunction and it is called sequentially for each
record in the respective partition of the stream"
From:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/

Since RichAsyncFunctionRuntimeContext is basically a facade on top of
RuntimeContext, if it had access to a KeyedStateBackend we could (maybe)
change the signature of methods like getState to include the key, and run
keyedStateBackend.setCurrentKey(key) before doing anything else.


Anyone - please stop me if I'm talking nonsense
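The Javadoc's wording is easier to see in a toy model. asyncInvoke runs on the task thread while the record's key is current, but the completion callback can run after the operator has already moved on to later records, so a state access routed through a shared "current key" would resolve against whichever key is current at that moment. The sketch below is plain Java and hypothetical (ToyKeyedBackend is not a Flink class); it simulates that interleaving on one thread and contrasts it with an accessor that takes the key explicitly, the direction proposed above. It only addresses the changing key, not thread safety of the backend itself.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: ToyKeyedBackend is hypothetical and far simpler than Flink's
// keyed state backends. Everything runs on one thread so the outcome is deterministic.
final class ToyKeyedBackend {
    private final Map<String, Integer> values = new HashMap<>();
    private String currentKey;

    // The task thread sets this before processing each record.
    void setCurrentKey(String key) {
        currentKey = key;
    }

    // Implicit-key read: resolves against whatever key is current at call time.
    int getForCurrentKey() {
        return values.getOrDefault(currentKey, 0);
    }

    // Explicit-key access, in the spirit of a getState variant that takes the key.
    int get(String key) {
        return values.getOrDefault(key, 0);
    }

    void put(String key, int value) {
        values.put(key, value);
    }
}

final class CurrentKeyDemo {
    public static void main(String[] args) {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        backend.put("a", 10);
        backend.put("b", 20);

        // Record with key "a" arrives; its async request is started here.
        backend.setCurrentKey("a");
        // While that request is in flight, the task thread moves on to key "b".
        backend.setCurrentKey("b");

        // Later, the callback for "a" runs.
        System.out.println("implicit key: " + backend.getForCurrentKey()); // 20, i.e. b's value
        System.out.println("explicit key: " + backend.get("a"));           // 10
    }
}
```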


On Fri, 14 Oct 2022 at 21:36, Krzysztof Chmielewski <
krzysiek.chmielewski@gmail.com> wrote:

> Hi Galen,
> I'll answer from my experience as a Flink user and developer of Flink jobs.
>
> *"if the input to an AsyncFunction is a keyed stream, can I assume that
> all input elements with the same key will be handled by the same instance
> of the async operator"*
> From what I know (and someone can correct me if I'm wrong), this is
> the case. However, you have to make sure that there is no rebalance or
> reshuffle between those operators. For example, the operators after the
> first .keyBy(..) call must have the same parallelism: when two connected
> operators differ in parallelism and no partitioning is specified, Flink
> falls back to a rebalance.
>
> Regarding:
> "I have a situation where I would like to enforce that async operations
> associated with a particular key happen sequentially,"
>
> As far as I know, this is also possible. In fact, I implemented a
> streaming pipeline with a similar requirement:
> *"maintaining the order of events within a keyBy group across multiple
> operators, including Async operators"*.
> We achieved that the same way, by making sure that all operators in the
> entire pipeline except the Source and Sink had exactly the same parallelism.
> One more thing to remember: if you call .keyBy(...) again with a
> different key extractor, the original order might not be preserved,
> since keyBy repartitions (shuffles) the stream.
>
> We also used the experimental reinterpretAsKeyedStream feature [1] after
> the async operators to avoid calling .keyBy multiple times in the
> pipeline. Each .keyBy adds a network shuffle, which always costs some
> performance.
> With reinterpretAsKeyedStream we were able to use keyed operators with
> access to keyed state after the Async operators.
>
> Hope that helped.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/
>
> Regards,
> Krzysztof Chmielewski
>
> On Fri, 14 Oct 2022 at 19:11, Galen Warren <ga...@cvillewarrens.com>
> wrote:
>
>> I have a question about Flink's Async IO support: Async I/O | Apache Flink
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/>.
>>
>> I understand that access to state is not supported in an AsyncFunction.
>> However, if the input to an AsyncFunction is a keyed stream, can I assume
>> that all input elements with the same key will be handled by the same
>> instance of the async operator, as would normally be the case with keyed
>> streams/operators?
>>
>> I'm asking because I have a situation where I would like to enforce that
>> async operations associated with a particular key happen sequentially, i.e.
>> if two elements come through with the same key, I need the async operation
>> for the second to happen after the async operation for the first one
>> completes. I think I can achieve this using a local map of "in flight"
>> async operations in the operator itself, but only if I can rely on all
>> input elements with the same key being processed by the same async operator.
>>
>> If anyone can confirm how this works, I'd appreciate it. Thanks.
>>
>
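Ordering is the other half of Krzysztof's point. As far as I know, AsyncDataStream.orderedWait emits results in the order the input records arrived, while unorderedWait emits each result as soon as its request completes, so with unorderedWait even two records with the same key can swap places. The core idea of ordered mode is sketched below in plain Java: a queue that releases a result only once every earlier result has completed. OrderedEmitter is a hypothetical illustration, not Flink's implementation, and it omits error handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of ordered emission (not Flink's implementation): results are
// released in the order their inputs arrived, whatever order the calls complete in.
final class OrderedEmitter<T> {
    private final Deque<CompletableFuture<T>> pending = new ArrayDeque<>();
    private final List<T> emitted = new ArrayList<>();

    // Register an in-flight call, in input arrival order.
    void add(CompletableFuture<T> call) {
        pending.addLast(call);
    }

    // Release completed results from the head; stop at the first call still in flight.
    // Error handling is omitted: join() rethrows if a call failed.
    void drain() {
        while (!pending.isEmpty() && pending.peekFirst().isDone()) {
            emitted.add(pending.pollFirst().join());
        }
    }

    List<T> emitted() {
        return emitted;
    }
}

final class OrderedEmitterDemo {
    public static void main(String[] args) {
        OrderedEmitter<String> emitter = new OrderedEmitter<>();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        emitter.add(first);
        emitter.add(second);

        second.complete("k1-second"); // the later request finishes first
        emitter.drain();
        System.out.println(emitter.emitted()); // [] because "first" is still in flight

        first.complete("k1-first");
        emitter.drain();
        System.out.println(emitter.emitted()); // [k1-first, k1-second]
    }
}
```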

Re: question about Async IO

Posted by David Anderson <da...@apache.org>.
Yes, that will work as you expect. So long as you don't put another shuffle
or rebalance in between, the keyed partitioning that's already in place
will carry through the async I/O operator, and beyond. In most cases you
can even use reinterpretAsKeyedStream on the output (so long as you haven't
done something to emit records that break the contract it expects, namely
that the stream is already partitioned exactly as keyBy with the same key
selector would partition it).

David
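David's answer follows from how keyed streams are partitioned. Each key is mapped to a key group, and each key group to exactly one parallel subtask, using only the key, the max parallelism, and the receiving operator's parallelism, so every record with a given key reaches the same async subtask as long as no other repartitioning sits in between. The arithmetic is sketched below, modeled on Flink's KeyGroupRangeAssignment, with one simplification: Flink first applies a murmur hash to key.hashCode(), whereas this sketch uses floorMod of the raw hashCode, so the concrete subtask numbers will not match Flink's.

```java
// Sketch of keyed routing, modeled on Flink's key-group scheme.
// Simplification (assumption): the raw hashCode is used where Flink applies a murmur hash first.
final class KeyRouting {
    // Each key belongs to exactly one key group in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are split into contiguous ranges, one range per subtask.
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Deterministic: depends only on the key, max parallelism, and parallelism.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // example value
        int parallelism = 4;      // example value
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> subtask " + subtaskFor(key, maxParallelism, parallelism));
        }
    }
}
```

Because the result depends only on those three values, it is stable for the life of the job. Changing the parallelism (for example when rescaling from a savepoint) moves whole key groups between subtasks, but never splits one key across two subtasks.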

(In reply to Galen Warren, Wed, Nov 2, 2022 at 4:19 PM.)

Re: question about Async IO

Posted by Galen Warren <ga...@cvillewarrens.com>.
Thanks, that makes sense and matches my understanding of how it works.

In my case, I don't actually need access to keyed *state*; I just want to
make sure that all elements with the same key are routed to the same
instance of the async function. (Without going into too much detail, the
reason for this is that I want to invoke async operations for the same key
in sequence, i.e. not have two in-flight async operations for the same key
at the same time. I can accomplish this with a local map of in-flight async
operations in the function, so long as all inputs with the same key get
routed to the same instance of the function.)

So far as I can tell, if I create a keyed stream and then use it as an
input to an async function, the keys will be distributed across the async
function instances the way I want, even if keyed state is inaccessible.
Anyway, that's what I'm looking to confirm.
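The "local map of in-flight async operations" could look roughly like the plain-Java sketch below, assuming each external call is exposed as a CompletableFuture. PerKeySequencer is a hypothetical helper, not part of Flink. It chains each new call for a key onto the previous one, lets different keys run concurrently, and drops a key's map entry once nothing is left in flight for it. Inside a RichAsyncFunction, asyncInvoke would call submit(key, ...) and complete the ResultFuture from the returned future.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical helper (not a Flink API): runs async operations for the same key one at a
// time, while operations for different keys proceed concurrently.
final class PerKeySequencer<K> {
    // For each key with work in flight, a future that completes when the most
    // recently queued operation for that key has finished.
    private final Map<K, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    <R> CompletableFuture<R> submit(K key, Supplier<CompletableFuture<R>> operation) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        CompletableFuture<Void> previous = tails.put(key, done);
        CompletableFuture<Void> start =
                previous == null ? CompletableFuture.<Void>completedFuture(null) : previous;

        // Start this operation only after the previous one for the same key has finished.
        CompletableFuture<R> result = start.thenCompose(ignored -> operation.get());

        result.whenComplete((value, error) -> {
            // Drop the map entry only if no newer operation for this key was queued.
            tails.remove(key, done);
            // Release the next queued operation, whether this one succeeded or failed.
            done.complete(null);
        });
        return result;
    }

    int keysInFlight() {
        return tails.size();
    }
}

final class PerKeySequencerDemo {
    public static void main(String[] args) {
        PerKeySequencer<String> sequencer = new PerKeySequencer<>();
        CompletableFuture<String> slowCall = new CompletableFuture<>();

        sequencer.submit("k", () -> slowCall);
        CompletableFuture<String> second =
                sequencer.submit("k", () -> CompletableFuture.completedFuture("second"));
        System.out.println("second done before first? " + second.isDone()); // false

        slowCall.complete("first");
        System.out.println("second result: " + second.join());            // second
        System.out.println("keys in flight: " + sequencer.keysInFlight()); // 0
    }
}
```

Two caveats, as far as I understand the async operator: the timeout passed to AsyncDataStream starts when a record enters the operator, so time spent queued behind an earlier call for the same key counts against it; and queued records still occupy slots of the operator's capacity.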
