Posted to issues@flink.apache.org by "Jingsong Lee (Jira)" <ji...@apache.org> on 2019/11/20 02:11:00 UTC

[jira] [Comment Edited] (FLINK-14845) Introduce data compression to blocking shuffle.

    [ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16977096#comment-16977096 ] 

Jingsong Lee edited comment on FLINK-14845 at 11/20/19 2:10 AM:
----------------------------------------------------------------

Big +1 for this feature.

Current batch scenarios involve a lot of shuffles, which means a lot of spilling IO.

For example, take the simplest join scenario: a join between a large table and a small table that filters out a lot of data. The small table is very small, so we can use a broadcast join. But because TwoInputOperator cannot be chained at present, the large table still has to go through a forward edge, which causes its data to be spilled to disk.

As long as there is no TwoInputOperatorChain, shuffle is the performance killer of the whole job. Disk IO is the performance bottleneck in many scenarios.


was (Author: lzljs3620320):
Big +1 for this feature.

Current batch scenarios involve a lot of shuffles, which means a lot of spilling IO.

For example, take the simplest join scenario: a join between a large table and a small table that filters out a lot of data. The small table is very small, so we can use a broadcast join. But because TwoInputOperator cannot be chained at present, the large table still has to go through the pipeline, which causes its data to be spilled to disk.

As long as there is no TwoInputOperatorChain, shuffle is the performance killer of the whole job. Disk IO is the performance bottleneck in many scenarios.

> Introduce data compression to blocking shuffle.
> -----------------------------------------------
>
>                 Key: FLINK-14845
>                 URL: https://issues.apache.org/jira/browse/FLINK-14845
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Network
>            Reporter: Yingjie Cao
>            Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without compression. For IO-bound scenarios, this can be optimized by compressing the output data. It would be better to introduce a compression mechanism and offer a config option that lets the user decide whether to compress the shuffle data. We have already implemented compression in our internal Flink version, and here are some key points:
> 1. Where to compress/decompress?
> Compressing at upstream and decompressing at downstream.
> 2. Which thread does the compression/decompression?
> Task threads do the compression/decompression.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether the data was compressed; that is, the output may be a mixture of compressed and uncompressed data.
>  
> We'd like to contribute blocking shuffle data compression to Flink if there is interest.
>  
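
Points 3 and 4 of the description above (per-buffer compression, with a flag and a fallback to raw data when compression does not help) could be sketched roughly as follows. This is only an illustration under stated assumptions, not the implementation the reporter refers to: the issue does not name a codec, so java.util.zip.Deflater is used as a stand-in, and the class name, the one-byte flag layout, and the maxBufferSize parameter are all hypothetical.

```java
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch; none of these names are Flink APIs.
final class BufferCompressionSketch {
    static final byte RAW = 0;        // payload is stored uncompressed
    static final byte COMPRESSED = 1; // payload is Deflater output

    /** Upstream side: returns [1 flag byte][payload] for a single buffer. */
    static byte[] encode(byte[] buffer) {
        Deflater deflater = new Deflater();
        deflater.setInput(buffer);
        deflater.finish();
        // Scratch space is as large as the input. If the compressed form
        // does not fit, compression did not shrink the data.
        byte[] scratch = new byte[buffer.length];
        int n = deflater.deflate(scratch);
        boolean smaller = deflater.finished() && n < buffer.length;
        deflater.end();

        byte[] payload = smaller ? scratch : buffer;
        int length = smaller ? n : buffer.length;
        byte[] out = new byte[1 + length];
        out[0] = smaller ? COMPRESSED : RAW; // the "extra flag" from point 4
        System.arraycopy(payload, 0, out, 1, length);
        return out;
    }

    /** Downstream side: maxBufferSize bounds the size of one uncompressed buffer. */
    static byte[] decode(byte[] encoded, int maxBufferSize) {
        if (encoded[0] == RAW) {
            return Arrays.copyOfRange(encoded, 1, encoded.length);
        }
        Inflater inflater = new Inflater();
        inflater.setInput(encoded, 1, encoded.length - 1);
        byte[] out = new byte[maxBufferSize];
        int n = 0;
        try {
            while (!inflater.finished() && n < out.length) {
                int r = inflater.inflate(out, n, out.length - n);
                if (r == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break; // truncated or unsupported input; stop instead of spinning
                }
                n += r;
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed buffer", e);
        } finally {
            inflater.end();
        }
        return Arrays.copyOf(out, n);
    }
}
```

Because the flag travels with every buffer, a reader can handle a mixture of compressed and uncompressed buffers without any extra coordination, which is the property point 4 asks for.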



--
This message was sent by Atlassian Jira
(v8.3.4#803005)