You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Yingjie Cao <ke...@gmail.com> on 2021/06/07 02:11:04 UTC

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Hi devs & users,

The FLIP-148[1] has been released with Flink 1.13 and the final
implementation has some differences compared with the initial proposal in
the FLIP document. To avoid potential misunderstandings, I have updated the
FLIP document[1] accordingly and I also drafted another document[2] which
contains more implementation details.  FYI.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
[2]
https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing

Best,
Yingjie

Yingjie Cao <ke...@gmail.com> 于2020年10月15日周四 上午11:02写道:

> Hi devs,
>
> Currently, Flink adopts a hash-style blocking shuffle implementation which
> writes data sent to different reducer tasks into separate files
> concurrently. Compared to sort-merge based approach writes those data
> together into a single file and merges those small files into bigger ones,
> hash-based approach has several weak points when it comes to running large
> scale batch jobs:
>
>    1. *Stability*: For high parallelism (tens of thousands) batch job,
>    current hash-based blocking shuffle implementation writes too many files
>    concurrently which gives high pressure to the file system, for example,
>    maintenance of too many file metas, exhaustion of inodes or file
>    descriptors. All of these can be potential stability issues. Sort-Merge
>    based blocking shuffle don’t have the problem because for one result
>    partition, only one file is written at the same time.
>    2. *Performance*: Large amounts of small shuffle files and random IO
>    can influence shuffle performance a lot especially for hdd (for ssd,
>    sequential read is also important because of read ahead and cache). For
>    batch jobs processing massive data, small amount of data per subpartition
>    is common because of high parallelism. Besides, data skew is another cause
>    of small subpartition files. By merging data of all subpartitions together
>    in one file, more sequential read can be achieved.
>    3. *Resource*: For current hash-based implementation, each
>    subpartition needs at least one buffer. For large scale batch shuffles, the
>    memory consumption can be huge. For example, we need at least 320M network
>    memory per result partition if parallelism is set to 10000 and because of
>    the huge network consumption, it is hard to config the network memory for
>    large scale batch job and  sometimes parallelism can not be increased just
>    because of insufficient network memory  which leads to bad user experience.
>
> To improve Flink’s capability of running large scale batch jobs, we would
> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> feedback is appreciated.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>
> Best,
> Yingjie
>

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Posted by Till Rohrmann <tr...@apache.org>.
Great :-)

On Tue, Jun 8, 2021 at 1:11 PM Yingjie Cao <ke...@gmail.com> wrote:

> Hi Till,
>
> Thanks for the suggestion. The blog post is already on the way.
>
> Best,
> Yingjie
>
> Till Rohrmann <tr...@apache.org> 于2021年6月8日周二 下午5:30写道:
>
>> Thanks for the update Yingjie. Would it make sense to write a short blog
>> post about this feature including some performance improvement numbers? I
>> think this could be interesting to our users.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li <ji...@gmail.com>
>> wrote:
>>
>>> Thanks Yingjie for the great effort!
>>>
>>> This is really helpful to Flink Batch users!
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <ke...@gmail.com>
>>> wrote:
>>>
>>> > Hi devs & users,
>>> >
>>> > The FLIP-148[1] has been released with Flink 1.13 and the final
>>> > implementation has some differences compared with the initial proposal
>>> in
>>> > the FLIP document. To avoid potential misunderstandings, I have
>>> updated the
>>> > FLIP document[1] accordingly and I also drafted another document[2]
>>> which
>>> > contains more implementation details.  FYI.
>>> >
>>> > [1]
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
>>> > [2]
>>> >
>>> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>>> >
>>> > Best,
>>> > Yingjie
>>> >
>>> > Yingjie Cao <ke...@gmail.com> 于2020年10月15日周四 上午11:02写道:
>>> >
>>> >> Hi devs,
>>> >>
>>> >> Currently, Flink adopts a hash-style blocking shuffle implementation
>>> >> which writes data sent to different reducer tasks into separate files
>>> >> concurrently. Compared to sort-merge based approach writes those data
>>> >> together into a single file and merges those small files into bigger
>>> ones,
>>> >> hash-based approach has several weak points when it comes to running
>>> large
>>> >> scale batch jobs:
>>> >>
>>> >>    1. *Stability*: For high parallelism (tens of thousands) batch job,
>>> >>    current hash-based blocking shuffle implementation writes too many
>>> files
>>> >>    concurrently which gives high pressure to the file system, for
>>> example,
>>> >>    maintenance of too many file metas, exhaustion of inodes or file
>>> >>    descriptors. All of these can be potential stability issues.
>>> Sort-Merge
>>> >>    based blocking shuffle don’t have the problem because for one
>>> result
>>> >>    partition, only one file is written at the same time.
>>> >>    2. *Performance*: Large amounts of small shuffle files and random
>>> IO
>>> >>    can influence shuffle performance a lot especially for hdd (for
>>> ssd,
>>> >>    sequential read is also important because of read ahead and
>>> cache). For
>>> >>    batch jobs processing massive data, small amount of data per
>>> subpartition
>>> >>    is common because of high parallelism. Besides, data skew is
>>> another cause
>>> >>    of small subpartition files. By merging data of all subpartitions
>>> together
>>> >>    in one file, more sequential read can be achieved.
>>> >>    3. *Resource*: For current hash-based implementation, each
>>> >>    subpartition needs at least one buffer. For large scale batch
>>> shuffles, the
>>> >>    memory consumption can be huge. For example, we need at least 320M
>>> network
>>> >>    memory per result partition if parallelism is set to 10000 and
>>> because of
>>> >>    the huge network consumption, it is hard to config the network
>>> memory for
>>> >>    large scale batch job and  sometimes parallelism can not be
>>> increased just
>>> >>    because of insufficient network memory  which leads to bad user
>>> experience.
>>> >>
>>> >> To improve Flink’s capability of running large scale batch jobs, we
>>> would
>>> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>>> >> feedback is appreciated.
>>> >>
>>> >> [1]
>>> >>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>> >>
>>> >> Best,
>>> >> Yingjie
>>> >>
>>> >
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Posted by Till Rohrmann <tr...@apache.org>.
Great :-)

On Tue, Jun 8, 2021 at 1:11 PM Yingjie Cao <ke...@gmail.com> wrote:

> Hi Till,
>
> Thanks for the suggestion. The blog post is already on the way.
>
> Best,
> Yingjie
>
> Till Rohrmann <tr...@apache.org> 于2021年6月8日周二 下午5:30写道:
>
>> Thanks for the update Yingjie. Would it make sense to write a short blog
>> post about this feature including some performance improvement numbers? I
>> think this could be interesting to our users.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li <ji...@gmail.com>
>> wrote:
>>
>>> Thanks Yingjie for the great effort!
>>>
>>> This is really helpful to Flink Batch users!
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <ke...@gmail.com>
>>> wrote:
>>>
>>> > Hi devs & users,
>>> >
>>> > The FLIP-148[1] has been released with Flink 1.13 and the final
>>> > implementation has some differences compared with the initial proposal
>>> in
>>> > the FLIP document. To avoid potential misunderstandings, I have
>>> updated the
>>> > FLIP document[1] accordingly and I also drafted another document[2]
>>> which
>>> > contains more implementation details.  FYI.
>>> >
>>> > [1]
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
>>> > [2]
>>> >
>>> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>>> >
>>> > Best,
>>> > Yingjie
>>> >
>>> > Yingjie Cao <ke...@gmail.com> 于2020年10月15日周四 上午11:02写道:
>>> >
>>> >> Hi devs,
>>> >>
>>> >> Currently, Flink adopts a hash-style blocking shuffle implementation
>>> >> which writes data sent to different reducer tasks into separate files
>>> >> concurrently. Compared to sort-merge based approach writes those data
>>> >> together into a single file and merges those small files into bigger
>>> ones,
>>> >> hash-based approach has several weak points when it comes to running
>>> large
>>> >> scale batch jobs:
>>> >>
>>> >>    1. *Stability*: For high parallelism (tens of thousands) batch job,
>>> >>    current hash-based blocking shuffle implementation writes too many
>>> files
>>> >>    concurrently which gives high pressure to the file system, for
>>> example,
>>> >>    maintenance of too many file metas, exhaustion of inodes or file
>>> >>    descriptors. All of these can be potential stability issues.
>>> Sort-Merge
>>> >>    based blocking shuffle don’t have the problem because for one
>>> result
>>> >>    partition, only one file is written at the same time.
>>> >>    2. *Performance*: Large amounts of small shuffle files and random
>>> IO
>>> >>    can influence shuffle performance a lot especially for hdd (for
>>> ssd,
>>> >>    sequential read is also important because of read ahead and
>>> cache). For
>>> >>    batch jobs processing massive data, small amount of data per
>>> subpartition
>>> >>    is common because of high parallelism. Besides, data skew is
>>> another cause
>>> >>    of small subpartition files. By merging data of all subpartitions
>>> together
>>> >>    in one file, more sequential read can be achieved.
>>> >>    3. *Resource*: For current hash-based implementation, each
>>> >>    subpartition needs at least one buffer. For large scale batch
>>> shuffles, the
>>> >>    memory consumption can be huge. For example, we need at least 320M
>>> network
>>> >>    memory per result partition if parallelism is set to 10000 and
>>> because of
>>> >>    the huge network consumption, it is hard to config the network
>>> memory for
>>> >>    large scale batch job and  sometimes parallelism can not be
>>> increased just
>>> >>    because of insufficient network memory  which leads to bad user
>>> experience.
>>> >>
>>> >> To improve Flink’s capability of running large scale batch jobs, we
>>> would
>>> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>>> >> feedback is appreciated.
>>> >>
>>> >> [1]
>>> >>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>> >>
>>> >> Best,
>>> >> Yingjie
>>> >>
>>> >
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Posted by Yingjie Cao <ke...@gmail.com>.
Hi Till,

Thanks for the suggestion. The blog post is already on the way.

Best,
Yingjie

Till Rohrmann <tr...@apache.org> 于2021年6月8日周二 下午5:30写道:

> Thanks for the update Yingjie. Would it make sense to write a short blog
> post about this feature including some performance improvement numbers? I
> think this could be interesting to our users.
>
> Cheers,
> Till
>
> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li <ji...@gmail.com> wrote:
>
>> Thanks Yingjie for the great effort!
>>
>> This is really helpful to Flink Batch users!
>>
>> Best,
>> Jingsong
>>
>> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <ke...@gmail.com>
>> wrote:
>>
>> > Hi devs & users,
>> >
>> > The FLIP-148[1] has been released with Flink 1.13 and the final
>> > implementation has some differences compared with the initial proposal
>> in
>> > the FLIP document. To avoid potential misunderstandings, I have updated
>> the
>> > FLIP document[1] accordingly and I also drafted another document[2]
>> which
>> > contains more implementation details.  FYI.
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
>> > [2]
>> >
>> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>> >
>> > Best,
>> > Yingjie
>> >
>> > Yingjie Cao <ke...@gmail.com> 于2020年10月15日周四 上午11:02写道:
>> >
>> >> Hi devs,
>> >>
>> >> Currently, Flink adopts a hash-style blocking shuffle implementation
>> >> which writes data sent to different reducer tasks into separate files
>> >> concurrently. Compared to sort-merge based approach writes those data
>> >> together into a single file and merges those small files into bigger
>> ones,
>> >> hash-based approach has several weak points when it comes to running
>> large
>> >> scale batch jobs:
>> >>
>> >>    1. *Stability*: For high parallelism (tens of thousands) batch job,
>> >>    current hash-based blocking shuffle implementation writes too many
>> files
>> >>    concurrently which gives high pressure to the file system, for
>> example,
>> >>    maintenance of too many file metas, exhaustion of inodes or file
>> >>    descriptors. All of these can be potential stability issues.
>> Sort-Merge
>> >>    based blocking shuffle don’t have the problem because for one result
>> >>    partition, only one file is written at the same time.
>> >>    2. *Performance*: Large amounts of small shuffle files and random IO
>> >>    can influence shuffle performance a lot especially for hdd (for ssd,
>> >>    sequential read is also important because of read ahead and cache).
>> For
>> >>    batch jobs processing massive data, small amount of data per
>> subpartition
>> >>    is common because of high parallelism. Besides, data skew is
>> another cause
>> >>    of small subpartition files. By merging data of all subpartitions
>> together
>> >>    in one file, more sequential read can be achieved.
>> >>    3. *Resource*: For current hash-based implementation, each
>> >>    subpartition needs at least one buffer. For large scale batch
>> shuffles, the
>> >>    memory consumption can be huge. For example, we need at least 320M
>> network
>> >>    memory per result partition if parallelism is set to 10000 and
>> because of
>> >>    the huge network consumption, it is hard to config the network
>> memory for
>> >>    large scale batch job and  sometimes parallelism can not be
>> increased just
>> >>    because of insufficient network memory  which leads to bad user
>> experience.
>> >>
>> >> To improve Flink’s capability of running large scale batch jobs, we
>> would
>> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>> >> feedback is appreciated.
>> >>
>> >> [1]
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>> >>
>> >> Best,
>> >> Yingjie
>> >>
>> >
>>
>> --
>> Best, Jingsong Lee
>>
>

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Posted by Yingjie Cao <ke...@gmail.com>.
Hi Till,

Thanks for the suggestion. The blog post is already on the way.

Best,
Yingjie

Till Rohrmann <tr...@apache.org> 于2021年6月8日周二 下午5:30写道:

> Thanks for the update Yingjie. Would it make sense to write a short blog
> post about this feature including some performance improvement numbers? I
> think this could be interesting to our users.
>
> Cheers,
> Till
>
> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li <ji...@gmail.com> wrote:
>
>> Thanks Yingjie for the great effort!
>>
>> This is really helpful to Flink Batch users!
>>
>> Best,
>> Jingsong
>>
>> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <ke...@gmail.com>
>> wrote:
>>
>> > Hi devs & users,
>> >
>> > The FLIP-148[1] has been released with Flink 1.13 and the final
>> > implementation has some differences compared with the initial proposal
>> in
>> > the FLIP document. To avoid potential misunderstandings, I have updated
>> the
>> > FLIP document[1] accordingly and I also drafted another document[2]
>> which
>> > contains more implementation details.  FYI.
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
>> > [2]
>> >
>> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>> >
>> > Best,
>> > Yingjie
>> >
>> > Yingjie Cao <ke...@gmail.com> 于2020年10月15日周四 上午11:02写道:
>> >
>> >> Hi devs,
>> >>
>> >> Currently, Flink adopts a hash-style blocking shuffle implementation
>> >> which writes data sent to different reducer tasks into separate files
>> >> concurrently. Compared to sort-merge based approach writes those data
>> >> together into a single file and merges those small files into bigger
>> ones,
>> >> hash-based approach has several weak points when it comes to running
>> large
>> >> scale batch jobs:
>> >>
>> >>    1. *Stability*: For high parallelism (tens of thousands) batch job,
>> >>    current hash-based blocking shuffle implementation writes too many
>> files
>> >>    concurrently which gives high pressure to the file system, for
>> example,
>> >>    maintenance of too many file metas, exhaustion of inodes or file
>> >>    descriptors. All of these can be potential stability issues.
>> Sort-Merge
>> >>    based blocking shuffle don’t have the problem because for one result
>> >>    partition, only one file is written at the same time.
>> >>    2. *Performance*: Large amounts of small shuffle files and random IO
>> >>    can influence shuffle performance a lot especially for hdd (for ssd,
>> >>    sequential read is also important because of read ahead and cache).
>> For
>> >>    batch jobs processing massive data, small amount of data per
>> subpartition
>> >>    is common because of high parallelism. Besides, data skew is
>> another cause
>> >>    of small subpartition files. By merging data of all subpartitions
>> together
>> >>    in one file, more sequential read can be achieved.
>> >>    3. *Resource*: For current hash-based implementation, each
>> >>    subpartition needs at least one buffer. For large scale batch
>> shuffles, the
>> >>    memory consumption can be huge. For example, we need at least 320M
>> network
>> >>    memory per result partition if parallelism is set to 10000 and
>> because of
>> >>    the huge network consumption, it is hard to config the network
>> memory for
>> >>    large scale batch job and  sometimes parallelism can not be
>> increased just
>> >>    because of insufficient network memory  which leads to bad user
>> experience.
>> >>
>> >> To improve Flink’s capability of running large scale batch jobs, we
>> would
>> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>> >> feedback is appreciated.
>> >>
>> >> [1]
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>> >>
>> >> Best,
>> >> Yingjie
>> >>
>> >
>>
>> --
>> Best, Jingsong Lee
>>
>

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for the update Yingjie. Would it make sense to write a short blog
post about this feature including some performance improvement numbers? I
think this could be interesting to our users.

Cheers,
Till

On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li <ji...@gmail.com> wrote:

> Thanks Yingjie for the great effort!
>
> This is really helpful to Flink Batch users!
>
> Best,
> Jingsong
>
> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <ke...@gmail.com>
> wrote:
>
> > Hi devs & users,
> >
> > The FLIP-148[1] has been released with Flink 1.13 and the final
> > implementation has some differences compared with the initial proposal in
> > the FLIP document. To avoid potential misunderstandings, I have updated
> the
> > FLIP document[1] accordingly and I also drafted another document[2] which
> > contains more implementation details.  FYI.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> > [2]
> >
> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
> >
> > Best,
> > Yingjie
> >
> > Yingjie Cao <ke...@gmail.com> 于2020年10月15日周四 上午11:02写道:
> >
> >> Hi devs,
> >>
> >> Currently, Flink adopts a hash-style blocking shuffle implementation
> >> which writes data sent to different reducer tasks into separate files
> >> concurrently. Compared to sort-merge based approach writes those data
> >> together into a single file and merges those small files into bigger
> ones,
> >> hash-based approach has several weak points when it comes to running
> large
> >> scale batch jobs:
> >>
> >>    1. *Stability*: For high parallelism (tens of thousands) batch job,
> >>    current hash-based blocking shuffle implementation writes too many
> files
> >>    concurrently which gives high pressure to the file system, for
> example,
> >>    maintenance of too many file metas, exhaustion of inodes or file
> >>    descriptors. All of these can be potential stability issues.
> Sort-Merge
> >>    based blocking shuffle don’t have the problem because for one result
> >>    partition, only one file is written at the same time.
> >>    2. *Performance*: Large amounts of small shuffle files and random IO
> >>    can influence shuffle performance a lot especially for hdd (for ssd,
> >>    sequential read is also important because of read ahead and cache).
> For
> >>    batch jobs processing massive data, small amount of data per
> subpartition
> >>    is common because of high parallelism. Besides, data skew is another
> cause
> >>    of small subpartition files. By merging data of all subpartitions
> together
> >>    in one file, more sequential read can be achieved.
> >>    3. *Resource*: For current hash-based implementation, each
> >>    subpartition needs at least one buffer. For large scale batch
> shuffles, the
> >>    memory consumption can be huge. For example, we need at least 320M
> network
> >>    memory per result partition if parallelism is set to 10000 and
> because of
> >>    the huge network consumption, it is hard to config the network
> memory for
> >>    large scale batch job and  sometimes parallelism can not be
> increased just
> >>    because of insufficient network memory  which leads to bad user
> experience.
> >>
> >> To improve Flink’s capability of running large scale batch jobs, we
> would
> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> >> feedback is appreciated.
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> >>
> >> Best,
> >> Yingjie
> >>
> >
>
> --
> Best, Jingsong Lee
>

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for the update Yingjie. Would it make sense to write a short blog
post about this feature including some performance improvement numbers? I
think this could be interesting to our users.

Cheers,
Till

On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li <ji...@gmail.com> wrote:

> Thanks Yingjie for the great effort!
>
> This is really helpful to Flink Batch users!
>
> Best,
> Jingsong
>
> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <ke...@gmail.com>
> wrote:
>
> > Hi devs & users,
> >
> > The FLIP-148[1] has been released with Flink 1.13 and the final
> > implementation has some differences compared with the initial proposal in
> > the FLIP document. To avoid potential misunderstandings, I have updated
> the
> > FLIP document[1] accordingly and I also drafted another document[2] which
> > contains more implementation details.  FYI.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> > [2]
> >
> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
> >
> > Best,
> > Yingjie
> >
> > Yingjie Cao <ke...@gmail.com> 于2020年10月15日周四 上午11:02写道:
> >
> >> Hi devs,
> >>
> >> Currently, Flink adopts a hash-style blocking shuffle implementation
> >> which writes data sent to different reducer tasks into separate files
> >> concurrently. Compared to sort-merge based approach writes those data
> >> together into a single file and merges those small files into bigger
> ones,
> >> hash-based approach has several weak points when it comes to running
> large
> >> scale batch jobs:
> >>
> >>    1. *Stability*: For high parallelism (tens of thousands) batch job,
> >>    current hash-based blocking shuffle implementation writes too many
> files
> >>    concurrently which gives high pressure to the file system, for
> example,
> >>    maintenance of too many file metas, exhaustion of inodes or file
> >>    descriptors. All of these can be potential stability issues.
> Sort-Merge
> >>    based blocking shuffle don’t have the problem because for one result
> >>    partition, only one file is written at the same time.
> >>    2. *Performance*: Large amounts of small shuffle files and random IO
> >>    can influence shuffle performance a lot especially for hdd (for ssd,
> >>    sequential read is also important because of read ahead and cache).
> For
> >>    batch jobs processing massive data, small amount of data per
> subpartition
> >>    is common because of high parallelism. Besides, data skew is another
> cause
> >>    of small subpartition files. By merging data of all subpartitions
> together
> >>    in one file, more sequential read can be achieved.
> >>    3. *Resource*: For current hash-based implementation, each
> >>    subpartition needs at least one buffer. For large scale batch
> shuffles, the
> >>    memory consumption can be huge. For example, we need at least 320M
> network
> >>    memory per result partition if parallelism is set to 10000 and
> because of
> >>    the huge network consumption, it is hard to config the network
> memory for
> >>    large scale batch job and  sometimes parallelism can not be
> increased just
> >>    because of insufficient network memory  which leads to bad user
> experience.
> >>
> >> To improve Flink’s capability of running large scale batch jobs, we
> would
> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> >> feedback is appreciated.
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> >>
> >> Best,
> >> Yingjie
> >>
> >
>
> --
> Best, Jingsong Lee
>

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Posted by Jingsong Li <ji...@gmail.com>.
Thanks Yingjie for the great effort!

This is really helpful to Flink Batch users!

Best,
Jingsong

On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <ke...@gmail.com> wrote:

> Hi devs & users,
>
> The FLIP-148[1] has been released with Flink 1.13 and the final
> implementation has some differences compared with the initial proposal in
> the FLIP document. To avoid potential misunderstandings, I have updated the
> FLIP document[1] accordingly and I also drafted another document[2] which
> contains more implementation details.  FYI.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> [2]
> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>
> Best,
> Yingjie
>
> Yingjie Cao <ke...@gmail.com> 于2020年10月15日周四 上午11:02写道:
>
>> Hi devs,
>>
>> Currently, Flink adopts a hash-style blocking shuffle implementation
>> which writes data sent to different reducer tasks into separate files
>> concurrently. Compared to sort-merge based approach writes those data
>> together into a single file and merges those small files into bigger ones,
>> hash-based approach has several weak points when it comes to running large
>> scale batch jobs:
>>
>>    1. *Stability*: For high parallelism (tens of thousands) batch job,
>>    current hash-based blocking shuffle implementation writes too many files
>>    concurrently which gives high pressure to the file system, for example,
>>    maintenance of too many file metas, exhaustion of inodes or file
>>    descriptors. All of these can be potential stability issues. Sort-Merge
>>    based blocking shuffle don’t have the problem because for one result
>>    partition, only one file is written at the same time.
>>    2. *Performance*: Large amounts of small shuffle files and random IO
>>    can influence shuffle performance a lot especially for hdd (for ssd,
>>    sequential read is also important because of read ahead and cache). For
>>    batch jobs processing massive data, small amount of data per subpartition
>>    is common because of high parallelism. Besides, data skew is another cause
>>    of small subpartition files. By merging data of all subpartitions together
>>    in one file, more sequential read can be achieved.
>>    3. *Resource*: For current hash-based implementation, each
>>    subpartition needs at least one buffer. For large scale batch shuffles, the
>>    memory consumption can be huge. For example, we need at least 320M network
>>    memory per result partition if parallelism is set to 10000 and because of
>>    the huge network consumption, it is hard to config the network memory for
>>    large scale batch job and  sometimes parallelism can not be increased just
>>    because of insufficient network memory  which leads to bad user experience.
>>
>> To improve Flink’s capability of running large scale batch jobs, we would
>> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>> feedback is appreciated.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>
>> Best,
>> Yingjie
>>
>

-- 
Best, Jingsong Lee

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Posted by Jingsong Li <ji...@gmail.com>.
Thanks Yingjie for the great effort!

This is really helpful to Flink Batch users!

Best,
Jingsong

On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <ke...@gmail.com> wrote:

> Hi devs & users,
>
> The FLIP-148[1] has been released with Flink 1.13 and the final
> implementation has some differences compared with the initial proposal in
> the FLIP document. To avoid potential misunderstandings, I have updated the
> FLIP document[1] accordingly and I also drafted another document[2] which
> contains more implementation details.  FYI.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> [2]
> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>
> Best,
> Yingjie
>
> Yingjie Cao <ke...@gmail.com> 于2020年10月15日周四 上午11:02写道:
>
>> Hi devs,
>>
>> Currently, Flink adopts a hash-style blocking shuffle implementation
>> which writes data sent to different reducer tasks into separate files
>> concurrently. Compared to sort-merge based approach writes those data
>> together into a single file and merges those small files into bigger ones,
>> hash-based approach has several weak points when it comes to running large
>> scale batch jobs:
>>
>>    1. *Stability*: For high parallelism (tens of thousands) batch job,
>>    current hash-based blocking shuffle implementation writes too many files
>>    concurrently which gives high pressure to the file system, for example,
>>    maintenance of too many file metas, exhaustion of inodes or file
>>    descriptors. All of these can be potential stability issues. Sort-Merge
>>    based blocking shuffle don’t have the problem because for one result
>>    partition, only one file is written at the same time.
>>    2. *Performance*: Large amounts of small shuffle files and random IO
>>    can influence shuffle performance a lot especially for hdd (for ssd,
>>    sequential read is also important because of read ahead and cache). For
>>    batch jobs processing massive data, small amount of data per subpartition
>>    is common because of high parallelism. Besides, data skew is another cause
>>    of small subpartition files. By merging data of all subpartitions together
>>    in one file, more sequential read can be achieved.
>>    3. *Resource*: For current hash-based implementation, each
>>    subpartition needs at least one buffer. For large scale batch shuffles, the
>>    memory consumption can be huge. For example, we need at least 320M network
>>    memory per result partition if parallelism is set to 10000 and because of
>>    the huge network consumption, it is hard to config the network memory for
>>    large scale batch job and  sometimes parallelism can not be increased just
>>    because of insufficient network memory  which leads to bad user experience.
>>
>> To improve Flink’s capability of running large scale batch jobs, we would
>> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>> feedback is appreciated.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>
>> Best,
>> Yingjie
>>
>

-- 
Best, Jingsong Lee