Posted to issues@flink.apache.org by "Igal Shilman (Jira)" <ji...@apache.org> on 2020/02/23 11:48:00 UTC

[jira] [Updated] (FLINK-16244) Add Asynchronous operations to state lazily.

     [ https://issues.apache.org/jira/browse/FLINK-16244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Igal Shilman updated FLINK-16244:
---------------------------------
    Description: 
Currently, AsyncSink eagerly adds every registered async operation to the underlying state.
An alternative approach would be to keep the async operations in an in-memory map and only
write them to the underlying map state on snapshotState().
The rationale behind this approach is the assumption that most async operations complete between two consecutive checkpoints. Adding them to, and then removing them from, the underlying state backend (RocksDB by default) is therefore wasteful.
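To make the rationale concrete, here is a rough Java cost model. It assumes that the eager scheme costs one state write when an operation is registered and one when it is removed. It also assumes that the lazy scheme writes an operation once, at the first checkpoint that finds it still pending, and removes it once on completion. The Op type, the time units, and the checkpoint times are hypothetical. The model counts writes only and ignores backend-specific costs such as RocksDB compaction.

```java
import java.util.List;

public class WriteCostModel {

  /** Hypothetical async operation: registered at 'start', completed at 'end' (arbitrary time units). */
  static final class Op {
    final long start;
    final long end;

    Op(long start, long end) {
      this.start = start;
      this.end = end;
    }
  }

  /** Eager: one state write when the operation is registered, one when it is removed. */
  static int eagerWrites(List<Op> ops) {
    return 2 * ops.size();
  }

  /**
   * Lazy: an operation touches state only if some checkpoint happens while it is pending.
   * It is then written once at flush and removed once on completion (assumption: the
   * in-memory buffer is cleared on flush, so the same operation is never rewritten).
   */
  static int lazyWrites(List<Op> ops, List<Long> checkpoints) {
    int writes = 0;
    for (Op op : ops) {
      for (long cp : checkpoints) {
        if (op.start <= cp && cp < op.end) {
          writes += 2; // put during flush() + remove on completion
          break;       // later checkpoints do not write it again
        }
      }
    }
    return writes;
  }

  public static void main(String[] args) {
    List<Op> ops = List.of(new Op(0, 5), new Op(2, 8), new Op(12, 30), new Op(40, 42));
    List<Long> checkpoints = List.of(10L, 20L, 50L);
    // Only the operation spanning [12, 30) is still pending at a checkpoint (t = 20).
    System.out.println(eagerWrites(ops) + " vs " + lazyWrites(ops, checkpoints)); // prints "8 vs 2"
  }
}
```

The closer operations complete relative to the checkpoint interval, the closer the lazy count gets to zero, while the eager count stays at two writes per operation.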

An implementation outline suggestion:

1. Add a LazyAsyncOperations class that keeps both an in-memory map
and a MapStateHandle.
This class would support add(), remove(), and also flush().

2. Use that class in AsyncSink and in AsyncMessageDecorator.

3. Call flush() from FunctionGroupOperator#snapshotState().

Note that special care should be taken in flush(), as the current key needs to be set
in the keyedStateBackend.
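A minimal Java sketch of the outline above, under stated assumptions. So that it runs without Flink, KeyedMapStateStub is a hypothetical stand-in for a keyed MapState together with the keyed state backend's current-key handling: every put/remove acts on the map of the current key. The add/remove/flush signatures, grouping the in-memory buffer by key, and clearing the buffer after flush() are illustrative choices, not the actual Stateful Functions code. flush() is what FunctionGroupOperator#snapshotState() would call.

```java
import java.util.HashMap;
import java.util.Map;

public class LazyAsyncOperationsSketch {

  /** Hypothetical stand-in for keyed MapState: every call acts on the map of the current key. */
  static final class KeyedMapStateStub<K, UK, UV> {
    private final Map<K, Map<UK, UV>> store = new HashMap<>();
    private K currentKey;
    int backendWrites = 0; // counts put/remove calls that reach the "backend"

    void setCurrentKey(K key) { currentKey = key; }
    K getCurrentKey() { return currentKey; }

    void put(UK userKey, UV value) {
      store.computeIfAbsent(currentKey, k -> new HashMap<>()).put(userKey, value);
      backendWrites++;
    }

    void remove(UK userKey) {
      Map<UK, UV> m = store.get(currentKey);
      if (m != null) { m.remove(userKey); }
      backendWrites++;
    }

    int size(K key) {
      Map<UK, UV> m = store.get(key);
      return m == null ? 0 : m.size();
    }
  }

  /** Hypothetical LazyAsyncOperations: buffers operations in memory until flush(). */
  static final class LazyAsyncOperations<K, ID, OP> {
    private final KeyedMapStateStub<K, ID, OP> state;
    // Operations registered since the last flush, grouped by the key they belong to.
    private final Map<K, Map<ID, OP>> unflushed = new HashMap<>();

    LazyAsyncOperations(KeyedMapStateStub<K, ID, OP> state) { this.state = state; }

    void add(K key, ID id, OP op) {
      unflushed.computeIfAbsent(key, k -> new HashMap<>()).put(id, op);
    }

    void remove(K key, ID id) {
      Map<ID, OP> ops = unflushed.get(key);
      if (ops != null && ops.remove(id) != null) {
        if (ops.isEmpty()) { unflushed.remove(key); }
        return; // never reached the backend, so there is nothing else to undo
      }
      // The operation survived a checkpoint and lives in state: delete it there.
      state.setCurrentKey(key);
      state.remove(id);
    }

    /** Writes every buffered operation to state; intended to be called from snapshotState(). */
    void flush() {
      K previousKey = state.getCurrentKey();
      for (Map.Entry<K, Map<ID, OP>> perKey : unflushed.entrySet()) {
        // MapState is scoped to the current key, so set it before writing.
        state.setCurrentKey(perKey.getKey());
        for (Map.Entry<ID, OP> e : perKey.getValue().entrySet()) {
          state.put(e.getKey(), e.getValue());
        }
      }
      unflushed.clear();
      state.setCurrentKey(previousKey); // restore whatever key was set before the flush
    }
  }

  public static void main(String[] args) {
    KeyedMapStateStub<String, Long, String> state = new KeyedMapStateStub<>();
    LazyAsyncOperations<String, Long, String> ops = new LazyAsyncOperations<>(state);

    ops.add("user-1", 1L, "http-call");
    ops.add("user-2", 2L, "db-write");
    ops.remove("user-1", 1L); // completed before the checkpoint: no backend write
    ops.flush();              // only operation 2 is written
    System.out.println(state.backendWrites + " " + state.size("user-2")); // prints "1 1"
  }
}
```

Grouping the buffer by key means flush() sets the current key once per key rather than once per operation. The important detail is in remove(): an operation that was already flushed at an earlier checkpoint must also be deleted from state, which again requires setting its key first.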

  was:
Currently, AsyncSink eagerly adds every registered async operation to the underlying state.
An alternative approach would be to keep the async operations in an in-memory map and only
write them to the underlying map state on snapshotState().
The rationale behind this approach is the assumption that most async operations complete between two consecutive checkpoints. Adding them to, and then removing them from, the underlying state backend (RocksDB by default) is therefore wasteful.

An implementation outline suggestion:

1. Add a LazyAsyncOperations class that keeps both an in-memory map
and a MapStateHandle.
This class would support add() and remove(), and also flush().

2. Use that class in AsyncSink and in AsyncMessageDecorator.

3. Call flush() from FunctionGroupOperator#snapshotState().

Note that special care should be taken in flush(), as the current key needs to be set
in the keyedStateBackend.


> Add Asynchronous operations to state lazily.
> --------------------------------------------
>
>                 Key: FLINK-16244
>                 URL: https://issues.apache.org/jira/browse/FLINK-16244
>             Project: Flink
>          Issue Type: Task
>          Components: Stateful Functions
>            Reporter: Igal Shilman
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)