Posted to user@flink.apache.org by Vishal Surana <vi...@moengage.com> on 2022/04/28 16:41:38 UTC

Broadcast State + Stateful Operator + Async IO

Hello,
My application has a stateful operator which leverages RocksDB to store a
large amount of state. It, along with other operators, receives configuration
as a broadcast stream (KeyedBroadcastProcessFunction). The operator depends
upon another input stream that triggers some communication with external
services whose results are then combined to yield the state that gets
stored in RocksDB.

In order to make the application more efficient, I am going to switch to
asynchronous IO, but as the result is ultimately going to be a (Scala)
Future, I will have to block once to get the result. I was hoping to
leverage the Async IO operator, but that apparently doesn't support
RocksDB-based state storage. Am I correct in saying
that KeyedBroadcastProcessFunction is the only option I have? If so, then I
want to understand how registering a future's callbacks (via onComplete)
works with a synchronous operator such as KeyedBroadcastProcessFunction.
Will the thread executing the function simply relinquish control to some
other subtask while the results of the external services are being awaited?
Will the callback eventually be triggered automatically or will I have to
explicitly block on the result future like so: Await.result(f, timeout).
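
The difference between the two options in this question can be sketched outside Flink. The example below is only an illustration using the JDK's CompletableFuture in place of a Scala Future; `callService` and the class name are hypothetical and not part of any Flink API. It shows that a registered callback fires on its own, but on a thread from the executor rather than on the thread that registered it, whereas a blocking wait keeps the calling thread occupied until the result arrives. To my understanding, keyed state in Flink is only meant to be accessed from the task thread while an element is being processed, so a callback running elsewhere should not be assumed safe for state updates.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class CallbackVersusBlocking {

    // Hypothetical stand-in for one call to an external service.
    static CompletableFuture<String> callService(String input, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> "enriched-" + input, pool);
    }

    // Registers a callback (the analogue of onComplete) and reports which thread ran it.
    static String threadThatRunsCallback(ExecutorService pool) {
        CompletableFuture<String> callbackThread = callService("a", pool)
                .thenApplyAsync(result -> Thread.currentThread().getName(), pool);
        return await(callbackThread, 5_000);
    }

    // Blocks the calling thread until the result arrives (the analogue of Await.result).
    static <T> T await(CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("external call failed or timed out", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("caller thread:   " + Thread.currentThread().getName());
            System.out.println("callback thread: " + threadThatRunsCallback(pool));
            System.out.println("blocking result: " + await(callService("b", pool), 5_000));
        } finally {
            pool.shutdown();
        }
    }
}
```

Running `main` prints two different thread names: the callback does not need an explicit wait to fire, but it also does not run where the caller's state lives.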

-- 
Regards,
Vishal

Re: Broadcast State + Stateful Operator + Async IO

Posted by Vishal Surana <vi...@moengage.com>.
Thanks a lot for your quick response! Your suggestion, however, would not
work for our use case. Ours is a streaming system that must process 100
thousand messages per second and produce immediate results, so it's simply
impossible to rerun the job.

Our job is a streaming job broken down into various operators with very
strict latency requirements (less than 10 seconds at all times). There
could be multiple messages for a given entity in quick succession and
ordered processing is another strict requirement. The question is how we
can best leverage Flink's features of stateful stream processing as well as
async IO.
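
The combination asked for here, asynchronous calls plus strictly ordered processing per entity, can be illustrated with a general pattern that is independent of Flink: keep one "tail" future per key and chain each new event behind it. The sketch below is an assumption-laden illustration, not a Flink feature; the class, `callService`, and the string events are all hypothetical. Events of the same key run one after another, while events of different keys may overlap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PerKeyOrderingSketch {
    // Last scheduled stage for each key; new events are chained behind it.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final List<String> completed = Collections.synchronizedList(new ArrayList<>());
    private final Executor pool;

    PerKeyOrderingSketch(Executor pool) {
        this.pool = pool;
    }

    // Hypothetical stand-in for an external call made for one event.
    private CompletableFuture<String> callService(String key, String event) {
        return CompletableFuture.supplyAsync(() -> key + "/" + event, pool);
    }

    // Queues the event behind any earlier event of the same key.
    CompletableFuture<Void> submit(String key, String event) {
        return tails.compute(key, (k, tail) -> {
            CompletableFuture<Void> previous =
                    (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            return previous
                    .thenCompose(ignored -> callService(k, event))
                    .thenAccept(completed::add);
        });
    }

    // Copy of the results recorded so far, in completion order.
    List<String> completedSnapshot() {
        synchronized (completed) {
            return new ArrayList<>(completed);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            PerKeyOrderingSketch sketch = new PerKeyOrderingSketch(pool);
            sketch.submit("a", "1");
            sketch.submit("b", "1");
            sketch.submit("a", "2");
            sketch.submit("a", "3").join(); // waits for the whole chain of key "a"
            System.out.println(sketch.completedSnapshot());
        } finally {
            pool.shutdown();
        }
    }
}
```

This sketch leaves out things a real job would need: entries in `tails` are never removed, a failed call fails every later event of that key, and nothing here is covered by checkpoints.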

Re: Broadcast State + Stateful Operator + Async IO

Posted by Guowei Ma <gu...@gmail.com>.
Hi Vishal

If your scenario is to update the data in full every time, one idea is to
rerun the job every time. For example, you have an
`EnrichWithDatabaseAndWebSerivce` job, which is responsible for reading all
data from a data source every time and then joining the data with DB and Web
services. Every time you need to re-enrich, you have to start the job again.

Also, can you briefly describe what the frequency is?

Best,
Guowei



Re: Broadcast State + Stateful Operator + Async IO

Posted by Vishal Surana <vi...@moengage.com>.
Yes. You have explained my requirements exactly as they are. My operator
will talk to multiple databases and a couple of web services to enrich
incoming input streams. I cannot think of a way to use the async IO
operator, so I thought I could convert these 7-10 calls into async calls
and chain the Futures together. I believe I have to block once at the end
of the KeyedBroadcastProcessFunction, but if there's a way to avoid that
while still ensuring ordered processing of events, do let me know.
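
The "chain the Futures together and block once" idea described above might look roughly like the following. This is a plain-JDK sketch with CompletableFuture, not Flink code; `lookup`, the source names, and the timeout value are hypothetical placeholders. All calls are started concurrently, so, assuming the executor has enough threads, the single wait costs about as long as the slowest call rather than the sum of all of them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

final class BlockOnceSketch {

    // Hypothetical stand-in for one database or web-service lookup.
    static CompletableFuture<String> lookup(String source, String key, Executor pool) {
        return CompletableFuture.supplyAsync(() -> source + ":" + key, pool);
    }

    // Starts every lookup concurrently, then waits a single time for the combined result.
    static List<String> enrich(String key, List<String> sources, Executor pool, long timeoutMillis) {
        List<CompletableFuture<String>> pending = sources.stream()
                .map(source -> lookup(source, key, pool))
                .collect(Collectors.toList());

        // allOf completes once every lookup has completed; join() then returns immediately.
        CompletableFuture<List<String>> combined = CompletableFuture
                .allOf(pending.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> pending.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
        try {
            // The only blocking call, made on the caller's thread.
            return combined.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("enrichment failed or timed out", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> sources = Arrays.asList("db1", "db2", "web");
            System.out.println(enrich("user-42", sources, pool, 5_000));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the caller waits before returning, the combined result is available on the calling thread, which is where it would have to be for a state update inside a process function.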


-- 
Regards,
Vishal

Re: Broadcast State + Stateful Operator + Async IO

Posted by Guowei Ma <gu...@gmail.com>.
Hi Vishal

I want to understand your needs first. Your requirements are: after a
stateful operator receives a notification, it needs to traverse all the
data stored in the operator state and communicate with an external system
during the traversal (maybe similar to a join?). To improve the efficiency
of this behavior, you want to take an asynchronous approach. That is,
updates to the state of different keys should not block each other because
of external communication.
If I understand correctly, this is not possible with the existing
functionality of KeyedBroadcastProcessFunction.
As for whether there are other solutions, it may depend on the specific
scenario, such as what kind of external system is involved. So could you
describe in detail which scenario has this requirement, and which external
systems it depends on?

Best,
Guowei

