Posted to issues@flink.apache.org by "Yun Tang (Jira)" <ji...@apache.org> on 2020/12/10 12:25:00 UTC

[jira] [Commented] (FLINK-19911) add read buffer for input stream

    [ https://issues.apache.org/jira/browse/FLINK-19911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17247209#comment-17247209 ] 

Yun Tang commented on FLINK-19911:
----------------------------------

The previous PR was closed after more people joined the discussion and offered further insights. [~fanrui], could you share your ideas in a brief design doc here, so that we can discuss which approach is the right way to improve restore performance?

> add read buffer for input stream
> --------------------------------
>
>                 Key: FLINK-19911
>                 URL: https://issues.apache.org/jira/browse/FLINK-19911
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystems, Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.12.0, 1.11.3, 1.13.0
>         Environment: Flink version: 1.10
> StateBackend : FsStateBackend 
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1
>            Reporter: fanrui
>            Assignee: fanrui
>            Priority: Major
>              Labels: pull-request-available
>
> The heap StateBackend needs to serialize each Java object to the file system during a snapshot. The RocksDB StateBackend's RocksFullSnapshotStrategy needs to read key-value pairs from RocksDB and write them to the file system during a snapshot.
> Both cases involve many small I/O operations rather than a few large ones, and frequent small I/O is inefficient on disk. For this reason, the checkpoint snapshot write path to the file system already uses a buffer. For details, refer to the buffer in {{FsCheckpointStreamFactory.FsCheckpointStateOutputStream}}.
> The restore process also performs many small I/O operations, but it has no read buffer. So I added one and tested it with a Flink job.
> h2. Flink Job environment:
> {code:java}
>  Flink version: 1.10
>  StateBackend : FsStateBackend 
>  code: Flink SQL count(distinct userId)
>  uv: 10 million
>  State size: 200M
>  TM total memory: 16G
>  Parallelism: 1{code}
> Restore takes 33.1s without the read buffer and 12.8s with the read buffer.
> h2. How to do it?
> Use FSDataBufferedInputStream to wrap fsDataInputStream in HeapRestoreOperation#restore. Code:
> {code:java}
> FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
> FSDataInputStream bufferedInputStream = new FSDataBufferedInputStream(fsDataInputStream);
> {code}
>  
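The effect described in the issue can be illustrated without Flink. The following is a minimal sketch, not the code from the closed PR: it uses the JDK's java.io.BufferedInputStream as a stand-in for the FSDataBufferedInputStream proposed in the issue, whose source is not shown here. CountingInputStream, underlyingCalls, and the 64 KiB buffer size are hypothetical names and values chosen for illustration. The sketch counts how many read calls reach the underlying stream when many small values are deserialized, with and without a read buffer.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadBufferSketch {

    /** Hypothetical helper: counts read calls that reach the wrapped stream. */
    static final class CountingInputStream extends FilterInputStream {
        long calls = 0;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            calls++;
            return super.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return super.read(b, off, len);
        }
    }

    /**
     * Deserializes `records` 8-byte longs and returns how many read calls
     * reached the underlying stream. In a real restore, each such call would
     * be a small read against the file system.
     */
    static long underlyingCalls(int records, boolean buffered) throws IOException {
        byte[] data = new byte[records * Long.BYTES];
        CountingInputStream raw = new CountingInputStream(new ByteArrayInputStream(data));
        // Assumption: a 64 KiB buffer; the buffer size used in the issue is not stated.
        InputStream source = buffered ? new BufferedInputStream(raw, 64 * 1024) : raw;
        try (DataInputStream in = new DataInputStream(source)) {
            for (int i = 0; i < records; i++) {
                in.readLong();
            }
        }
        return raw.calls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("unbuffered calls: " + underlyingCalls(100_000, false));
        System.out.println("buffered calls:   " + underlyingCalls(100_000, true));
    }
}
```

Without the buffer, every small read is forwarded to the underlying stream. With the buffer, the underlying stream is only asked to refill the buffer in large chunks. Whether this translates into the restore-time improvement reported above depends on the file system and the state layout.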



--
This message was sent by Atlassian Jira
(v8.3.4#803005)