Posted to dev@flink.apache.org by Yingjie Cao <ke...@gmail.com> on 2021/06/07 02:11:04 UTC
Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink
Hi devs & users,
FLIP-148[1] has been released with Flink 1.13, and the final
implementation has some differences compared with the initial proposal in
the FLIP document. To avoid potential misunderstandings, I have updated the
FLIP document[1] accordingly, and I have also drafted another document[2]
which contains more implementation details. FYI.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
[2]
https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
Best,
Yingjie
Yingjie Cao <ke...@gmail.com> wrote on Thu, Oct 15, 2020 at 11:02 AM:
> Hi devs,
>
> Currently, Flink adopts a hash-style blocking shuffle implementation which
> writes the data sent to different reducer tasks into separate files
> concurrently. Compared to the sort-merge based approach, which writes the
> data into a single file and merges small files into bigger ones, the
> hash-based approach has several weak points when it comes to running
> large-scale batch jobs:
>
> 1. *Stability*: For high-parallelism (tens of thousands) batch jobs, the
> current hash-based blocking shuffle implementation writes too many files
> concurrently, which puts high pressure on the file system, for example,
> the maintenance of too many file metas and the exhaustion of inodes or
> file descriptors. All of these are potential stability issues. The
> sort-merge based blocking shuffle does not have this problem because, for
> one result partition, only one file is written at a time.
> 2. *Performance*: Large numbers of small shuffle files and random IO
> can hurt shuffle performance a lot, especially on HDDs (for SSDs,
> sequential reads are also important because of read-ahead and caching).
> For batch jobs processing massive data, a small amount of data per
> subpartition is common because of the high parallelism. Besides, data skew
> is another cause of small subpartition files. By merging the data of all
> subpartitions into one file, more sequential reads can be achieved.
> 3. *Resource*: With the current hash-based implementation, each
> subpartition needs at least one buffer. For large-scale batch shuffles,
> the memory consumption can be huge. For example, we need at least 320 MB
> of network memory per result partition if the parallelism is set to 10000.
> Because of this huge memory consumption, it is hard to configure the
> network memory for large-scale batch jobs, and sometimes the parallelism
> cannot be increased just because of insufficient network memory, which
> leads to a bad user experience.
>
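The resource figure above can be checked with a quick back-of-the-envelope calculation, sketched here in Python under the assumption of a 32 KiB network buffer size (Flink's default); the function name is illustrative, not a Flink API:

```python
# Hash-based blocking shuffle keeps at least one in-flight buffer per
# subpartition, so the minimum network memory of one result partition
# grows linearly with the parallelism.

def min_network_memory_bytes(parallelism: int, buffer_bytes: int = 32 * 1024) -> int:
    """At least one buffer per subpartition (hash-based shuffle)."""
    return parallelism * buffer_bytes

# parallelism 10000 -> 10000 * 32 KiB = 327,680,000 bytes, i.e. roughly
# the "320 MB" quoted in the proposal.
print(min_network_memory_bytes(10_000))  # 327680000
```

A sort-based writer decouples the buffer count from the parallelism, which is exactly why the resource point above disappears for one-file-per-result-partition designs.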
> To improve Flink’s capability of running large-scale batch jobs, we would
> like to introduce a sort-merge based blocking shuffle to Flink[1]. Any
> feedback is appreciated.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>
> Best,
> Yingjie
>
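As a rough illustration of the sort-merge idea in the proposal above, the following Python sketch buffers records, groups them by subpartition, and writes them into a single data blob with an (offset, length) index per subpartition, so each consumer can read its region sequentially. All names are hypothetical and greatly simplify what Flink actually does:

```python
import io
from collections import defaultdict

def sort_merge_write(records):
    """records: iterable of (subpartition_id, bytes).

    Returns (data, index) where data is one contiguous blob and index maps
    each subpartition id to the (offset, length) of its region in data.
    """
    buffered = defaultdict(list)
    for sub_id, payload in records:
        buffered[sub_id].append(payload)

    data = io.BytesIO()
    index = {}
    # Sorting by subpartition id makes each subpartition's data contiguous,
    # so a downstream reader issues one sequential read instead of many
    # random reads across small files.
    for sub_id in sorted(buffered):
        start = data.tell()
        for payload in buffered[sub_id]:
            data.write(payload)
        index[sub_id] = (start, data.tell() - start)
    return data.getvalue(), index

data, index = sort_merge_write([(1, b"b"), (0, b"aa"), (1, b"c")])
offset, length = index[1]
print(data[offset:offset + length])  # b"bc"
```

Only one "file" (here, one blob) is open per result partition at a time, which is the property behind the stability argument in item 1.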
Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink
Posted by Till Rohrmann <tr...@apache.org>.
Great :-)
On Tue, Jun 8, 2021 at 1:11 PM Yingjie Cao <ke...@gmail.com> wrote:
> Hi Till,
>
> Thanks for the suggestion. The blog post is already on the way.
>
> Best,
> Yingjie
>
> Till Rohrmann <tr...@apache.org> wrote on Tue, Jun 8, 2021 at 5:30 PM:
>
>> Thanks for the update Yingjie. Would it make sense to write a short blog
>> post about this feature including some performance improvement numbers? I
>> think this could be interesting to our users.
>>
>> Cheers,
>> Till
>>
Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink
Posted by Yingjie Cao <ke...@gmail.com>.
Hi Till,
Thanks for the suggestion. The blog post is already on the way.
Best,
Yingjie
Till Rohrmann <tr...@apache.org> wrote on Tue, Jun 8, 2021 at 5:30 PM:
> Thanks for the update Yingjie. Would it make sense to write a short blog
> post about this feature including some performance improvement numbers? I
> think this could be interesting to our users.
>
> Cheers,
> Till
>
Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink
Posted by Till Rohrmann <tr...@apache.org>.
Thanks for the update Yingjie. Would it make sense to write a short blog
post about this feature including some performance improvement numbers? I
think this could be interesting to our users.
Cheers,
Till
On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li <ji...@gmail.com> wrote:
> Thanks Yingjie for the great effort!
>
> This is really helpful to Flink Batch users!
>
> Best,
> Jingsong
>
Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink
Posted by Jingsong Li <ji...@gmail.com>.
Thanks Yingjie for the great effort!
This is really helpful to Flink Batch users!
Best,
Jingsong
--
Best, Jingsong Lee