You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by weijie guo <gu...@gmail.com> on 2022/05/19 06:30:38 UTC

[DISCUSS] FLIP-235: Hybrid Shuffle Mode

Hi all,

I’d like to start a discussion about FLIP-235[1], which introduce a
new shuffle mode
 can overcome some of the problems of Pipelined Shuffle and Blocking
Shuffle in batch scenarios.

Currently in Flink, task scheduling is more or less constrained by the
shuffle implementations.
This will bring the following disadvantages:

   1. Pipelined Shuffle:
    For pipelined shuffle, the upstream and downstream tasks are
required to be deployed at the same time, to avoid upstream tasks
being blocked forever. This is fine when there are enough resources
for both upstream and downstream tasks to run simultaneously, but will
cause the following problems otherwise:
   1.
      Pipelined shuffle connected tasks (i.e., a pipelined region)
cannot be executed until obtaining resources for all of them,
resulting in longer job finishing time and poorer resource efficiency
due to holding part of the resources idle while waiting for the rest.
      2.
      More severely, if multiple jobs each hold part of the cluster
resources and are waiting for more, a deadlock would occur. The chance
is not trivial, especially for scenarios such as OLAP where concurrent
job submissions are frequent.
      2. Blocking Shuffle:
    For blocking shuffle, execution of downstream tasks must wait for
all upstream tasks to finish, despite there might be more resources
available. The sequential execution of upstream and downstream tasks
significantly increase the job finishing time, and the disk IO
workload for spilling and loading full intermediate data also affects
the performance.

We believe the root cause of the above problems is that shuffle
implementations put unnecessary constraints on task scheduling.

To solve this problem, Xintong Song and I propose to introduce hybrid
shuffle to minimize the scheduling constraints. With Hybrid Shuffle,
Flink should:

   1. Make best use of available resources.
    Ideally, we want Flink to always make progress if possible. That
is to say, it should always execute a pending task if there are
resources available for that task.
   2. Minimize disk IO load.
    In-flight data should be consumed directly from memory as much as
possible. Only data that is not consumed timely should be spilled to
disk.

You can find more details in FLIP-235. Looking forward to your feedback.


[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode



Best regards,

Weijie

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by weijie guo <gu...@gmail.com>.
Hi All,

Thanks for all the feedback about this FLIP.

Since there are no other concerns, this FLIP-235 discussion is over.  I will
open a vote today.

Best regards,

Weijie


Xintong Song <to...@gmail.com> 于2022年5月25日周三 22:17写道:

> Ok, I think we are on the same page. I'm aware of
> ExecutionConfig#setExecutionMode, which sets the data exchanging mode at
> the job scope.
>
> Best,
>
> Xintong
>
>
>
> On Wed, May 25, 2022 at 9:50 PM Chesnay Schepler <ch...@apache.org>
> wrote:
>
> > You can influence it to some extent via ExecutionConfig#setExecutionMode.
> > You can for example for all shuffles to use blocking exchanges.
> >
> > I'm not proposing an API that would allow this to be set per edge.
> >
> > On 25/05/2022 15:23, Xintong Song wrote:
> >
> > In general, I agree with you about aiming jobs with no/few blocking
> > exchanges for fine-grained recovery. The only problem is, correct me if
> I'm
> > wrong, users currently cannot control the data exchanging mode of a
> > specific edge. I'm not aware of such APIs.
> >
> > As a first step, I'd prefer excluding this from the scope of this FLIP.
> >
> > Best,
> >
> > Xintong
> >
> >
> > On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ch...@apache.org>
> > wrote:
> >
> >> Yes; but that's also a limitation of the current fine-grained recovery.
> >>
> >> My suggestion was primarily aimed at jobs that have no/few blocking
> >> exchanges, where users would currently have to explicitly configure
> >> additional blocking exchanges to really get something out of
> >> fine-grained recovery (at the expense of e2e job duration).
> >>
> >> On 25/05/2022 14:47, Xintong Song wrote:
> >> >> Will this also allow spilling everything to disk while also
> forwarding
> >> >> data to the next task?
> >> >>
> >> > Yes, as long as the downstream task is started, this always forward
> the
> >> > data, even while spilling everything.
> >> >
> >> > This would allow us to improve fine-grained recovery by no longer
> being
> >> >> constrained to pipelined regions.
> >> >
> >> > I think it helps preventing restarts of the upstreams for a failed
> task,
> >> > but not the downstreams. Because there's no guarantee a restarted task
> >> will
> >> > prevent exactly same data (in terms of order) as the previous
> execution,
> >> > thus downstreams cannot resume consuming the data.
> >> >
> >> >
> >> > Best,
> >> >
> >> > Xintong
> >> >
> >> >
> >> >
> >> > On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ch...@apache.org>
> >> wrote:
> >> >
> >> >> Will this also allow spilling everything to disk while also
> forwarding
> >> >> data to the next task?
> >> >>
> >> >> This would allow us to improve fine-grained recovery by no longer
> being
> >> >> constrained to pipelined regions.
> >> >>
> >> >> On 25/05/2022 05:55, weijie guo wrote:
> >> >>> Hi All,
> >> >>> Thank you for your attention and feedback.
> >> >>> Do you have any other comments? If there are no other questions,
> I'll
> >> >> vote
> >> >>> on FLIP-235 tomorrow.
> >> >>>
> >> >>> Best regards,
> >> >>>
> >> >>> Weijie
> >> >>>
> >> >>>
> >> >>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 13:22写道:
> >> >>>
> >> >>>> Hi Xintong
> >> >>>>       Thanks for your detailed explanation, I misunderstand the
> spill
> >> >>>> behavior at first glance,
> >> >>>> I get your point now. I think it will be a good addition to the
> >> current
> >> >>>> execution mode.
> >> >>>> Looking forward to it :)
> >> >>>>
> >> >>>> Best,
> >> >>>> Aitozi
> >> >>>>
> >> >>>> Xintong Song <to...@gmail.com> 于2022年5月20日周五 12:26写道:
> >> >>>>
> >> >>>>> Hi Aitozi,
> >> >>>>>
> >> >>>>> In which case we can use the hybrid shuffle mode
> >> >>>>>
> >> >>>>> Just to directly answer this question, in addition to
> >> >>>>> Weijie's explanations. For batch workload, if you want the
> workload
> >> to
> >> >>>> take
> >> >>>>> advantage of as many resources as available, which ranges from a
> >> single
> >> >>>>> slot to as many slots as the total tasks, you may consider hybrid
> >> >> shuffle
> >> >>>>> mode. Admittedly, this may not always be wanted, e.g., users may
> not
> >> >> want
> >> >>>>> to execute a job if there's too few resources available, or may
> not
> >> >> want
> >> >>>> a
> >> >>>>> job taking too many of the cluster resources. That's why we
> propose
> >> >>>> hybrid
> >> >>>>> shuffle as an additional option for batch users, rather than a
> >> >>>> replacement
> >> >>>>> for Pipelined or Blocking mode.
> >> >>>>>
> >> >>>>> So you mean the hybrid shuffle mode will limit its usage to the
> >> bounded
> >> >>>>>> source, Right ?
> >> >>>>>>
> >> >>>>> Yes.
> >> >>>>>
> >> >>>>> One more question, with the bounded data and partly of the stage
> is
> >> >>>> running
> >> >>>>>> in the Pipelined shuffle mode, what will be the behavior of the
> >> task
> >> >>>>>> failure, Is the checkpoint enabled for these running stages or
> >> will it
> >> >>>>>> re-run after the failure?
> >> >>>>>>
> >> >>>>> There's no checkpoints. The failover behavior depends on the
> >> spilling
> >> >>>>> strategy.
> >> >>>>> - In the first version, we only consider a selective spilling
> >> strategy,
> >> >>>>> which means spill data as little as possible to the disk, which
> >> means
> >> >> in
> >> >>>>> case of failover upstream tasks need to be restarted to reproduce
> >> the
> >> >>>>> complete intermediate results.
> >> >>>>> - An alternative strategy we may introduce in future if needed is
> to
> >> >>>> spill
> >> >>>>> the complete intermediate results. That avoids restarting upstream
> >> >> tasks
> >> >>>> in
> >> >>>>> case of failover, because the produced intermediate results can be
> >> >>>>> re-consumed, at the cost of more disk IO load.
> >> >>>>> With both strategies, the trade-off between failover cost and IO
> >> load
> >> >> is
> >> >>>>> for the user to decide. This is also discussed in the
> >> MemoryDataManager
> >> >>>>> section of the FLIP.
> >> >>>>>
> >> >>>>> Best,
> >> >>>>>
> >> >>>>> Xintong
> >> >>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gj...@gmail.com>
> >> wrote:
> >> >>>>>
> >> >>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle
> mode
> >> >> will
> >> >>>>>> limit
> >> >>>>>> its usage to the bounded source, Right ?
> >> >>>>>> One more question, with the bounded data and partly of the stage
> is
> >> >>>>> running
> >> >>>>>> in the Pipelined shuffle mode, what will be the behavior of the
> >> task
> >> >>>>>> failure, Is the
> >> >>>>>> checkpoint enabled for these running stages or will it re-run
> after
> >> >> the
> >> >>>>>> failure?
> >> >>>>>>
> >> >>>>>> Best,
> >> >>>>>> Aitozi
> >> >>>>>>
> >> >>>>>> weijie guo <gu...@gmail.com> 于2022年5月20日周五 10:45写道:
> >> >>>>>>
> >> >>>>>>> Hi, Aitozi:
> >> >>>>>>>
> >> >>>>>>> Thank you for the feedback!
> >> >>>>>>> Here are some of my thoughts on your question
> >> >>>>>>>
> >> >>>>>>>>>> 1.If there is an unbounded data source, but only have
> resource
> >> to
> >> >>>>>>> schedule the first stage, will it bring the big burden to the
> >> >>>>>> disk/shuffle
> >> >>>>>>> service which will occupy all the resource I think.
> >> >>>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job
> >> >>>>> scenario,
> >> >>>>>> so
> >> >>>>>>> there is no problem of unbounded data sources. Secondly, if you
> >> >>>>> consider
> >> >>>>>>> the stream scenario, I think Pipelined Shuffle should still be
> the
> >> >>>> best
> >> >>>>>>> choice at present. For an unbounded data stream, it is not
> >> meaningful
> >> >>>>> to
> >> >>>>>>> only run some stages.
> >> >>>>>>>
> >> >>>>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle
> mode.
> >> >>>> In
> >> >>>>>>> other words, In which case we can use the hybrid shuffle mode:
> >> >>>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs,
> >> hybrid
> >> >>>>>>> shuffle mode can effectively utilize cluster resources and avoid
> >> some
> >> >>>>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
> >> >>>>> characterized
> >> >>>>>>> by a large number of concurrently submitted short batch jobs,
> >> hybrid
> >> >>>>>>> shuffle can solve the scheduling deadlock problem of pipelined
> >> >>>> shuffle
> >> >>>>>> and
> >> >>>>>>> achieve similar performance.
> >> >>>>>>>
> >> >>>>>>> Best regards,
> >> >>>>>>>
> >> >>>>>>> Weijie
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
> >> >>>>>>>
> >> >>>>>>>> Hi Weijie:
> >> >>>>>>>>
> >> >>>>>>>>        Thanks for the nice FLIP, I have couple questions about
> >> this:
> >> >>>>>>>>
> >> >>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by
> the
> >> >>>>>>> resource.
> >> >>>>>>>> If there
> >> >>>>>>>> is an unbounded data source, but only have resource to schedule
> >> the
> >> >>>>>> first
> >> >>>>>>>> stage, will it
> >> >>>>>>>> bring the big burden to the disk/shuffle service which will
> >> occupy
> >> >>>>> all
> >> >>>>>>> the
> >> >>>>>>>> resource I think.
> >> >>>>>>>>
> >> >>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode.
> >> In
> >> >>>>>> other
> >> >>>>>>>> words, In which
> >> >>>>>>>> case we can use the hybrid shuffle mode:
> >> >>>>>>>> - For batch job want to use more resource to reduce the e2e
> time
> >> ?
> >> >>>>>>>> - Or for streaming job which may lack of resource temporarily ?
> >> >>>>>>>> - Or for OLAP job which will try to make best use of available
> >> >>>>>> resources
> >> >>>>>>> as
> >> >>>>>>>> you mentioned to finish the query?
> >> >>>>>>>> Just want to know the typical use case for the Hybrid shuffle
> >> mode
> >> >>>> :)
> >> >>>>>>>> Best,
> >> >>>>>>>> Aitozi.
> >> >>>>>>>>
> >> >>>>>>>> weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
> >> >>>>>>>>
> >> >>>>>>>>> Yangze, Thank you for the feedback!
> >> >>>>>>>>> Here's my thoughts for your questions:
> >> >>>>>>>>>
> >> >>>>>>>>>>>> How do we decide the size of the buffer pool in
> >> >>>>> MemoryDataManager
> >> >>>>>>> and
> >> >>>>>>>>> the read buffers in FileDataManager?
> >> >>>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool
> used
> >> >>>> by
> >> >>>>>>>>> ResultPartition, and the size is the same as the current
> >> >>>>>> implementation
> >> >>>>>>>> of
> >> >>>>>>>>> sort-merge shuffle. In other words, the minimum value of
> >> >>>> BufferPool
> >> >>>>>> is
> >> >>>>>>> a
> >> >>>>>>>>> configurable fixed value, and the maximum value is
> Math.max(min,
> >> >>>> 4
> >> >>>>> *
> >> >>>>>>>>> numSubpartitions). The default value can be determined by
> >> running
> >> >>>>> the
> >> >>>>>>>>> TPC-DS tests.
> >> >>>>>>>>> Read buffers in FileDataManager are requested from the
> >> >>>>>>>>> BatchShuffleReadBufferPool shared by TaskManager, it's size
> >> >>>>>> controlled
> >> >>>>>>> by
> >> >>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*,
> the
> >> >>>>>> default
> >> >>>>>>>>> value is 32M, which is consistent with the current sort-merge
> >> >>>>> shuffle
> >> >>>>>>>>> logic.
> >> >>>>>>>>>
> >> >>>>>>>>>>>> Is there an upper limit for the sum of them? If there is,
> how
> >> >>>>>> does
> >> >>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >> >>>>>>>>> The buffers of the MemoryDataManager are limited by the size
> of
> >> >>>> the
> >> >>>>>>>>> LocalBufferPool, and the upper limit is the size of the
> Network
> >> >>>>>> Memory.
> >> >>>>>>>> The
> >> >>>>>>>>> buffers of the FileDataManager are directly requested from
> >> >>>>>>>>> UnpooledOffHeapMemory, and are also limited by the size of the
> >> >>>>>>> framework
> >> >>>>>>>>> off-heap memory. I think there should be no need for
> additional
> >> >>>>>>>>> synchronization mechanisms.
> >> >>>>>>>>>
> >> >>>>>>>>>>>> How do you disable the slot sharing? If user configures
> both
> >> >>>>> the
> >> >>>>>>> slot
> >> >>>>>>>>> sharing group and hybrid shuffle, what will happen to that
> job?
> >> >>>>>>>>> I think we can print a warning log when Hybrid Shuffle is
> >> enabled
> >> >>>>> and
> >> >>>>>>> SSG
> >> >>>>>>>>> is configured during the JobGraph compilation stage, and
> >> fallback
> >> >>>>> to
> >> >>>>>>> the
> >> >>>>>>>>> region slot sharing group by default. Of course, it will be
> >> >>>>>> emphasized
> >> >>>>>>> in
> >> >>>>>>>>> the document that we do not currently support SSG, If
> >> configured,
> >> >>>>> it
> >> >>>>>>> will
> >> >>>>>>>>> fall back to the default.
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> Best regards,
> >> >>>>>>>>>
> >> >>>>>>>>> Weijie
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
> >> >>>>>>>>>
> >> >>>>>>>>>> Thanks for driving this. Xintong and Weijie.
> >> >>>>>>>>>>
> >> >>>>>>>>>> I believe this feature will make Flink a better batch/OLAP
> >> >>>>> engine.
> >> >>>>>> +1
> >> >>>>>>>>>> for the overall design.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Some questions:
> >> >>>>>>>>>> 1. How do we decide the size of the buffer pool in
> >> >>>>>> MemoryDataManager
> >> >>>>>>>>>> and the read buffers in FileDataManager?
> >> >>>>>>>>>> 2. Is there an upper limit for the sum of them? If there is,
> >> >>>> how
> >> >>>>>> does
> >> >>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >> >>>>>>>>>> 3. How do you disable the slot sharing? If user configures
> both
> >> >>>>> the
> >> >>>>>>>>>> slot sharing group and hybrid shuffle, what will happen to
> that
> >> >>>>>> job?
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Yangze Guo
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <
> >> >>>>>> tonysong820@gmail.com>
> >> >>>>>>>>>> wrote:
> >> >>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I think this is a good improvement on batch resource
> >> >>>>> elasticity.
> >> >>>>>>>>> Looking
> >> >>>>>>>>>>> forward to the community feedback.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Best,
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Xintong
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <
> >> >>>>>>>> guoweijiereswqa@gmail.com>
> >> >>>>>>>>>>> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>> Hi all,
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which
> >> >>>>>>> introduce a
> >> >>>>>>>>>> new shuffle mode
> >> >>>>>>>>>>>>    can overcome some of the problems of Pipelined Shuffle
> and
> >> >>>>>>>> Blocking
> >> >>>>>>>>>> Shuffle in batch scenarios.
> >> >>>>>>>>>>>> Currently in Flink, task scheduling is more or less
> >> >>>>> constrained
> >> >>>>>>> by
> >> >>>>>>>>> the
> >> >>>>>>>>>> shuffle implementations.
> >> >>>>>>>>>>>> This will bring the following disadvantages:
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>>      1. Pipelined Shuffle:
> >> >>>>>>>>>>>>       For pipelined shuffle, the upstream and downstream
> >> >>>> tasks
> >> >>>>>> are
> >> >>>>>>>>>> required to be deployed at the same time, to avoid upstream
> >> >>>> tasks
> >> >>>>>>> being
> >> >>>>>>>>>> blocked forever. This is fine when there are enough resources
> >> >>>> for
> >> >>>>>>> both
> >> >>>>>>>>>> upstream and downstream tasks to run simultaneously, but will
> >> >>>>> cause
> >> >>>>>>> the
> >> >>>>>>>>>> following problems otherwise:
> >> >>>>>>>>>>>>      1.
> >> >>>>>>>>>>>>         Pipelined shuffle connected tasks (i.e., a
> pipelined
> >> >>>>>>> region)
> >> >>>>>>>>>> cannot be executed until obtaining resources for all of them,
> >> >>>>>>> resulting
> >> >>>>>>>>> in
> >> >>>>>>>>>> longer job finishing time and poorer resource efficiency due
> to
> >> >>>>>>> holding
> >> >>>>>>>>>> part of the resources idle while waiting for the rest.
> >> >>>>>>>>>>>>         2.
> >> >>>>>>>>>>>>         More severely, if multiple jobs each hold part of
> the
> >> >>>>>>> cluster
> >> >>>>>>>>>> resources and are waiting for more, a deadlock would occur.
> The
> >> >>>>>>> chance
> >> >>>>>>>> is
> >> >>>>>>>>>> not trivial, especially for scenarios such as OLAP where
> >> >>>>> concurrent
> >> >>>>>>> job
> >> >>>>>>>>>> submissions are frequent.
> >> >>>>>>>>>>>>         2. Blocking Shuffle:
> >> >>>>>>>>>>>>       For blocking shuffle, execution of downstream tasks
> >> >>>> must
> >> >>>>>> wait
> >> >>>>>>>> for
> >> >>>>>>>>>> all upstream tasks to finish, despite there might be more
> >> >>>>> resources
> >> >>>>>>>>>> available. The sequential execution of upstream and
> downstream
> >> >>>>>> tasks
> >> >>>>>>>>>> significantly increase the job finishing time, and the disk
> IO
> >> >>>>>>> workload
> >> >>>>>>>>> for
> >> >>>>>>>>>> spilling and loading full intermediate data also affects the
> >> >>>>>>>> performance.
> >> >>>>>>>>>>>> We believe the root cause of the above problems is that
> >> >>>>> shuffle
> >> >>>>>>>>>> implementations put unnecessary constraints on task
> scheduling.
> >> >>>>>>>>>>>> To solve this problem, Xintong Song and I propose to
> >> >>>>> introduce
> >> >>>>>>>> hybrid
> >> >>>>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid
> >> >>>>>> Shuffle,
> >> >>>>>>>>> Flink
> >> >>>>>>>>>> should:
> >> >>>>>>>>>>>>      1. Make best use of available resources.
> >> >>>>>>>>>>>>       Ideally, we want Flink to always make progress if
> >> >>>>> possible.
> >> >>>>>>>> That
> >> >>>>>>>>>> is to say, it should always execute a pending task if there
> are
> >> >>>>>>>> resources
> >> >>>>>>>>>> available for that task.
> >> >>>>>>>>>>>>      2. Minimize disk IO load.
> >> >>>>>>>>>>>>       In-flight data should be consumed directly from
> memory
> >> >>>> as
> >> >>>>>>> much
> >> >>>>>>>> as
> >> >>>>>>>>>> possible. Only data that is not consumed timely should be
> >> >>>> spilled
> >> >>>>>> to
> >> >>>>>>>>> disk.
> >> >>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to
> >> >>>>> your
> >> >>>>>>>>>> feedback.
> >> >>>>>>>>>>>> [1]
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>>
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Weijie
> >> >>>>>>>>>>>>
> >> >>
> >>
> >>
> >
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Xintong Song <to...@gmail.com>.
Ok, I think we are on the same page. I'm aware of
ExecutionConfig#setExecutionMode, which sets the data exchanging mode at
the job scope.

Best,

Xintong



On Wed, May 25, 2022 at 9:50 PM Chesnay Schepler <ch...@apache.org> wrote:

> You can influence it to some extent via ExecutionConfig#setExecutionMode.
> You can for example for all shuffles to use blocking exchanges.
>
> I'm not proposing an API that would allow this to be set per edge.
>
> On 25/05/2022 15:23, Xintong Song wrote:
>
> In general, I agree with you about aiming jobs with no/few blocking
> exchanges for fine-grained recovery. The only problem is, correct me if I'm
> wrong, users currently cannot control the data exchanging mode of a
> specific edge. I'm not aware of such APIs.
>
> As a first step, I'd prefer excluding this from the scope of this FLIP.
>
> Best,
>
> Xintong
>
>
> On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ch...@apache.org>
> wrote:
>
>> Yes; but that's also a limitation of the current fine-grained recovery.
>>
>> My suggestion was primarily aimed at jobs that have no/few blocking
>> exchanges, where users would currently have to explicitly configure
>> additional blocking exchanges to really get something out of
>> fine-grained recovery (at the expense of e2e job duration).
>>
>> On 25/05/2022 14:47, Xintong Song wrote:
>> >> Will this also allow spilling everything to disk while also forwarding
>> >> data to the next task?
>> >>
>> > Yes, as long as the downstream task is started, this always forward the
>> > data, even while spilling everything.
>> >
>> > This would allow us to improve fine-grained recovery by no longer being
>> >> constrained to pipelined regions.
>> >
>> > I think it helps preventing restarts of the upstreams for a failed task,
>> > but not the downstreams. Because there's no guarantee a restarted task
>> will
>> > prevent exactly same data (in terms of order) as the previous execution,
>> > thus downstreams cannot resume consuming the data.
>> >
>> >
>> > Best,
>> >
>> > Xintong
>> >
>> >
>> >
>> > On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ch...@apache.org>
>> wrote:
>> >
>> >> Will this also allow spilling everything to disk while also forwarding
>> >> data to the next task?
>> >>
>> >> This would allow us to improve fine-grained recovery by no longer being
>> >> constrained to pipelined regions.
>> >>
>> >> On 25/05/2022 05:55, weijie guo wrote:
>> >>> Hi All,
>> >>> Thank you for your attention and feedback.
>> >>> Do you have any other comments? If there are no other questions, I'll
>> >> vote
>> >>> on FLIP-235 tomorrow.
>> >>>
>> >>> Best regards,
>> >>>
>> >>> Weijie
>> >>>
>> >>>
>> >>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 13:22写道:
>> >>>
>> >>>> Hi Xintong
>> >>>>       Thanks for your detailed explanation, I misunderstand the spill
>> >>>> behavior at first glance,
>> >>>> I get your point now. I think it will be a good addition to the
>> current
>> >>>> execution mode.
>> >>>> Looking forward to it :)
>> >>>>
>> >>>> Best,
>> >>>> Aitozi
>> >>>>
>> >>>> Xintong Song <to...@gmail.com> 于2022年5月20日周五 12:26写道:
>> >>>>
>> >>>>> Hi Aitozi,
>> >>>>>
>> >>>>> In which case we can use the hybrid shuffle mode
>> >>>>>
>> >>>>> Just to directly answer this question, in addition to
>> >>>>> Weijie's explanations. For batch workload, if you want the workload
>> to
>> >>>> take
>> >>>>> advantage of as many resources as available, which ranges from a
>> single
>> >>>>> slot to as many slots as the total tasks, you may consider hybrid
>> >> shuffle
>> >>>>> mode. Admittedly, this may not always be wanted, e.g., users may not
>> >> want
>> >>>>> to execute a job if there's too few resources available, or may not
>> >> want
>> >>>> a
>> >>>>> job taking too many of the cluster resources. That's why we propose
>> >>>> hybrid
>> >>>>> shuffle as an additional option for batch users, rather than a
>> >>>> replacement
>> >>>>> for Pipelined or Blocking mode.
>> >>>>>
>> >>>>> So you mean the hybrid shuffle mode will limit its usage to the
>> bounded
>> >>>>>> source, Right ?
>> >>>>>>
>> >>>>> Yes.
>> >>>>>
>> >>>>> One more question, with the bounded data and partly of the stage is
>> >>>> running
>> >>>>>> in the Pipelined shuffle mode, what will be the behavior of the
>> task
>> >>>>>> failure, Is the checkpoint enabled for these running stages or
>> will it
>> >>>>>> re-run after the failure?
>> >>>>>>
>> >>>>> There's no checkpoints. The failover behavior depends on the
>> spilling
>> >>>>> strategy.
>> >>>>> - In the first version, we only consider a selective spilling
>> strategy,
>> >>>>> which means spill data as little as possible to the disk, which
>> means
>> >> in
>> >>>>> case of failover upstream tasks need to be restarted to reproduce
>> the
>> >>>>> complete intermediate results.
>> >>>>> - An alternative strategy we may introduce in future if needed is to
>> >>>> spill
>> >>>>> the complete intermediate results. That avoids restarting upstream
>> >> tasks
>> >>>> in
>> >>>>> case of failover, because the produced intermediate results can be
>> >>>>> re-consumed, at the cost of more disk IO load.
>> >>>>> With both strategies, the trade-off between failover cost and IO
>> load
>> >> is
>> >>>>> for the user to decide. This is also discussed in the
>> MemoryDataManager
>> >>>>> section of the FLIP.
>> >>>>>
>> >>>>> Best,
>> >>>>>
>> >>>>> Xintong
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gj...@gmail.com>
>> wrote:
>> >>>>>
>> >>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode
>> >> will
>> >>>>>> limit
>> >>>>>> its usage to the bounded source, Right ?
>> >>>>>> One more question, with the bounded data and partly of the stage is
>> >>>>> running
>> >>>>>> in the Pipelined shuffle mode, what will be the behavior of the
>> task
>> >>>>>> failure, Is the
>> >>>>>> checkpoint enabled for these running stages or will it re-run after
>> >> the
>> >>>>>> failure?
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Aitozi
>> >>>>>>
>> >>>>>> weijie guo <gu...@gmail.com> 于2022年5月20日周五 10:45写道:
>> >>>>>>
>> >>>>>>> Hi, Aitozi:
>> >>>>>>>
>> >>>>>>> Thank you for the feedback!
>> >>>>>>> Here are some of my thoughts on your question
>> >>>>>>>
>> >>>>>>>>>> 1.If there is an unbounded data source, but only have resource
>> to
>> >>>>>>> schedule the first stage, will it bring the big burden to the
>> >>>>>> disk/shuffle
>> >>>>>>> service which will occupy all the resource I think.
>> >>>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job
>> >>>>> scenario,
>> >>>>>> so
>> >>>>>>> there is no problem of unbounded data sources. Secondly, if you
>> >>>>> consider
>> >>>>>>> the stream scenario, I think Pipelined Shuffle should still be the
>> >>>> best
>> >>>>>>> choice at present. For an unbounded data stream, it is not
>> meaningful
>> >>>>> to
>> >>>>>>> only run some stages.
>> >>>>>>>
>> >>>>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode.
>> >>>> In
>> >>>>>>> other words, In which case we can use the hybrid shuffle mode:
>> >>>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs,
>> hybrid
>> >>>>>>> shuffle mode can effectively utilize cluster resources and avoid
>> some
>> >>>>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
>> >>>>> characterized
>> >>>>>>> by a large number of concurrently submitted short batch jobs,
>> hybrid
>> >>>>>>> shuffle can solve the scheduling deadlock problem of pipelined
>> >>>> shuffle
>> >>>>>> and
>> >>>>>>> achieve similar performance.
>> >>>>>>>
>> >>>>>>> Best regards,
>> >>>>>>>
>> >>>>>>> Weijie
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
>> >>>>>>>
>> >>>>>>>> Hi Weijie:
>> >>>>>>>>
>> >>>>>>>>        Thanks for the nice FLIP, I have couple questions about
>> this:
>> >>>>>>>>
>> >>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
>> >>>>>>> resource.
>> >>>>>>>> If there
>> >>>>>>>> is an unbounded data source, but only have resource to schedule
>> the
>> >>>>>> first
>> >>>>>>>> stage, will it
>> >>>>>>>> bring the big burden to the disk/shuffle service which will
>> occupy
>> >>>>> all
>> >>>>>>> the
>> >>>>>>>> resource I think.
>> >>>>>>>>
>> >>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode.
>> In
>> >>>>>> other
>> >>>>>>>> words, In which
>> >>>>>>>> case we can use the hybrid shuffle mode:
>> >>>>>>>> - For batch job want to use more resource to reduce the e2e time
>> ?
>> >>>>>>>> - Or for streaming job which may lack of resource temporarily ?
>> >>>>>>>> - Or for OLAP job which will try to make best use of available
>> >>>>>> resources
>> >>>>>>> as
>> >>>>>>>> you mentioned to finish the query?
>> >>>>>>>> Just want to know the typical use case for the Hybrid shuffle
>> mode
>> >>>> :)
>> >>>>>>>> Best,
>> >>>>>>>> Aitozi.
>> >>>>>>>>
>> >>>>>>>> weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
>> >>>>>>>>
>> >>>>>>>>> Yangze, Thank you for the feedback!
>> >>>>>>>>> Here's my thoughts for your questions:
>> >>>>>>>>>
>> >>>>>>>>>>>> How do we decide the size of the buffer pool in
>> >>>>> MemoryDataManager
>> >>>>>>> and
>> >>>>>>>>> the read buffers in FileDataManager?
>> >>>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used
>> >>>> by
>> >>>>>>>>> ResultPartition, and the size is the same as the current
>> >>>>>> implementation
>> >>>>>>>> of
>> >>>>>>>>> sort-merge shuffle. In other words, the minimum value of
>> >>>> BufferPool
>> >>>>>> is
>> >>>>>>> a
>> >>>>>>>>> configurable fixed value, and the maximum value is Math.max(min,
>> >>>> 4
>> >>>>> *
>> >>>>>>>>> numSubpartitions). The default value can be determined by
>> running
>> >>>>> the
>> >>>>>>>>> TPC-DS tests.
>> >>>>>>>>> Read buffers in FileDataManager are requested from the
>> >>>>>>>>> BatchShuffleReadBufferPool shared by TaskManager, it's size
>> >>>>>> controlled
>> >>>>>>> by
>> >>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the
>> >>>>>> default
>> >>>>>>>>> value is 32M, which is consistent with the current sort-merge
>> >>>>> shuffle
>> >>>>>>>>> logic.
>> >>>>>>>>>
>> >>>>>>>>>>>> Is there an upper limit for the sum of them? If there is, how
>> >>>>>> does
>> >>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
>> >>>>>>>>> The buffers of the MemoryDataManager are limited by the size of
>> >>>> the
>> >>>>>>>>> LocalBufferPool, and the upper limit is the size of the Network
>> >>>>>> Memory.
>> >>>>>>>> The
>> >>>>>>>>> buffers of the FileDataManager are directly requested from
>> >>>>>>>>> UnpooledOffHeapMemory, and are also limited by the size of the
>> >>>>>>> framework
>> >>>>>>>>> off-heap memory. I think there should be no need for additional
>> >>>>>>>>> synchronization mechanisms.
>> >>>>>>>>>
>> >>>>>>>>>>>> How do you disable the slot sharing? If user configures both
>> >>>>> the
>> >>>>>>> slot
>> >>>>>>>>> sharing group and hybrid shuffle, what will happen to that job?
>> >>>>>>>>> I think we can print a warning log when Hybrid Shuffle is
>> enabled
>> >>>>> and
>> >>>>>>> SSG
>> >>>>>>>>> is configured during the JobGraph compilation stage, and
>> fallback
>> >>>>> to
>> >>>>>>> the
>> >>>>>>>>> region slot sharing group by default. Of course, it will be
>> >>>>>> emphasized
>> >>>>>>> in
>> >>>>>>>>> the document that we do not currently support SSG, If
>> configured,
>> >>>>> it
>> >>>>>>> will
>> >>>>>>>>> fall back to the default.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Best regards,
>> >>>>>>>>>
>> >>>>>>>>> Weijie
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
>> >>>>>>>>>
>> >>>>>>>>>> Thanks for driving this. Xintong and Weijie.
>> >>>>>>>>>>
>> >>>>>>>>>> I believe this feature will make Flink a better batch/OLAP
>> >>>>> engine.
>> >>>>>> +1
>> >>>>>>>>>> for the overall design.
>> >>>>>>>>>>
>> >>>>>>>>>> Some questions:
>> >>>>>>>>>> 1. How do we decide the size of the buffer pool in
>> >>>>>> MemoryDataManager
>> >>>>>>>>>> and the read buffers in FileDataManager?
>> >>>>>>>>>> 2. Is there an upper limit for the sum of them? If there is,
>> >>>> how
>> >>>>>> does
>> >>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
>> >>>>>>>>>> 3. How do you disable the slot sharing? If user configures both
>> >>>>> the
>> >>>>>>>>>> slot sharing group and hybrid shuffle, what will happen to that
>> >>>>>> job?
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Yangze Guo
>> >>>>>>>>>>
>> >>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <
>> >>>>>> tonysong820@gmail.com>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I think this is a good improvement on batch resource
>> >>>>> elasticity.
>> >>>>>>>>> Looking
>> >>>>>>>>>>> forward to the community feedback.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Best,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Xintong
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <
>> >>>>>>>> guoweijiereswqa@gmail.com>
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hi all,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which
>> >>>>>>> introduce a
>> >>>>>>>>>> new shuffle mode
>> >>>>>>>>>>>>    can overcome some of the problems of Pipelined Shuffle and
>> >>>>>>>> Blocking
>> >>>>>>>>>> Shuffle in batch scenarios.
>> >>>>>>>>>>>> Currently in Flink, task scheduling is more or less
>> >>>>> constrained
>> >>>>>>> by
>> >>>>>>>>> the
>> >>>>>>>>>> shuffle implementations.
>> >>>>>>>>>>>> This will bring the following disadvantages:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>      1. Pipelined Shuffle:
>> >>>>>>>>>>>>       For pipelined shuffle, the upstream and downstream
>> >>>> tasks
>> >>>>>> are
>> >>>>>>>>>> required to be deployed at the same time, to avoid upstream
>> >>>> tasks
>> >>>>>>> being
>> >>>>>>>>>> blocked forever. This is fine when there are enough resources
>> >>>> for
>> >>>>>>> both
>> >>>>>>>>>> upstream and downstream tasks to run simultaneously, but will
>> >>>>> cause
>> >>>>>>> the
>> >>>>>>>>>> following problems otherwise:
>> >>>>>>>>>>>>      1.
>> >>>>>>>>>>>>         Pipelined shuffle connected tasks (i.e., a pipelined
>> >>>>>>> region)
>> >>>>>>>>>> cannot be executed until obtaining resources for all of them,
>> >>>>>>> resulting
>> >>>>>>>>> in
>> >>>>>>>>>> longer job finishing time and poorer resource efficiency due to
>> >>>>>>> holding
>> >>>>>>>>>> part of the resources idle while waiting for the rest.
>> >>>>>>>>>>>>         2.
>> >>>>>>>>>>>>         More severely, if multiple jobs each hold part of the
>> >>>>>>> cluster
>> >>>>>>>>>> resources and are waiting for more, a deadlock would occur. The
>> >>>>>>> chance
>> >>>>>>>> is
>> >>>>>>>>>> not trivial, especially for scenarios such as OLAP where
>> >>>>> concurrent
>> >>>>>>> job
>> >>>>>>>>>> submissions are frequent.
>> >>>>>>>>>>>>         2. Blocking Shuffle:
>> >>>>>>>>>>>>       For blocking shuffle, execution of downstream tasks
>> >>>> must
>> >>>>>> wait
>> >>>>>>>> for
>> >>>>>>>>>> all upstream tasks to finish, despite there might be more
>> >>>>> resources
>> >>>>>>>>>> available. The sequential execution of upstream and downstream
>> >>>>>> tasks
>> >>>>>>>>>> significantly increase the job finishing time, and the disk IO
>> >>>>>>> workload
>> >>>>>>>>> for
>> >>>>>>>>>> spilling and loading full intermediate data also affects the
>> >>>>>>>> performance.
>> >>>>>>>>>>>> We believe the root cause of the above problems is that
>> >>>>> shuffle
>> >>>>>>>>>> implementations put unnecessary constraints on task scheduling.
>> >>>>>>>>>>>> To solve this problem, Xintong Song and I propose to
>> >>>>> introduce
>> >>>>>>>> hybrid
>> >>>>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid
>> >>>>>> Shuffle,
>> >>>>>>>>> Flink
>> >>>>>>>>>> should:
>> >>>>>>>>>>>>      1. Make best use of available resources.
>> >>>>>>>>>>>>       Ideally, we want Flink to always make progress if
>> >>>>> possible.
>> >>>>>>>> That
>> >>>>>>>>>> is to say, it should always execute a pending task if there are
>> >>>>>>>> resources
>> >>>>>>>>>> available for that task.
>> >>>>>>>>>>>>      2. Minimize disk IO load.
>> >>>>>>>>>>>>       In-flight data should be consumed directly from memory
>> >>>> as
>> >>>>>>> much
>> >>>>>>>> as
>> >>>>>>>>>> possible. Only data that is not consumed timely should be
>> >>>> spilled
>> >>>>>> to
>> >>>>>>>>> disk.
>> >>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to
>> >>>>> your
>> >>>>>>>>>> feedback.
>> >>>>>>>>>>>> [1]
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Weijie
>> >>>>>>>>>>>>
>> >>
>>
>>
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Chesnay Schepler <ch...@apache.org>.
You can influence it to some extent via ExecutionConfig#setExecutionMode.
You can for example for all shuffles to use blocking exchanges.

I'm not proposing an API that would allow this to be set per edge.

On 25/05/2022 15:23, Xintong Song wrote:
> In general, I agree with you about aiming jobs with no/few blocking 
> exchanges for fine-grained recovery. The only problem is, correct me 
> if I'm wrong, users currently cannot control the data exchanging mode 
> of a specific edge. I'm not aware of such APIs.
>
> As a first step, I'd prefer excluding this from the scope of this FLIP.
>
> Best,
>
> Xintong
>
>
>
> On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ch...@apache.org> 
> wrote:
>
>     Yes; but that's also a limitation of the current fine-grained
>     recovery.
>
>     My suggestion was primarily aimed at jobs that have no/few blocking
>     exchanges, where users would currently have to explicitly configure
>     additional blocking exchanges to really get something out of
>     fine-grained recovery (at the expense of e2e job duration).
>
>     On 25/05/2022 14:47, Xintong Song wrote:
>     >> Will this also allow spilling everything to disk while also
>     forwarding
>     >> data to the next task?
>     >>
>     > Yes, as long as the downstream task is started, this always
>     forward the
>     > data, even while spilling everything.
>     >
>     > This would allow us to improve fine-grained recovery by no
>     longer being
>     >> constrained to pipelined regions.
>     >
>     > I think it helps preventing restarts of the upstreams for a
>     failed task,
>     > but not the downstreams. Because there's no guarantee a
>     restarted task will
>     > prevent exactly same data (in terms of order) as the previous
>     execution,
>     > thus downstreams cannot resume consuming the data.
>     >
>     >
>     > Best,
>     >
>     > Xintong
>     >
>     >
>     >
>     > On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler
>     <ch...@apache.org> wrote:
>     >
>     >> Will this also allow spilling everything to disk while also
>     forwarding
>     >> data to the next task?
>     >>
>     >> This would allow us to improve fine-grained recovery by no
>     longer being
>     >> constrained to pipelined regions.
>     >>
>     >> On 25/05/2022 05:55, weijie guo wrote:
>     >>> Hi All,
>     >>> Thank you for your attention and feedback.
>     >>> Do you have any other comments? If there are no other
>     questions, I'll
>     >> vote
>     >>> on FLIP-235 tomorrow.
>     >>>
>     >>> Best regards,
>     >>>
>     >>> Weijie
>     >>>
>     >>>
>     >>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 13:22写道:
>     >>>
>     >>>> Hi Xintong
>     >>>>       Thanks for your detailed explanation, I misunderstand
>     the spill
>     >>>> behavior at first glance,
>     >>>> I get your point now. I think it will be a good addition to
>     the current
>     >>>> execution mode.
>     >>>> Looking forward to it :)
>     >>>>
>     >>>> Best,
>     >>>> Aitozi
>     >>>>
>     >>>> Xintong Song <to...@gmail.com> 于2022年5月20日周五
>     12:26写道:
>     >>>>
>     >>>>> Hi Aitozi,
>     >>>>>
>     >>>>> In which case we can use the hybrid shuffle mode
>     >>>>>
>     >>>>> Just to directly answer this question, in addition to
>     >>>>> Weijie's explanations. For batch workload, if you want the
>     workload to
>     >>>> take
>     >>>>> advantage of as many resources as available, which ranges
>     from a single
>     >>>>> slot to as many slots as the total tasks, you may consider
>     hybrid
>     >> shuffle
>     >>>>> mode. Admittedly, this may not always be wanted, e.g., users
>     may not
>     >> want
>     >>>>> to execute a job if there's too few resources available, or
>     may not
>     >> want
>     >>>> a
>     >>>>> job taking too many of the cluster resources. That's why we
>     propose
>     >>>> hybrid
>     >>>>> shuffle as an additional option for batch users, rather than a
>     >>>> replacement
>     >>>>> for Pipelined or Blocking mode.
>     >>>>>
>     >>>>> So you mean the hybrid shuffle mode will limit its usage to
>     the bounded
>     >>>>>> source, Right ?
>     >>>>>>
>     >>>>> Yes.
>     >>>>>
>     >>>>> One more question, with the bounded data and partly of the
>     stage is
>     >>>> running
>     >>>>>> in the Pipelined shuffle mode, what will be the behavior of
>     the task
>     >>>>>> failure, Is the checkpoint enabled for these running stages
>     or will it
>     >>>>>> re-run after the failure?
>     >>>>>>
>     >>>>> There's no checkpoints. The failover behavior depends on the
>     spilling
>     >>>>> strategy.
>     >>>>> - In the first version, we only consider a selective
>     spilling strategy,
>     >>>>> which means spill data as little as possible to the disk,
>     which means
>     >> in
>     >>>>> case of failover upstream tasks need to be restarted to
>     reproduce the
>     >>>>> complete intermediate results.
>     >>>>> - An alternative strategy we may introduce in future if
>     needed is to
>     >>>> spill
>     >>>>> the complete intermediate results. That avoids restarting
>     upstream
>     >> tasks
>     >>>> in
>     >>>>> case of failover, because the produced intermediate results
>     can be
>     >>>>> re-consumed, at the cost of more disk IO load.
>     >>>>> With both strategies, the trade-off between failover cost
>     and IO load
>     >> is
>     >>>>> for the user to decide. This is also discussed in the
>     MemoryDataManager
>     >>>>> section of the FLIP.
>     >>>>>
>     >>>>> Best,
>     >>>>>
>     >>>>> Xintong
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi
>     <gj...@gmail.com> wrote:
>     >>>>>
>     >>>>>> Thanks Weijie for your answer. So you mean the hybrid
>     shuffle mode
>     >> will
>     >>>>>> limit
>     >>>>>> its usage to the bounded source, Right ?
>     >>>>>> One more question, with the bounded data and partly of the
>     stage is
>     >>>>> running
>     >>>>>> in the Pipelined shuffle mode, what will be the behavior of
>     the task
>     >>>>>> failure, Is the
>     >>>>>> checkpoint enabled for these running stages or will it
>     re-run after
>     >> the
>     >>>>>> failure?
>     >>>>>>
>     >>>>>> Best,
>     >>>>>> Aitozi
>     >>>>>>
>     >>>>>> weijie guo <gu...@gmail.com> 于2022年5月20日周五
>     10:45写道:
>     >>>>>>
>     >>>>>>> Hi, Aitozi:
>     >>>>>>>
>     >>>>>>> Thank you for the feedback!
>     >>>>>>> Here are some of my thoughts on your question
>     >>>>>>>
>     >>>>>>>>>> 1.If there is an unbounded data source, but only have
>     resource to
>     >>>>>>> schedule the first stage, will it bring the big burden to the
>     >>>>>> disk/shuffle
>     >>>>>>> service which will occupy all the resource I think.
>     >>>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job
>     >>>>> scenario,
>     >>>>>> so
>     >>>>>>> there is no problem of unbounded data sources. Secondly,
>     if you
>     >>>>> consider
>     >>>>>>> the stream scenario, I think Pipelined Shuffle should
>     still be the
>     >>>> best
>     >>>>>>> choice at present. For an unbounded data stream, it is not
>     meaningful
>     >>>>> to
>     >>>>>>> only run some stages.
>     >>>>>>>
>     >>>>>>>>>> 2. Which kind of job will benefit from the hybrid
>     shuffle mode.
>     >>>> In
>     >>>>>>> other words, In which case we can use the hybrid shuffle mode:
>     >>>>>>> Both general batch jobs and OLAP jobs benefit. For batch
>     jobs, hybrid
>     >>>>>>> shuffle mode can effectively utilize cluster resources and
>     avoid some
>     >>>>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
>     >>>>> characterized
>     >>>>>>> by a large number of concurrently submitted short batch
>     jobs, hybrid
>     >>>>>>> shuffle can solve the scheduling deadlock problem of pipelined
>     >>>> shuffle
>     >>>>>> and
>     >>>>>>> achieve similar performance.
>     >>>>>>>
>     >>>>>>> Best regards,
>     >>>>>>>
>     >>>>>>> Weijie
>     >>>>>>>
>     >>>>>>>
>     >>>>>>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
>     >>>>>>>
>     >>>>>>>> Hi Weijie:
>     >>>>>>>>
>     >>>>>>>>        Thanks for the nice FLIP, I have couple questions
>     about this:
>     >>>>>>>>
>     >>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is
>     decided by the
>     >>>>>>> resource.
>     >>>>>>>> If there
>     >>>>>>>> is an unbounded data source, but only have resource to
>     schedule the
>     >>>>>> first
>     >>>>>>>> stage, will it
>     >>>>>>>> bring the big burden to the disk/shuffle service which
>     will occupy
>     >>>>> all
>     >>>>>>> the
>     >>>>>>>> resource I think.
>     >>>>>>>>
>     >>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle
>     mode. In
>     >>>>>> other
>     >>>>>>>> words, In which
>     >>>>>>>> case we can use the hybrid shuffle mode:
>     >>>>>>>> - For batch job want to use more resource to reduce the
>     e2e time ?
>     >>>>>>>> - Or for streaming job which may lack of resource
>     temporarily ?
>     >>>>>>>> - Or for OLAP job which will try to make best use of
>     available
>     >>>>>> resources
>     >>>>>>> as
>     >>>>>>>> you mentioned to finish the query?
>     >>>>>>>> Just want to know the typical use case for the Hybrid
>     shuffle mode
>     >>>> :)
>     >>>>>>>> Best,
>     >>>>>>>> Aitozi.
>     >>>>>>>>
>     >>>>>>>> weijie guo <gu...@gmail.com> 于2022年5月19日周四
>     18:33写道:
>     >>>>>>>>
>     >>>>>>>>> Yangze, Thank you for the feedback!
>     >>>>>>>>> Here's my thoughts for your questions:
>     >>>>>>>>>
>     >>>>>>>>>>>> How do we decide the size of the buffer pool in
>     >>>>> MemoryDataManager
>     >>>>>>> and
>     >>>>>>>>> the read buffers in FileDataManager?
>     >>>>>>>>> The BufferPool in MemoryDataManager is the
>     LocalBufferPool used
>     >>>> by
>     >>>>>>>>> ResultPartition, and the size is the same as the current
>     >>>>>> implementation
>     >>>>>>>> of
>     >>>>>>>>> sort-merge shuffle. In other words, the minimum value of
>     >>>> BufferPool
>     >>>>>> is
>     >>>>>>> a
>     >>>>>>>>> configurable fixed value, and the maximum value is
>     Math.max(min,
>     >>>> 4
>     >>>>> *
>     >>>>>>>>> numSubpartitions). The default value can be determined
>     by running
>     >>>>> the
>     >>>>>>>>> TPC-DS tests.
>     >>>>>>>>> Read buffers in FileDataManager are requested from the
>     >>>>>>>>> BatchShuffleReadBufferPool shared by TaskManager, it's size
>     >>>>>> controlled
>     >>>>>>> by
>     >>>>>>>>>
>     *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the
>     >>>>>> default
>     >>>>>>>>> value is 32M, which is consistent with the current
>     sort-merge
>     >>>>> shuffle
>     >>>>>>>>> logic.
>     >>>>>>>>>
>     >>>>>>>>>>>> Is there an upper limit for the sum of them? If there
>     is, how
>     >>>>>> does
>     >>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
>     >>>>>>>>> The buffers of the MemoryDataManager are limited by the
>     size of
>     >>>> the
>     >>>>>>>>> LocalBufferPool, and the upper limit is the size of the
>     Network
>     >>>>>> Memory.
>     >>>>>>>> The
>     >>>>>>>>> buffers of the FileDataManager are directly requested from
>     >>>>>>>>> UnpooledOffHeapMemory, and are also limited by the size
>     of the
>     >>>>>>> framework
>     >>>>>>>>> off-heap memory. I think there should be no need for
>     additional
>     >>>>>>>>> synchronization mechanisms.
>     >>>>>>>>>
>     >>>>>>>>>>>> How do you disable the slot sharing? If user
>     configures both
>     >>>>> the
>     >>>>>>> slot
>     >>>>>>>>> sharing group and hybrid shuffle, what will happen to
>     that job?
>     >>>>>>>>> I think we can print a warning log when Hybrid Shuffle
>     is enabled
>     >>>>> and
>     >>>>>>> SSG
>     >>>>>>>>> is configured during the JobGraph compilation stage, and
>     fallback
>     >>>>> to
>     >>>>>>> the
>     >>>>>>>>> region slot sharing group by default. Of course, it will be
>     >>>>>> emphasized
>     >>>>>>> in
>     >>>>>>>>> the document that we do not currently support SSG, If
>     configured,
>     >>>>> it
>     >>>>>>> will
>     >>>>>>>>> fall back to the default.
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> Best regards,
>     >>>>>>>>>
>     >>>>>>>>> Weijie
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> Yangze Guo <ka...@gmail.com> 于2022年5月19日周四
>     16:25写道:
>     >>>>>>>>>
>     >>>>>>>>>> Thanks for driving this. Xintong and Weijie.
>     >>>>>>>>>>
>     >>>>>>>>>> I believe this feature will make Flink a better batch/OLAP
>     >>>>> engine.
>     >>>>>> +1
>     >>>>>>>>>> for the overall design.
>     >>>>>>>>>>
>     >>>>>>>>>> Some questions:
>     >>>>>>>>>> 1. How do we decide the size of the buffer pool in
>     >>>>>> MemoryDataManager
>     >>>>>>>>>> and the read buffers in FileDataManager?
>     >>>>>>>>>> 2. Is there an upper limit for the sum of them? If
>     there is,
>     >>>> how
>     >>>>>> does
>     >>>>>>>>>> MemoryDataManager and FileDataManager sync the memory
>     usage?
>     >>>>>>>>>> 3. How do you disable the slot sharing? If user
>     configures both
>     >>>>> the
>     >>>>>>>>>> slot sharing group and hybrid shuffle, what will happen
>     to that
>     >>>>>> job?
>     >>>>>>>>>> Best,
>     >>>>>>>>>> Yangze Guo
>     >>>>>>>>>>
>     >>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <
>     >>>>>> tonysong820@gmail.com>
>     >>>>>>>>>> wrote:
>     >>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
>     >>>>>>>>>>>
>     >>>>>>>>>>> I think this is a good improvement on batch resource
>     >>>>> elasticity.
>     >>>>>>>>> Looking
>     >>>>>>>>>>> forward to the community feedback.
>     >>>>>>>>>>>
>     >>>>>>>>>>> Best,
>     >>>>>>>>>>>
>     >>>>>>>>>>> Xintong
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <
>     >>>>>>>> guoweijiereswqa@gmail.com>
>     >>>>>>>>>>> wrote:
>     >>>>>>>>>>>
>     >>>>>>>>>>>> Hi all,
>     >>>>>>>>>>>>
>     >>>>>>>>>>>>
>     >>>>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which
>     >>>>>>> introduce a
>     >>>>>>>>>> new shuffle mode
>     >>>>>>>>>>>>    can overcome some of the problems of Pipelined
>     Shuffle and
>     >>>>>>>> Blocking
>     >>>>>>>>>> Shuffle in batch scenarios.
>     >>>>>>>>>>>> Currently in Flink, task scheduling is more or less
>     >>>>> constrained
>     >>>>>>> by
>     >>>>>>>>> the
>     >>>>>>>>>> shuffle implementations.
>     >>>>>>>>>>>> This will bring the following disadvantages:
>     >>>>>>>>>>>>
>     >>>>>>>>>>>>      1. Pipelined Shuffle:
>     >>>>>>>>>>>>       For pipelined shuffle, the upstream and downstream
>     >>>> tasks
>     >>>>>> are
>     >>>>>>>>>> required to be deployed at the same time, to avoid upstream
>     >>>> tasks
>     >>>>>>> being
>     >>>>>>>>>> blocked forever. This is fine when there are enough
>     resources
>     >>>> for
>     >>>>>>> both
>     >>>>>>>>>> upstream and downstream tasks to run simultaneously,
>     but will
>     >>>>> cause
>     >>>>>>> the
>     >>>>>>>>>> following problems otherwise:
>     >>>>>>>>>>>>      1.
>     >>>>>>>>>>>>  Pipelined shuffle connected tasks (i.e., a pipelined
>     >>>>>>> region)
>     >>>>>>>>>> cannot be executed until obtaining resources for all of
>     them,
>     >>>>>>> resulting
>     >>>>>>>>> in
>     >>>>>>>>>> longer job finishing time and poorer resource
>     efficiency due to
>     >>>>>>> holding
>     >>>>>>>>>> part of the resources idle while waiting for the rest.
>     >>>>>>>>>>>>         2.
>     >>>>>>>>>>>>         More severely, if multiple jobs each hold
>     part of the
>     >>>>>>> cluster
>     >>>>>>>>>> resources and are waiting for more, a deadlock would
>     occur. The
>     >>>>>>> chance
>     >>>>>>>> is
>     >>>>>>>>>> not trivial, especially for scenarios such as OLAP where
>     >>>>> concurrent
>     >>>>>>> job
>     >>>>>>>>>> submissions are frequent.
>     >>>>>>>>>>>>         2. Blocking Shuffle:
>     >>>>>>>>>>>>       For blocking shuffle, execution of downstream tasks
>     >>>> must
>     >>>>>> wait
>     >>>>>>>> for
>     >>>>>>>>>> all upstream tasks to finish, despite there might be more
>     >>>>> resources
>     >>>>>>>>>> available. The sequential execution of upstream and
>     downstream
>     >>>>>> tasks
>     >>>>>>>>>> significantly increase the job finishing time, and the
>     disk IO
>     >>>>>>> workload
>     >>>>>>>>> for
>     >>>>>>>>>> spilling and loading full intermediate data also
>     affects the
>     >>>>>>>> performance.
>     >>>>>>>>>>>> We believe the root cause of the above problems is that
>     >>>>> shuffle
>     >>>>>>>>>> implementations put unnecessary constraints on task
>     scheduling.
>     >>>>>>>>>>>> To solve this problem, Xintong Song and I propose to
>     >>>>> introduce
>     >>>>>>>> hybrid
>     >>>>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid
>     >>>>>> Shuffle,
>     >>>>>>>>> Flink
>     >>>>>>>>>> should:
>     >>>>>>>>>>>>      1. Make best use of available resources.
>     >>>>>>>>>>>>  Ideally, we want Flink to always make progress if
>     >>>>> possible.
>     >>>>>>>> That
>     >>>>>>>>>> is to say, it should always execute a pending task if
>     there are
>     >>>>>>>> resources
>     >>>>>>>>>> available for that task.
>     >>>>>>>>>>>>      2. Minimize disk IO load.
>     >>>>>>>>>>>>  In-flight data should be consumed directly from memory
>     >>>> as
>     >>>>>>> much
>     >>>>>>>> as
>     >>>>>>>>>> possible. Only data that is not consumed timely should be
>     >>>> spilled
>     >>>>>> to
>     >>>>>>>>> disk.
>     >>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to
>     >>>>> your
>     >>>>>>>>>> feedback.
>     >>>>>>>>>>>> [1]
>     >>>>>>>>>>>>
>     >>>>>>>>>>>>
>     >>>>>>>>>>>>
>     >>
>     https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>     >>>>>>>>>>>>
>     >>>>>>>>>>>> Best regards,
>     >>>>>>>>>>>>
>     >>>>>>>>>>>> Weijie
>     >>>>>>>>>>>>
>     >>
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Xintong Song <to...@gmail.com>.
In general, I agree with you about aiming jobs with no/few blocking
exchanges for fine-grained recovery. The only problem is, correct me if I'm
wrong, users currently cannot control the data exchanging mode of a
specific edge. I'm not aware of such APIs.

As a first step, I'd prefer excluding this from the scope of this FLIP.

Best,

Xintong



On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ch...@apache.org> wrote:

> Yes; but that's also a limitation of the current fine-grained recovery.
>
> My suggestion was primarily aimed at jobs that have no/few blocking
> exchanges, where users would currently have to explicitly configure
> additional blocking exchanges to really get something out of
> fine-grained recovery (at the expense of e2e job duration).
>
> On 25/05/2022 14:47, Xintong Song wrote:
> >> Will this also allow spilling everything to disk while also forwarding
> >> data to the next task?
> >>
> > Yes, as long as the downstream task is started, this always forward the
> > data, even while spilling everything.
> >
> > This would allow us to improve fine-grained recovery by no longer being
> >> constrained to pipelined regions.
> >
> > I think it helps preventing restarts of the upstreams for a failed task,
> > but not the downstreams. Because there's no guarantee a restarted task
> will
> > prevent exactly same data (in terms of order) as the previous execution,
> > thus downstreams cannot resume consuming the data.
> >
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ch...@apache.org>
> wrote:
> >
> >> Will this also allow spilling everything to disk while also forwarding
> >> data to the next task?
> >>
> >> This would allow us to improve fine-grained recovery by no longer being
> >> constrained to pipelined regions.
> >>
> >> On 25/05/2022 05:55, weijie guo wrote:
> >>> Hi All,
> >>> Thank you for your attention and feedback.
> >>> Do you have any other comments? If there are no other questions, I'll
> >> vote
> >>> on FLIP-235 tomorrow.
> >>>
> >>> Best regards,
> >>>
> >>> Weijie
> >>>
> >>>
> >>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 13:22写道:
> >>>
> >>>> Hi Xintong
> >>>>       Thanks for your detailed explanation, I misunderstand the spill
> >>>> behavior at first glance,
> >>>> I get your point now. I think it will be a good addition to the
> current
> >>>> execution mode.
> >>>> Looking forward to it :)
> >>>>
> >>>> Best,
> >>>> Aitozi
> >>>>
> >>>> Xintong Song <to...@gmail.com> 于2022年5月20日周五 12:26写道:
> >>>>
> >>>>> Hi Aitozi,
> >>>>>
> >>>>> In which case we can use the hybrid shuffle mode
> >>>>>
> >>>>> Just to directly answer this question, in addition to
> >>>>> Weijie's explanations. For batch workload, if you want the workload
> to
> >>>> take
> >>>>> advantage of as many resources as available, which ranges from a
> single
> >>>>> slot to as many slots as the total tasks, you may consider hybrid
> >> shuffle
> >>>>> mode. Admittedly, this may not always be wanted, e.g., users may not
> >> want
> >>>>> to execute a job if there's too few resources available, or may not
> >> want
> >>>> a
> >>>>> job taking too many of the cluster resources. That's why we propose
> >>>> hybrid
> >>>>> shuffle as an additional option for batch users, rather than a
> >>>> replacement
> >>>>> for Pipelined or Blocking mode.
> >>>>>
> >>>>> So you mean the hybrid shuffle mode will limit its usage to the
> bounded
> >>>>>> source, Right ?
> >>>>>>
> >>>>> Yes.
> >>>>>
> >>>>> One more question, with the bounded data and partly of the stage is
> >>>> running
> >>>>>> in the Pipelined shuffle mode, what will be the behavior of the task
> >>>>>> failure, Is the checkpoint enabled for these running stages or will
> it
> >>>>>> re-run after the failure?
> >>>>>>
> >>>>> There's no checkpoints. The failover behavior depends on the spilling
> >>>>> strategy.
> >>>>> - In the first version, we only consider a selective spilling
> strategy,
> >>>>> which means spill data as little as possible to the disk, which means
> >> in
> >>>>> case of failover upstream tasks need to be restarted to reproduce the
> >>>>> complete intermediate results.
> >>>>> - An alternative strategy we may introduce in future if needed is to
> >>>> spill
> >>>>> the complete intermediate results. That avoids restarting upstream
> >> tasks
> >>>> in
> >>>>> case of failover, because the produced intermediate results can be
> >>>>> re-consumed, at the cost of more disk IO load.
> >>>>> With both strategies, the trade-off between failover cost and IO load
> >> is
> >>>>> for the user to decide. This is also discussed in the
> MemoryDataManager
> >>>>> section of the FLIP.
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Xintong
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gj...@gmail.com>
> wrote:
> >>>>>
> >>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode
> >> will
> >>>>>> limit
> >>>>>> its usage to the bounded source, Right ?
> >>>>>> One more question, with the bounded data and partly of the stage is
> >>>>> running
> >>>>>> in the Pipelined shuffle mode, what will be the behavior of the task
> >>>>>> failure, Is the
> >>>>>> checkpoint enabled for these running stages or will it re-run after
> >> the
> >>>>>> failure?
> >>>>>>
> >>>>>> Best,
> >>>>>> Aitozi
> >>>>>>
> >>>>>> weijie guo <gu...@gmail.com> 于2022年5月20日周五 10:45写道:
> >>>>>>
> >>>>>>> Hi, Aitozi:
> >>>>>>>
> >>>>>>> Thank you for the feedback!
> >>>>>>> Here are some of my thoughts on your question
> >>>>>>>
> >>>>>>>>>> 1.If there is an unbounded data source, but only have resource
> to
> >>>>>>> schedule the first stage, will it bring the big burden to the
> >>>>>> disk/shuffle
> >>>>>>> service which will occupy all the resource I think.
> >>>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job
> >>>>> scenario,
> >>>>>> so
> >>>>>>> there is no problem of unbounded data sources. Secondly, if you
> >>>>> consider
> >>>>>>> the stream scenario, I think Pipelined Shuffle should still be the
> >>>> best
> >>>>>>> choice at present. For an unbounded data stream, it is not
> meaningful
> >>>>> to
> >>>>>>> only run some stages.
> >>>>>>>
> >>>>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode.
> >>>> In
> >>>>>>> other words, In which case we can use the hybrid shuffle mode:
> >>>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs,
> hybrid
> >>>>>>> shuffle mode can effectively utilize cluster resources and avoid
> some
> >>>>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
> >>>>> characterized
> >>>>>>> by a large number of concurrently submitted short batch jobs,
> hybrid
> >>>>>>> shuffle can solve the scheduling deadlock problem of pipelined
> >>>> shuffle
> >>>>>> and
> >>>>>>> achieve similar performance.
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>>
> >>>>>>> Weijie
> >>>>>>>
> >>>>>>>
> >>>>>>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
> >>>>>>>
> >>>>>>>> Hi Weijie:
> >>>>>>>>
> >>>>>>>>        Thanks for the nice FLIP, I have couple questions about
> this:
> >>>>>>>>
> >>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> >>>>>>> resource.
> >>>>>>>> If there
> >>>>>>>> is an unbounded data source, but only have resource to schedule
> the
> >>>>>> first
> >>>>>>>> stage, will it
> >>>>>>>> bring the big burden to the disk/shuffle service which will occupy
> >>>>> all
> >>>>>>> the
> >>>>>>>> resource I think.
> >>>>>>>>
> >>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode. In
> >>>>>> other
> >>>>>>>> words, In which
> >>>>>>>> case we can use the hybrid shuffle mode:
> >>>>>>>> - For batch job want to use more resource to reduce the e2e time ?
> >>>>>>>> - Or for streaming job which may lack of resource temporarily ?
> >>>>>>>> - Or for OLAP job which will try to make best use of available
> >>>>>> resources
> >>>>>>> as
> >>>>>>>> you mentioned to finish the query?
> >>>>>>>> Just want to know the typical use case for the Hybrid shuffle mode
> >>>> :)
> >>>>>>>> Best,
> >>>>>>>> Aitozi.
> >>>>>>>>
> >>>>>>>> weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
> >>>>>>>>
> >>>>>>>>> Yangze, Thank you for the feedback!
> >>>>>>>>> Here's my thoughts for your questions:
> >>>>>>>>>
> >>>>>>>>>>>> How do we decide the size of the buffer pool in
> >>>>> MemoryDataManager
> >>>>>>> and
> >>>>>>>>> the read buffers in FileDataManager?
> >>>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used
> >>>> by
> >>>>>>>>> ResultPartition, and the size is the same as the current
> >>>>>> implementation
> >>>>>>>> of
> >>>>>>>>> sort-merge shuffle. In other words, the minimum value of
> >>>> BufferPool
> >>>>>> is
> >>>>>>> a
> >>>>>>>>> configurable fixed value, and the maximum value is Math.max(min,
> >>>> 4
> >>>>> *
> >>>>>>>>> numSubpartitions). The default value can be determined by running
> >>>>> the
> >>>>>>>>> TPC-DS tests.
> >>>>>>>>> Read buffers in FileDataManager are requested from the
> >>>>>>>>> BatchShuffleReadBufferPool shared by TaskManager, it's size
> >>>>>> controlled
> >>>>>>> by
> >>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the
> >>>>>> default
> >>>>>>>>> value is 32M, which is consistent with the current sort-merge
> >>>>> shuffle
> >>>>>>>>> logic.
> >>>>>>>>>
> >>>>>>>>>>>> Is there an upper limit for the sum of them? If there is, how
> >>>>>> does
> >>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >>>>>>>>> The buffers of the MemoryDataManager are limited by the size of
> >>>> the
> >>>>>>>>> LocalBufferPool, and the upper limit is the size of the Network
> >>>>>> Memory.
> >>>>>>>> The
> >>>>>>>>> buffers of the FileDataManager are directly requested from
> >>>>>>>>> UnpooledOffHeapMemory, and are also limited by the size of the
> >>>>>>> framework
> >>>>>>>>> off-heap memory. I think there should be no need for additional
> >>>>>>>>> synchronization mechanisms.
> >>>>>>>>>
> >>>>>>>>>>>> How do you disable the slot sharing? If user configures both
> >>>>> the
> >>>>>>> slot
> >>>>>>>>> sharing group and hybrid shuffle, what will happen to that job?
> >>>>>>>>> I think we can print a warning log when Hybrid Shuffle is enabled
> >>>>> and
> >>>>>>> SSG
> >>>>>>>>> is configured during the JobGraph compilation stage, and fallback
> >>>>> to
> >>>>>>> the
> >>>>>>>>> region slot sharing group by default. Of course, it will be
> >>>>>> emphasized
> >>>>>>> in
> >>>>>>>>> the document that we do not currently support SSG, If configured,
> >>>>> it
> >>>>>>> will
> >>>>>>>>> fall back to the default.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Best regards,
> >>>>>>>>>
> >>>>>>>>> Weijie
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
> >>>>>>>>>
> >>>>>>>>>> Thanks for driving this. Xintong and Weijie.
> >>>>>>>>>>
> >>>>>>>>>> I believe this feature will make Flink a better batch/OLAP
> >>>>> engine.
> >>>>>> +1
> >>>>>>>>>> for the overall design.
> >>>>>>>>>>
> >>>>>>>>>> Some questions:
> >>>>>>>>>> 1. How do we decide the size of the buffer pool in
> >>>>>> MemoryDataManager
> >>>>>>>>>> and the read buffers in FileDataManager?
> >>>>>>>>>> 2. Is there an upper limit for the sum of them? If there is,
> >>>> how
> >>>>>> does
> >>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >>>>>>>>>> 3. How do you disable the slot sharing? If user configures both
> >>>>> the
> >>>>>>>>>> slot sharing group and hybrid shuffle, what will happen to that
> >>>>>> job?
> >>>>>>>>>> Best,
> >>>>>>>>>> Yangze Guo
> >>>>>>>>>>
> >>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <
> >>>>>> tonysong820@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
> >>>>>>>>>>>
> >>>>>>>>>>> I think this is a good improvement on batch resource
> >>>>> elasticity.
> >>>>>>>>> Looking
> >>>>>>>>>>> forward to the community feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>>
> >>>>>>>>>>> Xintong
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <
> >>>>>>>> guoweijiereswqa@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which
> >>>>>>> introduce a
> >>>>>>>>>> new shuffle mode
> >>>>>>>>>>>>    can overcome some of the problems of Pipelined Shuffle and
> >>>>>>>> Blocking
> >>>>>>>>>> Shuffle in batch scenarios.
> >>>>>>>>>>>> Currently in Flink, task scheduling is more or less
> >>>>> constrained
> >>>>>>> by
> >>>>>>>>> the
> >>>>>>>>>> shuffle implementations.
> >>>>>>>>>>>> This will bring the following disadvantages:
> >>>>>>>>>>>>
> >>>>>>>>>>>>      1. Pipelined Shuffle:
> >>>>>>>>>>>>       For pipelined shuffle, the upstream and downstream
> >>>> tasks
> >>>>>> are
> >>>>>>>>>> required to be deployed at the same time, to avoid upstream
> >>>> tasks
> >>>>>>> being
> >>>>>>>>>> blocked forever. This is fine when there are enough resources
> >>>> for
> >>>>>>> both
> >>>>>>>>>> upstream and downstream tasks to run simultaneously, but will
> >>>>> cause
> >>>>>>> the
> >>>>>>>>>> following problems otherwise:
> >>>>>>>>>>>>      1.
> >>>>>>>>>>>>         Pipelined shuffle connected tasks (i.e., a pipelined
> >>>>>>> region)
> >>>>>>>>>> cannot be executed until obtaining resources for all of them,
> >>>>>>> resulting
> >>>>>>>>> in
> >>>>>>>>>> longer job finishing time and poorer resource efficiency due to
> >>>>>>> holding
> >>>>>>>>>> part of the resources idle while waiting for the rest.
> >>>>>>>>>>>>         2.
> >>>>>>>>>>>>         More severely, if multiple jobs each hold part of the
> >>>>>>> cluster
> >>>>>>>>>> resources and are waiting for more, a deadlock would occur. The
> >>>>>>> chance
> >>>>>>>> is
> >>>>>>>>>> not trivial, especially for scenarios such as OLAP where
> >>>>> concurrent
> >>>>>>> job
> >>>>>>>>>> submissions are frequent.
> >>>>>>>>>>>>         2. Blocking Shuffle:
> >>>>>>>>>>>>       For blocking shuffle, execution of downstream tasks
> >>>> must
> >>>>>> wait
> >>>>>>>> for
> >>>>>>>>>> all upstream tasks to finish, despite there might be more
> >>>>> resources
> >>>>>>>>>> available. The sequential execution of upstream and downstream
> >>>>>> tasks
> >>>>>>>>>> significantly increase the job finishing time, and the disk IO
> >>>>>>> workload
> >>>>>>>>> for
> >>>>>>>>>> spilling and loading full intermediate data also affects the
> >>>>>>>> performance.
> >>>>>>>>>>>> We believe the root cause of the above problems is that
> >>>>> shuffle
> >>>>>>>>>> implementations put unnecessary constraints on task scheduling.
> >>>>>>>>>>>> To solve this problem, Xintong Song and I propose to
> >>>>> introduce
> >>>>>>>> hybrid
> >>>>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid
> >>>>>> Shuffle,
> >>>>>>>>> Flink
> >>>>>>>>>> should:
> >>>>>>>>>>>>      1. Make best use of available resources.
> >>>>>>>>>>>>       Ideally, we want Flink to always make progress if
> >>>>> possible.
> >>>>>>>> That
> >>>>>>>>>> is to say, it should always execute a pending task if there are
> >>>>>>>> resources
> >>>>>>>>>> available for that task.
> >>>>>>>>>>>>      2. Minimize disk IO load.
> >>>>>>>>>>>>       In-flight data should be consumed directly from memory
> >>>> as
> >>>>>>> much
> >>>>>>>> as
> >>>>>>>>>> possible. Only data that is not consumed timely should be
> >>>> spilled
> >>>>>> to
> >>>>>>>>> disk.
> >>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to
> >>>>> your
> >>>>>>>>>> feedback.
> >>>>>>>>>>>> [1]
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best regards,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Weijie
> >>>>>>>>>>>>
> >>
>
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Chesnay Schepler <ch...@apache.org>.
Yes; but that's also a limitation of the current fine-grained recovery.

My suggestion was primarily aimed at jobs that have no/few blocking 
exchanges, where users would currently have to explicitly configure 
additional blocking exchanges to really get something out of 
fine-grained recovery (at the expense of e2e job duration).

On 25/05/2022 14:47, Xintong Song wrote:
>> Will this also allow spilling everything to disk while also forwarding
>> data to the next task?
>>
> Yes, as long as the downstream task is started, this always forward the
> data, even while spilling everything.
>
> This would allow us to improve fine-grained recovery by no longer being
>> constrained to pipelined regions.
>
> I think it helps preventing restarts of the upstreams for a failed task,
> but not the downstreams. Because there's no guarantee a restarted task will
> prevent exactly same data (in terms of order) as the previous execution,
> thus downstreams cannot resume consuming the data.
>
>
> Best,
>
> Xintong
>
>
>
> On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ch...@apache.org> wrote:
>
>> Will this also allow spilling everything to disk while also forwarding
>> data to the next task?
>>
>> This would allow us to improve fine-grained recovery by no longer being
>> constrained to pipelined regions.
>>
>> On 25/05/2022 05:55, weijie guo wrote:
>>> Hi All,
>>> Thank you for your attention and feedback.
>>> Do you have any other comments? If there are no other questions, I'll
>> vote
>>> on FLIP-235 tomorrow.
>>>
>>> Best regards,
>>>
>>> Weijie
>>>
>>>
>>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 13:22写道:
>>>
>>>> Hi Xintong
>>>>       Thanks for your detailed explanation, I misunderstand the spill
>>>> behavior at first glance,
>>>> I get your point now. I think it will be a good addition to the current
>>>> execution mode.
>>>> Looking forward to it :)
>>>>
>>>> Best,
>>>> Aitozi
>>>>
>>>> Xintong Song <to...@gmail.com> 于2022年5月20日周五 12:26写道:
>>>>
>>>>> Hi Aitozi,
>>>>>
>>>>> In which case we can use the hybrid shuffle mode
>>>>>
>>>>> Just to directly answer this question, in addition to
>>>>> Weijie's explanations. For batch workload, if you want the workload to
>>>> take
>>>>> advantage of as many resources as available, which ranges from a single
>>>>> slot to as many slots as the total tasks, you may consider hybrid
>> shuffle
>>>>> mode. Admittedly, this may not always be wanted, e.g., users may not
>> want
>>>>> to execute a job if there's too few resources available, or may not
>> want
>>>> a
>>>>> job taking too many of the cluster resources. That's why we propose
>>>> hybrid
>>>>> shuffle as an additional option for batch users, rather than a
>>>> replacement
>>>>> for Pipelined or Blocking mode.
>>>>>
>>>>> So you mean the hybrid shuffle mode will limit its usage to the bounded
>>>>>> source, Right ?
>>>>>>
>>>>> Yes.
>>>>>
>>>>> One more question, with the bounded data and partly of the stage is
>>>> running
>>>>>> in the Pipelined shuffle mode, what will be the behavior of the task
>>>>>> failure, Is the checkpoint enabled for these running stages or will it
>>>>>> re-run after the failure?
>>>>>>
>>>>> There's no checkpoints. The failover behavior depends on the spilling
>>>>> strategy.
>>>>> - In the first version, we only consider a selective spilling strategy,
>>>>> which means spill data as little as possible to the disk, which means
>> in
>>>>> case of failover upstream tasks need to be restarted to reproduce the
>>>>> complete intermediate results.
>>>>> - An alternative strategy we may introduce in future if needed is to
>>>> spill
>>>>> the complete intermediate results. That avoids restarting upstream
>> tasks
>>>> in
>>>>> case of failover, because the produced intermediate results can be
>>>>> re-consumed, at the cost of more disk IO load.
>>>>> With both strategies, the trade-off between failover cost and IO load
>> is
>>>>> for the user to decide. This is also discussed in the MemoryDataManager
>>>>> section of the FLIP.
>>>>>
>>>>> Best,
>>>>>
>>>>> Xintong
>>>>>
>>>>>
>>>>>
>>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gj...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode
>> will
>>>>>> limit
>>>>>> its usage to the bounded source, Right ?
>>>>>> One more question, with the bounded data and partly of the stage is
>>>>> running
>>>>>> in the Pipelined shuffle mode, what will be the behavior of the task
>>>>>> failure, Is the
>>>>>> checkpoint enabled for these running stages or will it re-run after
>> the
>>>>>> failure?
>>>>>>
>>>>>> Best,
>>>>>> Aitozi
>>>>>>
>>>>>> weijie guo <gu...@gmail.com> 于2022年5月20日周五 10:45写道:
>>>>>>
>>>>>>> Hi, Aitozi:
>>>>>>>
>>>>>>> Thank you for the feedback!
>>>>>>> Here are some of my thoughts on your question
>>>>>>>
>>>>>>>>>> 1.If there is an unbounded data source, but only have resource to
>>>>>>> schedule the first stage, will it bring the big burden to the
>>>>>> disk/shuffle
>>>>>>> service which will occupy all the resource I think.
>>>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job
>>>>> scenario,
>>>>>> so
>>>>>>> there is no problem of unbounded data sources. Secondly, if you
>>>>> consider
>>>>>>> the stream scenario, I think Pipelined Shuffle should still be the
>>>> best
>>>>>>> choice at present. For an unbounded data stream, it is not meaningful
>>>>> to
>>>>>>> only run some stages.
>>>>>>>
>>>>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode.
>>>> In
>>>>>>> other words, In which case we can use the hybrid shuffle mode:
>>>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
>>>>>>> shuffle mode can effectively utilize cluster resources and avoid some
>>>>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
>>>>> characterized
>>>>>>> by a large number of concurrently submitted short batch jobs, hybrid
>>>>>>> shuffle can solve the scheduling deadlock problem of pipelined
>>>> shuffle
>>>>>> and
>>>>>>> achieve similar performance.
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Weijie
>>>>>>>
>>>>>>>
>>>>>>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
>>>>>>>
>>>>>>>> Hi Weijie:
>>>>>>>>
>>>>>>>>        Thanks for the nice FLIP, I have couple questions about this:
>>>>>>>>
>>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
>>>>>>> resource.
>>>>>>>> If there
>>>>>>>> is an unbounded data source, but only have resource to schedule the
>>>>>> first
>>>>>>>> stage, will it
>>>>>>>> bring the big burden to the disk/shuffle service which will occupy
>>>>> all
>>>>>>> the
>>>>>>>> resource I think.
>>>>>>>>
>>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode. In
>>>>>> other
>>>>>>>> words, In which
>>>>>>>> case we can use the hybrid shuffle mode:
>>>>>>>> - For batch job want to use more resource to reduce the e2e time ?
>>>>>>>> - Or for streaming job which may lack of resource temporarily ?
>>>>>>>> - Or for OLAP job which will try to make best use of available
>>>>>> resources
>>>>>>> as
>>>>>>>> you mentioned to finish the query?
>>>>>>>> Just want to know the typical use case for the Hybrid shuffle mode
>>>> :)
>>>>>>>> Best,
>>>>>>>> Aitozi.
>>>>>>>>
>>>>>>>> weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
>>>>>>>>
>>>>>>>>> Yangze, Thank you for the feedback!
>>>>>>>>> Here's my thoughts for your questions:
>>>>>>>>>
>>>>>>>>>>>> How do we decide the size of the buffer pool in
>>>>> MemoryDataManager
>>>>>>> and
>>>>>>>>> the read buffers in FileDataManager?
>>>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used
>>>> by
>>>>>>>>> ResultPartition, and the size is the same as the current
>>>>>> implementation
>>>>>>>> of
>>>>>>>>> sort-merge shuffle. In other words, the minimum value of
>>>> BufferPool
>>>>>> is
>>>>>>> a
>>>>>>>>> configurable fixed value, and the maximum value is Math.max(min,
>>>> 4
>>>>> *
>>>>>>>>> numSubpartitions). The default value can be determined by running
>>>>> the
>>>>>>>>> TPC-DS tests.
>>>>>>>>> Read buffers in FileDataManager are requested from the
>>>>>>>>> BatchShuffleReadBufferPool shared by TaskManager, it's size
>>>>>> controlled
>>>>>>> by
>>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the
>>>>>> default
>>>>>>>>> value is 32M, which is consistent with the current sort-merge
>>>>> shuffle
>>>>>>>>> logic.
>>>>>>>>>
>>>>>>>>>>>> Is there an upper limit for the sum of them? If there is, how
>>>>>> does
>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
>>>>>>>>> The buffers of the MemoryDataManager are limited by the size of
>>>> the
>>>>>>>>> LocalBufferPool, and the upper limit is the size of the Network
>>>>>> Memory.
>>>>>>>> The
>>>>>>>>> buffers of the FileDataManager are directly requested from
>>>>>>>>> UnpooledOffHeapMemory, and are also limited by the size of the
>>>>>>> framework
>>>>>>>>> off-heap memory. I think there should be no need for additional
>>>>>>>>> synchronization mechanisms.
>>>>>>>>>
>>>>>>>>>>>> How do you disable the slot sharing? If user configures both
>>>>> the
>>>>>>> slot
>>>>>>>>> sharing group and hybrid shuffle, what will happen to that job?
>>>>>>>>> I think we can print a warning log when Hybrid Shuffle is enabled
>>>>> and
>>>>>>> SSG
>>>>>>>>> is configured during the JobGraph compilation stage, and fallback
>>>>> to
>>>>>>> the
>>>>>>>>> region slot sharing group by default. Of course, it will be
>>>>>> emphasized
>>>>>>> in
>>>>>>>>> the document that we do not currently support SSG, If configured,
>>>>> it
>>>>>>> will
>>>>>>>>> fall back to the default.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>>
>>>>>>>>> Weijie
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
>>>>>>>>>
>>>>>>>>>> Thanks for driving this. Xintong and Weijie.
>>>>>>>>>>
>>>>>>>>>> I believe this feature will make Flink a better batch/OLAP
>>>>> engine.
>>>>>> +1
>>>>>>>>>> for the overall design.
>>>>>>>>>>
>>>>>>>>>> Some questions:
>>>>>>>>>> 1. How do we decide the size of the buffer pool in
>>>>>> MemoryDataManager
>>>>>>>>>> and the read buffers in FileDataManager?
>>>>>>>>>> 2. Is there an upper limit for the sum of them? If there is,
>>>> how
>>>>>> does
>>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
>>>>>>>>>> 3. How do you disable the slot sharing? If user configures both
>>>>> the
>>>>>>>>>> slot sharing group and hybrid shuffle, what will happen to that
>>>>>> job?
>>>>>>>>>> Best,
>>>>>>>>>> Yangze Guo
>>>>>>>>>>
>>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <
>>>>>> tonysong820@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
>>>>>>>>>>>
>>>>>>>>>>> I think this is a good improvement on batch resource
>>>>> elasticity.
>>>>>>>>> Looking
>>>>>>>>>>> forward to the community feedback.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Xintong
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <
>>>>>>>> guoweijiereswqa@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which
>>>>>>> introduce a
>>>>>>>>>> new shuffle mode
>>>>>>>>>>>>    can overcome some of the problems of Pipelined Shuffle and
>>>>>>>> Blocking
>>>>>>>>>> Shuffle in batch scenarios.
>>>>>>>>>>>> Currently in Flink, task scheduling is more or less
>>>>> constrained
>>>>>>> by
>>>>>>>>> the
>>>>>>>>>> shuffle implementations.
>>>>>>>>>>>> This will bring the following disadvantages:
>>>>>>>>>>>>
>>>>>>>>>>>>      1. Pipelined Shuffle:
>>>>>>>>>>>>       For pipelined shuffle, the upstream and downstream
>>>> tasks
>>>>>> are
>>>>>>>>>> required to be deployed at the same time, to avoid upstream
>>>> tasks
>>>>>>> being
>>>>>>>>>> blocked forever. This is fine when there are enough resources
>>>> for
>>>>>>> both
>>>>>>>>>> upstream and downstream tasks to run simultaneously, but will
>>>>> cause
>>>>>>> the
>>>>>>>>>> following problems otherwise:
>>>>>>>>>>>>      1.
>>>>>>>>>>>>         Pipelined shuffle connected tasks (i.e., a pipelined
>>>>>>> region)
>>>>>>>>>> cannot be executed until obtaining resources for all of them,
>>>>>>> resulting
>>>>>>>>> in
>>>>>>>>>> longer job finishing time and poorer resource efficiency due to
>>>>>>> holding
>>>>>>>>>> part of the resources idle while waiting for the rest.
>>>>>>>>>>>>         2.
>>>>>>>>>>>>         More severely, if multiple jobs each hold part of the
>>>>>>> cluster
>>>>>>>>>> resources and are waiting for more, a deadlock would occur. The
>>>>>>> chance
>>>>>>>> is
>>>>>>>>>> not trivial, especially for scenarios such as OLAP where
>>>>> concurrent
>>>>>>> job
>>>>>>>>>> submissions are frequent.
>>>>>>>>>>>>         2. Blocking Shuffle:
>>>>>>>>>>>>       For blocking shuffle, execution of downstream tasks
>>>> must
>>>>>> wait
>>>>>>>> for
>>>>>>>>>> all upstream tasks to finish, despite there might be more
>>>>> resources
>>>>>>>>>> available. The sequential execution of upstream and downstream
>>>>>> tasks
>>>>>>>>>> significantly increase the job finishing time, and the disk IO
>>>>>>> workload
>>>>>>>>> for
>>>>>>>>>> spilling and loading full intermediate data also affects the
>>>>>>>> performance.
>>>>>>>>>>>> We believe the root cause of the above problems is that
>>>>> shuffle
>>>>>>>>>> implementations put unnecessary constraints on task scheduling.
>>>>>>>>>>>> To solve this problem, Xintong Song and I propose to
>>>>> introduce
>>>>>>>> hybrid
>>>>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid
>>>>>> Shuffle,
>>>>>>>>> Flink
>>>>>>>>>> should:
>>>>>>>>>>>>      1. Make best use of available resources.
>>>>>>>>>>>>       Ideally, we want Flink to always make progress if
>>>>> possible.
>>>>>>>> That
>>>>>>>>>> is to say, it should always execute a pending task if there are
>>>>>>>> resources
>>>>>>>>>> available for that task.
>>>>>>>>>>>>      2. Minimize disk IO load.
>>>>>>>>>>>>       In-flight data should be consumed directly from memory
>>>> as
>>>>>>> much
>>>>>>>> as
>>>>>>>>>> possible. Only data that is not consumed timely should be
>>>> spilled
>>>>>> to
>>>>>>>>> disk.
>>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to
>>>>> your
>>>>>>>>>> feedback.
>>>>>>>>>>>> [1]
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>>>>>>>>>>>>
>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Weijie
>>>>>>>>>>>>
>>


Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Xintong Song <to...@gmail.com>.
>
> Will this also allow spilling everything to disk while also forwarding
> data to the next task?
>

Yes, as long as the downstream task is started, this always forward the
data, even while spilling everything.

This would allow us to improve fine-grained recovery by no longer being
> constrained to pipelined regions.


I think it helps preventing restarts of the upstreams for a failed task,
but not the downstreams. Because there's no guarantee a restarted task will
prevent exactly same data (in terms of order) as the previous execution,
thus downstreams cannot resume consuming the data.


Best,

Xintong



On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ch...@apache.org> wrote:

> Will this also allow spilling everything to disk while also forwarding
> data to the next task?
>
> This would allow us to improve fine-grained recovery by no longer being
> constrained to pipelined regions.
>
> On 25/05/2022 05:55, weijie guo wrote:
> > Hi All,
> > Thank you for your attention and feedback.
> > Do you have any other comments? If there are no other questions, I'll
> vote
> > on FLIP-235 tomorrow.
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Aitozi <gj...@gmail.com> 于2022年5月20日周五 13:22写道:
> >
> >> Hi Xintong
> >>      Thanks for your detailed explanation, I misunderstand the spill
> >> behavior at first glance,
> >> I get your point now. I think it will be a good addition to the current
> >> execution mode.
> >> Looking forward to it :)
> >>
> >> Best,
> >> Aitozi
> >>
> >> Xintong Song <to...@gmail.com> 于2022年5月20日周五 12:26写道:
> >>
> >>> Hi Aitozi,
> >>>
> >>> In which case we can use the hybrid shuffle mode
> >>>
> >>> Just to directly answer this question, in addition to
> >>> Weijie's explanations. For batch workload, if you want the workload to
> >> take
> >>> advantage of as many resources as available, which ranges from a single
> >>> slot to as many slots as the total tasks, you may consider hybrid
> shuffle
> >>> mode. Admittedly, this may not always be wanted, e.g., users may not
> want
> >>> to execute a job if there's too few resources available, or may not
> want
> >> a
> >>> job taking too many of the cluster resources. That's why we propose
> >> hybrid
> >>> shuffle as an additional option for batch users, rather than a
> >> replacement
> >>> for Pipelined or Blocking mode.
> >>>
> >>> So you mean the hybrid shuffle mode will limit its usage to the bounded
> >>>> source, Right ?
> >>>>
> >>> Yes.
> >>>
> >>> One more question, with the bounded data and partly of the stage is
> >> running
> >>>> in the Pipelined shuffle mode, what will be the behavior of the task
> >>>> failure, Is the checkpoint enabled for these running stages or will it
> >>>> re-run after the failure?
> >>>>
> >>> There's no checkpoints. The failover behavior depends on the spilling
> >>> strategy.
> >>> - In the first version, we only consider a selective spilling strategy,
> >>> which means spill data as little as possible to the disk, which means
> in
> >>> case of failover upstream tasks need to be restarted to reproduce the
> >>> complete intermediate results.
> >>> - An alternative strategy we may introduce in future if needed is to
> >> spill
> >>> the complete intermediate results. That avoids restarting upstream
> tasks
> >> in
> >>> case of failover, because the produced intermediate results can be
> >>> re-consumed, at the cost of more disk IO load.
> >>> With both strategies, the trade-off between failover cost and IO load
> is
> >>> for the user to decide. This is also discussed in the MemoryDataManager
> >>> section of the FLIP.
> >>>
> >>> Best,
> >>>
> >>> Xintong
> >>>
> >>>
> >>>
> >>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gj...@gmail.com> wrote:
> >>>
> >>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode
> will
> >>>> limit
> >>>> its usage to the bounded source, Right ?
> >>>> One more question, with the bounded data and partly of the stage is
> >>> running
> >>>> in the Pipelined shuffle mode, what will be the behavior of the task
> >>>> failure, Is the
> >>>> checkpoint enabled for these running stages or will it re-run after
> the
> >>>> failure?
> >>>>
> >>>> Best,
> >>>> Aitozi
> >>>>
> >>>> weijie guo <gu...@gmail.com> 于2022年5月20日周五 10:45写道:
> >>>>
> >>>>> Hi, Aitozi:
> >>>>>
> >>>>> Thank you for the feedback!
> >>>>> Here are some of my thoughts on your question
> >>>>>
> >>>>>>>> 1.If there is an unbounded data source, but only have resource to
> >>>>> schedule the first stage, will it bring the big burden to the
> >>>> disk/shuffle
> >>>>> service which will occupy all the resource I think.
> >>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job
> >>> scenario,
> >>>> so
> >>>>> there is no problem of unbounded data sources. Secondly, if you
> >>> consider
> >>>>> the stream scenario, I think Pipelined Shuffle should still be the
> >> best
> >>>>> choice at present. For an unbounded data stream, it is not meaningful
> >>> to
> >>>>> only run some stages.
> >>>>>
> >>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode.
> >> In
> >>>>> other words, In which case we can use the hybrid shuffle mode:
> >>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> >>>>> shuffle mode can effectively utilize cluster resources and avoid some
> >>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
> >>> characterized
> >>>>> by a large number of concurrently submitted short batch jobs, hybrid
> >>>>> shuffle can solve the scheduling deadlock problem of pipelined
> >> shuffle
> >>>> and
> >>>>> achieve similar performance.
> >>>>>
> >>>>> Best regards,
> >>>>>
> >>>>> Weijie
> >>>>>
> >>>>>
> >>>>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
> >>>>>
> >>>>>> Hi Weijie:
> >>>>>>
> >>>>>>       Thanks for the nice FLIP, I have couple questions about this:
> >>>>>>
> >>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> >>>>> resource.
> >>>>>> If there
> >>>>>> is an unbounded data source, but only have resource to schedule the
> >>>> first
> >>>>>> stage, will it
> >>>>>> bring the big burden to the disk/shuffle service which will occupy
> >>> all
> >>>>> the
> >>>>>> resource I think.
> >>>>>>
> >>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode. In
> >>>> other
> >>>>>> words, In which
> >>>>>> case we can use the hybrid shuffle mode:
> >>>>>> - For batch job want to use more resource to reduce the e2e time ?
> >>>>>> - Or for streaming job which may lack of resource temporarily ?
> >>>>>> - Or for OLAP job which will try to make best use of available
> >>>> resources
> >>>>> as
> >>>>>> you mentioned to finish the query?
> >>>>>> Just want to know the typical use case for the Hybrid shuffle mode
> >> :)
> >>>>>>
> >>>>>> Best,
> >>>>>> Aitozi.
> >>>>>>
> >>>>>> weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
> >>>>>>
> >>>>>>> Yangze, Thank you for the feedback!
> >>>>>>> Here's my thoughts for your questions:
> >>>>>>>
> >>>>>>>>>> How do we decide the size of the buffer pool in
> >>> MemoryDataManager
> >>>>> and
> >>>>>>> the read buffers in FileDataManager?
> >>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used
> >> by
> >>>>>>> ResultPartition, and the size is the same as the current
> >>>> implementation
> >>>>>> of
> >>>>>>> sort-merge shuffle. In other words, the minimum value of
> >> BufferPool
> >>>> is
> >>>>> a
> >>>>>>> configurable fixed value, and the maximum value is Math.max(min,
> >> 4
> >>> *
> >>>>>>> numSubpartitions). The default value can be determined by running
> >>> the
> >>>>>>> TPC-DS tests.
> >>>>>>> Read buffers in FileDataManager are requested from the
> >>>>>>> BatchShuffleReadBufferPool shared by TaskManager, it's size
> >>>> controlled
> >>>>> by
> >>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the
> >>>> default
> >>>>>>> value is 32M, which is consistent with the current sort-merge
> >>> shuffle
> >>>>>>> logic.
> >>>>>>>
> >>>>>>>>>> Is there an upper limit for the sum of them? If there is, how
> >>>> does
> >>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >>>>>>> The buffers of the MemoryDataManager are limited by the size of
> >> the
> >>>>>>> LocalBufferPool, and the upper limit is the size of the Network
> >>>> Memory.
> >>>>>> The
> >>>>>>> buffers of the FileDataManager are directly requested from
> >>>>>>> UnpooledOffHeapMemory, and are also limited by the size of the
> >>>>> framework
> >>>>>>> off-heap memory. I think there should be no need for additional
> >>>>>>> synchronization mechanisms.
> >>>>>>>
> >>>>>>>>>> How do you disable the slot sharing? If user configures both
> >>> the
> >>>>> slot
> >>>>>>> sharing group and hybrid shuffle, what will happen to that job?
> >>>>>>> I think we can print a warning log when Hybrid Shuffle is enabled
> >>> and
> >>>>> SSG
> >>>>>>> is configured during the JobGraph compilation stage, and fallback
> >>> to
> >>>>> the
> >>>>>>> region slot sharing group by default. Of course, it will be
> >>>> emphasized
> >>>>> in
> >>>>>>> the document that we do not currently support SSG, If configured,
> >>> it
> >>>>> will
> >>>>>>> fall back to the default.
> >>>>>>>
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>>
> >>>>>>> Weijie
> >>>>>>>
> >>>>>>>
> >>>>>>> Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
> >>>>>>>
> >>>>>>>> Thanks for driving this. Xintong and Weijie.
> >>>>>>>>
> >>>>>>>> I believe this feature will make Flink a better batch/OLAP
> >>> engine.
> >>>> +1
> >>>>>>>> for the overall design.
> >>>>>>>>
> >>>>>>>> Some questions:
> >>>>>>>> 1. How do we decide the size of the buffer pool in
> >>>> MemoryDataManager
> >>>>>>>> and the read buffers in FileDataManager?
> >>>>>>>> 2. Is there an upper limit for the sum of them? If there is,
> >> how
> >>>> does
> >>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >>>>>>>> 3. How do you disable the slot sharing? If user configures both
> >>> the
> >>>>>>>> slot sharing group and hybrid shuffle, what will happen to that
> >>>> job?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Yangze Guo
> >>>>>>>>
> >>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <
> >>>> tonysong820@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>> Thanks for preparing this FLIP, Weijie.
> >>>>>>>>>
> >>>>>>>>> I think this is a good improvement on batch resource
> >>> elasticity.
> >>>>>>> Looking
> >>>>>>>>> forward to the community feedback.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>>
> >>>>>>>>> Xintong
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <
> >>>>>> guoweijiereswqa@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which
> >>>>> introduce a
> >>>>>>>> new shuffle mode
> >>>>>>>>>>   can overcome some of the problems of Pipelined Shuffle and
> >>>>>> Blocking
> >>>>>>>> Shuffle in batch scenarios.
> >>>>>>>>>>
> >>>>>>>>>> Currently in Flink, task scheduling is more or less
> >>> constrained
> >>>>> by
> >>>>>>> the
> >>>>>>>> shuffle implementations.
> >>>>>>>>>> This will bring the following disadvantages:
> >>>>>>>>>>
> >>>>>>>>>>     1. Pipelined Shuffle:
> >>>>>>>>>>      For pipelined shuffle, the upstream and downstream
> >> tasks
> >>>> are
> >>>>>>>> required to be deployed at the same time, to avoid upstream
> >> tasks
> >>>>> being
> >>>>>>>> blocked forever. This is fine when there are enough resources
> >> for
> >>>>> both
> >>>>>>>> upstream and downstream tasks to run simultaneously, but will
> >>> cause
> >>>>> the
> >>>>>>>> following problems otherwise:
> >>>>>>>>>>     1.
> >>>>>>>>>>        Pipelined shuffle connected tasks (i.e., a pipelined
> >>>>> region)
> >>>>>>>> cannot be executed until obtaining resources for all of them,
> >>>>> resulting
> >>>>>>> in
> >>>>>>>> longer job finishing time and poorer resource efficiency due to
> >>>>> holding
> >>>>>>>> part of the resources idle while waiting for the rest.
> >>>>>>>>>>        2.
> >>>>>>>>>>        More severely, if multiple jobs each hold part of the
> >>>>> cluster
> >>>>>>>> resources and are waiting for more, a deadlock would occur. The
> >>>>> chance
> >>>>>> is
> >>>>>>>> not trivial, especially for scenarios such as OLAP where
> >>> concurrent
> >>>>> job
> >>>>>>>> submissions are frequent.
> >>>>>>>>>>        2. Blocking Shuffle:
> >>>>>>>>>>      For blocking shuffle, execution of downstream tasks
> >> must
> >>>> wait
> >>>>>> for
> >>>>>>>> all upstream tasks to finish, despite there might be more
> >>> resources
> >>>>>>>> available. The sequential execution of upstream and downstream
> >>>> tasks
> >>>>>>>> significantly increase the job finishing time, and the disk IO
> >>>>> workload
> >>>>>>> for
> >>>>>>>> spilling and loading full intermediate data also affects the
> >>>>>> performance.
> >>>>>>>>>>
> >>>>>>>>>> We believe the root cause of the above problems is that
> >>> shuffle
> >>>>>>>> implementations put unnecessary constraints on task scheduling.
> >>>>>>>>>>
> >>>>>>>>>> To solve this problem, Xintong Song and I propose to
> >>> introduce
> >>>>>> hybrid
> >>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid
> >>>> Shuffle,
> >>>>>>> Flink
> >>>>>>>> should:
> >>>>>>>>>>     1. Make best use of available resources.
> >>>>>>>>>>      Ideally, we want Flink to always make progress if
> >>> possible.
> >>>>>> That
> >>>>>>>> is to say, it should always execute a pending task if there are
> >>>>>> resources
> >>>>>>>> available for that task.
> >>>>>>>>>>     2. Minimize disk IO load.
> >>>>>>>>>>      In-flight data should be consumed directly from memory
> >> as
> >>>>> much
> >>>>>> as
> >>>>>>>> possible. Only data that is not consumed timely should be
> >> spilled
> >>>> to
> >>>>>>> disk.
> >>>>>>>>>> You can find more details in FLIP-235. Looking forward to
> >>> your
> >>>>>>>> feedback.
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Best regards,
> >>>>>>>>>>
> >>>>>>>>>> Weijie
> >>>>>>>>>>
>
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Chesnay Schepler <ch...@apache.org>.
Will this also allow spilling everything to disk while also forwarding 
data to the next task?

This would allow us to improve fine-grained recovery by no longer being 
constrained to pipelined regions.

On 25/05/2022 05:55, weijie guo wrote:
> Hi All,
> Thank you for your attention and feedback.
> Do you have any other comments? If there are no other questions, I'll vote
> on FLIP-235 tomorrow.
>
> Best regards,
>
> Weijie
>
>
> Aitozi <gj...@gmail.com> 于2022年5月20日周五 13:22写道:
>
>> Hi Xintong
>>      Thanks for your detailed explanation, I misunderstand the spill
>> behavior at first glance,
>> I get your point now. I think it will be a good addition to the current
>> execution mode.
>> Looking forward to it :)
>>
>> Best,
>> Aitozi
>>
>> Xintong Song <to...@gmail.com> 于2022年5月20日周五 12:26写道:
>>
>>> Hi Aitozi,
>>>
>>> In which case we can use the hybrid shuffle mode
>>>
>>> Just to directly answer this question, in addition to
>>> Weijie's explanations. For batch workload, if you want the workload to
>> take
>>> advantage of as many resources as available, which ranges from a single
>>> slot to as many slots as the total tasks, you may consider hybrid shuffle
>>> mode. Admittedly, this may not always be wanted, e.g., users may not want
>>> to execute a job if there's too few resources available, or may not want
>> a
>>> job taking too many of the cluster resources. That's why we propose
>> hybrid
>>> shuffle as an additional option for batch users, rather than a
>> replacement
>>> for Pipelined or Blocking mode.
>>>
>>> So you mean the hybrid shuffle mode will limit its usage to the bounded
>>>> source, Right ?
>>>>
>>> Yes.
>>>
>>> One more question, with the bounded data and partly of the stage is
>> running
>>>> in the Pipelined shuffle mode, what will be the behavior of the task
>>>> failure, Is the checkpoint enabled for these running stages or will it
>>>> re-run after the failure?
>>>>
>>> There's no checkpoints. The failover behavior depends on the spilling
>>> strategy.
>>> - In the first version, we only consider a selective spilling strategy,
>>> which means spill data as little as possible to the disk, which means in
>>> case of failover upstream tasks need to be restarted to reproduce the
>>> complete intermediate results.
>>> - An alternative strategy we may introduce in future if needed is to
>> spill
>>> the complete intermediate results. That avoids restarting upstream tasks
>> in
>>> case of failover, because the produced intermediate results can be
>>> re-consumed, at the cost of more disk IO load.
>>> With both strategies, the trade-off between failover cost and IO load is
>>> for the user to decide. This is also discussed in the MemoryDataManager
>>> section of the FLIP.
>>>
>>> Best,
>>>
>>> Xintong
>>>
>>>
>>>
>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gj...@gmail.com> wrote:
>>>
>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
>>>> limit
>>>> its usage to the bounded source, Right ?
>>>> One more question, with the bounded data and partly of the stage is
>>> running
>>>> in the Pipelined shuffle mode, what will be the behavior of the task
>>>> failure, Is the
>>>> checkpoint enabled for these running stages or will it re-run after the
>>>> failure?
>>>>
>>>> Best,
>>>> Aitozi
>>>>
>>>> weijie guo <gu...@gmail.com> 于2022年5月20日周五 10:45写道:
>>>>
>>>>> Hi, Aitozi:
>>>>>
>>>>> Thank you for the feedback!
>>>>> Here are some of my thoughts on your question
>>>>>
>>>>>>>> 1.If there is an unbounded data source, but only have resource to
>>>>> schedule the first stage, will it bring the big burden to the
>>>> disk/shuffle
>>>>> service which will occupy all the resource I think.
>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job
>>> scenario,
>>>> so
>>>>> there is no problem of unbounded data sources. Secondly, if you
>>> consider
>>>>> the stream scenario, I think Pipelined Shuffle should still be the
>> best
>>>>> choice at present. For an unbounded data stream, it is not meaningful
>>> to
>>>>> only run some stages.
>>>>>
>>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode.
>> In
>>>>> other words, In which case we can use the hybrid shuffle mode:
>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
>>>>> shuffle mode can effectively utilize cluster resources and avoid some
>>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
>>> characterized
>>>>> by a large number of concurrently submitted short batch jobs, hybrid
>>>>> shuffle can solve the scheduling deadlock problem of pipelined
>> shuffle
>>>> and
>>>>> achieve similar performance.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Weijie
>>>>>
>>>>>
>>>>> Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
>>>>>
>>>>>> Hi Weijie:
>>>>>>
>>>>>>       Thanks for the nice FLIP, I have couple questions about this:
>>>>>>
>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
>>>>> resource.
>>>>>> If there
>>>>>> is an unbounded data source, but only have resource to schedule the
>>>> first
>>>>>> stage, will it
>>>>>> bring the big burden to the disk/shuffle service which will occupy
>>> all
>>>>> the
>>>>>> resource I think.
>>>>>>
>>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode. In
>>>> other
>>>>>> words, In which
>>>>>> case we can use the hybrid shuffle mode:
>>>>>> - For batch job want to use more resource to reduce the e2e time ?
>>>>>> - Or for streaming job which may lack of resource temporarily ?
>>>>>> - Or for OLAP job which will try to make best use of available
>>>> resources
>>>>> as
>>>>>> you mentioned to finish the query?
>>>>>> Just want to know the typical use case for the Hybrid shuffle mode
>> :)
>>>>>>
>>>>>> Best,
>>>>>> Aitozi.
>>>>>>
>>>>>> weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
>>>>>>
>>>>>>> Yangze, Thank you for the feedback!
>>>>>>> Here's my thoughts for your questions:
>>>>>>>
>>>>>>>>>> How do we decide the size of the buffer pool in
>>> MemoryDataManager
>>>>> and
>>>>>>> the read buffers in FileDataManager?
>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used
>> by
>>>>>>> ResultPartition, and the size is the same as the current
>>>> implementation
>>>>>> of
>>>>>>> sort-merge shuffle. In other words, the minimum value of
>> BufferPool
>>>> is
>>>>> a
>>>>>>> configurable fixed value, and the maximum value is Math.max(min,
>> 4
>>> *
>>>>>>> numSubpartitions). The default value can be determined by running
>>> the
>>>>>>> TPC-DS tests.
>>>>>>> Read buffers in FileDataManager are requested from the
>>>>>>> BatchShuffleReadBufferPool shared by TaskManager, it's size
>>>> controlled
>>>>> by
>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the
>>>> default
>>>>>>> value is 32M, which is consistent with the current sort-merge
>>> shuffle
>>>>>>> logic.
>>>>>>>
>>>>>>>>>> Is there an upper limit for the sum of them? If there is, how
>>>> does
>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
>>>>>>> The buffers of the MemoryDataManager are limited by the size of
>> the
>>>>>>> LocalBufferPool, and the upper limit is the size of the Network
>>>> Memory.
>>>>>> The
>>>>>>> buffers of the FileDataManager are directly requested from
>>>>>>> UnpooledOffHeapMemory, and are also limited by the size of the
>>>>> framework
>>>>>>> off-heap memory. I think there should be no need for additional
>>>>>>> synchronization mechanisms.
>>>>>>>
>>>>>>>>>> How do you disable the slot sharing? If user configures both
>>> the
>>>>> slot
>>>>>>> sharing group and hybrid shuffle, what will happen to that job?
>>>>>>> I think we can print a warning log when Hybrid Shuffle is enabled
>>> and
>>>>> SSG
>>>>>>> is configured during the JobGraph compilation stage, and fallback
>>> to
>>>>> the
>>>>>>> region slot sharing group by default. Of course, it will be
>>>> emphasized
>>>>> in
>>>>>>> the document that we do not currently support SSG, If configured,
>>> it
>>>>> will
>>>>>>> fall back to the default.
>>>>>>>
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Weijie
>>>>>>>
>>>>>>>
>>>>>>> Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
>>>>>>>
>>>>>>>> Thanks for driving this. Xintong and Weijie.
>>>>>>>>
>>>>>>>> I believe this feature will make Flink a better batch/OLAP
>>> engine.
>>>> +1
>>>>>>>> for the overall design.
>>>>>>>>
>>>>>>>> Some questions:
>>>>>>>> 1. How do we decide the size of the buffer pool in
>>>> MemoryDataManager
>>>>>>>> and the read buffers in FileDataManager?
>>>>>>>> 2. Is there an upper limit for the sum of them? If there is,
>> how
>>>> does
>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
>>>>>>>> 3. How do you disable the slot sharing? If user configures both
>>> the
>>>>>>>> slot sharing group and hybrid shuffle, what will happen to that
>>>> job?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Yangze Guo
>>>>>>>>
>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <
>>>> tonysong820@gmail.com>
>>>>>>>> wrote:
>>>>>>>>> Thanks for preparing this FLIP, Weijie.
>>>>>>>>>
>>>>>>>>> I think this is a good improvement on batch resource
>>> elasticity.
>>>>>>> Looking
>>>>>>>>> forward to the community feedback.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Xintong
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <
>>>>>> guoweijiereswqa@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which
>>>>> introduce a
>>>>>>>> new shuffle mode
>>>>>>>>>>   can overcome some of the problems of Pipelined Shuffle and
>>>>>> Blocking
>>>>>>>> Shuffle in batch scenarios.
>>>>>>>>>>
>>>>>>>>>> Currently in Flink, task scheduling is more or less
>>> constrained
>>>>> by
>>>>>>> the
>>>>>>>> shuffle implementations.
>>>>>>>>>> This will bring the following disadvantages:
>>>>>>>>>>
>>>>>>>>>>     1. Pipelined Shuffle:
>>>>>>>>>>      For pipelined shuffle, the upstream and downstream
>> tasks
>>>> are
>>>>>>>> required to be deployed at the same time, to avoid upstream
>> tasks
>>>>> being
>>>>>>>> blocked forever. This is fine when there are enough resources
>> for
>>>>> both
>>>>>>>> upstream and downstream tasks to run simultaneously, but will
>>> cause
>>>>> the
>>>>>>>> following problems otherwise:
>>>>>>>>>>     1.
>>>>>>>>>>        Pipelined shuffle connected tasks (i.e., a pipelined
>>>>> region)
>>>>>>>> cannot be executed until obtaining resources for all of them,
>>>>> resulting
>>>>>>> in
>>>>>>>> longer job finishing time and poorer resource efficiency due to
>>>>> holding
>>>>>>>> part of the resources idle while waiting for the rest.
>>>>>>>>>>        2.
>>>>>>>>>>        More severely, if multiple jobs each hold part of the
>>>>> cluster
>>>>>>>> resources and are waiting for more, a deadlock would occur. The
>>>>> chance
>>>>>> is
>>>>>>>> not trivial, especially for scenarios such as OLAP where
>>> concurrent
>>>>> job
>>>>>>>> submissions are frequent.
>>>>>>>>>>        2. Blocking Shuffle:
>>>>>>>>>>      For blocking shuffle, execution of downstream tasks
>> must
>>>> wait
>>>>>> for
>>>>>>>> all upstream tasks to finish, despite there might be more
>>> resources
>>>>>>>> available. The sequential execution of upstream and downstream
>>>> tasks
>>>>>>>> significantly increase the job finishing time, and the disk IO
>>>>> workload
>>>>>>> for
>>>>>>>> spilling and loading full intermediate data also affects the
>>>>>> performance.
>>>>>>>>>>
>>>>>>>>>> We believe the root cause of the above problems is that
>>> shuffle
>>>>>>>> implementations put unnecessary constraints on task scheduling.
>>>>>>>>>>
>>>>>>>>>> To solve this problem, Xintong Song and I propose to
>>> introduce
>>>>>> hybrid
>>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid
>>>> Shuffle,
>>>>>>> Flink
>>>>>>>> should:
>>>>>>>>>>     1. Make best use of available resources.
>>>>>>>>>>      Ideally, we want Flink to always make progress if
>>> possible.
>>>>>> That
>>>>>>>> is to say, it should always execute a pending task if there are
>>>>>> resources
>>>>>>>> available for that task.
>>>>>>>>>>     2. Minimize disk IO load.
>>>>>>>>>>      In-flight data should be consumed directly from memory
>> as
>>>>> much
>>>>>> as
>>>>>>>> possible. Only data that is not consumed timely should be
>> spilled
>>>> to
>>>>>>> disk.
>>>>>>>>>> You can find more details in FLIP-235. Looking forward to
>>> your
>>>>>>>> feedback.
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>>
>>>>>>>>>> Weijie
>>>>>>>>>>


Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by weijie guo <gu...@gmail.com>.
Hi All,
Thank you for your attention and feedback.
Do you have any other comments? If there are no other questions, I'll vote
on FLIP-235 tomorrow.

Best regards,

Weijie


Aitozi <gj...@gmail.com> 于2022年5月20日周五 13:22写道:

> Hi Xintong
>     Thanks for your detailed explanation, I misunderstand the spill
> behavior at first glance,
> I get your point now. I think it will be a good addition to the current
> execution mode.
> Looking forward to it :)
>
> Best,
> Aitozi
>
> Xintong Song <to...@gmail.com> 于2022年5月20日周五 12:26写道:
>
> > Hi Aitozi,
> >
> > In which case we can use the hybrid shuffle mode
> >
> > Just to directly answer this question, in addition to
> > Weijie's explanations. For batch workload, if you want the workload to
> take
> > advantage of as many resources as available, which ranges from a single
> > slot to as many slots as the total tasks, you may consider hybrid shuffle
> > mode. Admittedly, this may not always be wanted, e.g., users may not want
> > to execute a job if there's too few resources available, or may not want
> a
> > job taking too many of the cluster resources. That's why we propose
> hybrid
> > shuffle as an additional option for batch users, rather than a
> replacement
> > for Pipelined or Blocking mode.
> >
> > So you mean the hybrid shuffle mode will limit its usage to the bounded
> > > source, Right ?
> > >
> > Yes.
> >
> > One more question, with the bounded data and partly of the stage is
> running
> > > in the Pipelined shuffle mode, what will be the behavior of the task
> > > failure, Is the checkpoint enabled for these running stages or will it
> > > re-run after the failure?
> > >
> > There's no checkpoints. The failover behavior depends on the spilling
> > strategy.
> > - In the first version, we only consider a selective spilling strategy,
> > which means spill data as little as possible to the disk, which means in
> > case of failover upstream tasks need to be restarted to reproduce the
> > complete intermediate results.
> > - An alternative strategy we may introduce in future if needed is to
> spill
> > the complete intermediate results. That avoids restarting upstream tasks
> in
> > case of failover, because the produced intermediate results can be
> > re-consumed, at the cost of more disk IO load.
> > With both strategies, the trade-off between failover cost and IO load is
> > for the user to decide. This is also discussed in the MemoryDataManager
> > section of the FLIP.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Fri, May 20, 2022 at 12:10 PM Aitozi <gj...@gmail.com> wrote:
> >
> > > Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> > > limit
> > > its usage to the bounded source, Right ?
> > > One more question, with the bounded data and partly of the stage is
> > running
> > > in the Pipelined shuffle mode, what will be the behavior of the task
> > > failure, Is the
> > > checkpoint enabled for these running stages or will it re-run after the
> > > failure?
> > >
> > > Best,
> > > Aitozi
> > >
> > > weijie guo <gu...@gmail.com> 于2022年5月20日周五 10:45写道:
> > >
> > > > Hi, Aitozi:
> > > >
> > > > Thank you for the feedback!
> > > > Here are some of my thoughts on your question
> > > >
> > > > >>> 1.If there is an unbounded data source, but only have resource to
> > > > schedule the first stage, will it bring the big burden to the
> > > disk/shuffle
> > > > service which will occupy all the resource I think.
> > > > First of all, Hybrid Shuffle Mode is oriented to the batch job
> > scenario,
> > > so
> > > > there is no problem of unbounded data sources. Secondly, if you
> > consider
> > > > the stream scenario, I think Pipelined Shuffle should still be the
> best
> > > > choice at present. For an unbounded data stream, it is not meaningful
> > to
> > > > only run some stages.
> > > >
> > > > >>> 2. Which kind of job will benefit from the hybrid shuffle mode.
> In
> > > > other words, In which case we can use the hybrid shuffle mode:
> > > > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > > > shuffle mode can effectively utilize cluster resources and avoid some
> > > > unnecessary disk IO overhead. For OLAP scenarios, which are
> > characterized
> > > > by a large number of concurrently submitted short batch jobs, hybrid
> > > > shuffle can solve the scheduling deadlock problem of pipelined
> shuffle
> > > and
> > > > achieve similar performance.
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
> > > >
> > > > > Hi Weijie:
> > > > >
> > > > >      Thanks for the nice FLIP, I have couple questions about this:
> > > > >
> > > > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > > > resource.
> > > > > If there
> > > > > is an unbounded data source, but only have resource to schedule the
> > > first
> > > > > stage, will it
> > > > > bring the big burden to the disk/shuffle service which will occupy
> > all
> > > > the
> > > > > resource I think.
> > > > >
> > > > > 2) Which kind of job will benefit from the hybrid shuffle mode. In
> > > other
> > > > > words, In which
> > > > > case we can use the hybrid shuffle mode:
> > > > > - For batch job want to use more resource to reduce the e2e time ?
> > > > > - Or for streaming job which may lack of resource temporarily ?
> > > > > - Or for OLAP job which will try to make best use of available
> > > resources
> > > > as
> > > > > you mentioned to finish the query?
> > > > > Just want to know the typical use case for the Hybrid shuffle mode
> :)
> > > > >
> > > > >
> > > > > Best,
> > > > > Aitozi.
> > > > >
> > > > > weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
> > > > >
> > > > > > Yangze, Thank you for the feedback!
> > > > > > Here's my thoughts for your questions:
> > > > > >
> > > > > > >>> How do we decide the size of the buffer pool in
> > MemoryDataManager
> > > > and
> > > > > > the read buffers in FileDataManager?
> > > > > > The BufferPool in MemoryDataManager is the LocalBufferPool used
> by
> > > > > > ResultPartition, and the size is the same as the current
> > > implementation
> > > > > of
> > > > > > sort-merge shuffle. In other words, the minimum value of
> BufferPool
> > > is
> > > > a
> > > > > > configurable fixed value, and the maximum value is Math.max(min,
> 4
> > *
> > > > > > numSubpartitions). The default value can be determined by running
> > the
> > > > > > TPC-DS tests.
> > > > > > Read buffers in FileDataManager are requested from the
> > > > > > BatchShuffleReadBufferPool shared by TaskManager, it's size
> > > controlled
> > > > by
> > > > > > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the
> > > default
> > > > > > value is 32M, which is consistent with the current sort-merge
> > shuffle
> > > > > > logic.
> > > > > >
> > > > > > >>> Is there an upper limit for the sum of them? If there is, how
> > > does
> > > > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > > > The buffers of the MemoryDataManager are limited by the size of
> the
> > > > > > LocalBufferPool, and the upper limit is the size of the Network
> > > Memory.
> > > > > The
> > > > > > buffers of the FileDataManager are directly requested from
> > > > > > UnpooledOffHeapMemory, and are also limited by the size of the
> > > > framework
> > > > > > off-heap memory. I think there should be no need for additional
> > > > > > synchronization mechanisms.
> > > > > >
> > > > > > >>> How do you disable the slot sharing? If user configures both
> > the
> > > > slot
> > > > > > sharing group and hybrid shuffle, what will happen to that job?
> > > > > > I think we can print a warning log when Hybrid Shuffle is enabled
> > and
> > > > SSG
> > > > > > is configured during the JobGraph compilation stage, and fallback
> > to
> > > > the
> > > > > > region slot sharing group by default. Of course, it will be
> > > emphasized
> > > > in
> > > > > > the document that we do not currently support SSG, If configured,
> > it
> > > > will
> > > > > > fall back to the default.
> > > > > >
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Weijie
> > > > > >
> > > > > >
> > > > > > Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
> > > > > >
> > > > > > > Thanks for driving this. Xintong and Weijie.
> > > > > > >
> > > > > > > I believe this feature will make Flink a better batch/OLAP
> > engine.
> > > +1
> > > > > > > for the overall design.
> > > > > > >
> > > > > > > Some questions:
> > > > > > > 1. How do we decide the size of the buffer pool in
> > > MemoryDataManager
> > > > > > > and the read buffers in FileDataManager?
> > > > > > > 2. Is there an upper limit for the sum of them? If there is,
> how
> > > does
> > > > > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > > > > 3. How do you disable the slot sharing? If user configures both
> > the
> > > > > > > slot sharing group and hybrid shuffle, what will happen to that
> > > job?
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Thu, May 19, 2022 at 2:41 PM Xintong Song <
> > > tonysong820@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Thanks for preparing this FLIP, Weijie.
> > > > > > > >
> > > > > > > > I think this is a good improvement on batch resource
> > elasticity.
> > > > > > Looking
> > > > > > > > forward to the community feedback.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Xintong
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, May 19, 2022 at 2:31 PM weijie guo <
> > > > > guoweijiereswqa@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > I’d like to start a discussion about FLIP-235[1], which
> > > > introduce a
> > > > > > > new shuffle mode
> > > > > > > > >  can overcome some of the problems of Pipelined Shuffle and
> > > > > Blocking
> > > > > > > Shuffle in batch scenarios.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Currently in Flink, task scheduling is more or less
> > constrained
> > > > by
> > > > > > the
> > > > > > > shuffle implementations.
> > > > > > > > > This will bring the following disadvantages:
> > > > > > > > >
> > > > > > > > >    1. Pipelined Shuffle:
> > > > > > > > >     For pipelined shuffle, the upstream and downstream
> tasks
> > > are
> > > > > > > required to be deployed at the same time, to avoid upstream
> tasks
> > > > being
> > > > > > > blocked forever. This is fine when there are enough resources
> for
> > > > both
> > > > > > > upstream and downstream tasks to run simultaneously, but will
> > cause
> > > > the
> > > > > > > following problems otherwise:
> > > > > > > > >    1.
> > > > > > > > >       Pipelined shuffle connected tasks (i.e., a pipelined
> > > > region)
> > > > > > > cannot be executed until obtaining resources for all of them,
> > > > resulting
> > > > > > in
> > > > > > > longer job finishing time and poorer resource efficiency due to
> > > > holding
> > > > > > > part of the resources idle while waiting for the rest.
> > > > > > > > >       2.
> > > > > > > > >       More severely, if multiple jobs each hold part of the
> > > > cluster
> > > > > > > resources and are waiting for more, a deadlock would occur. The
> > > > chance
> > > > > is
> > > > > > > not trivial, especially for scenarios such as OLAP where
> > concurrent
> > > > job
> > > > > > > submissions are frequent.
> > > > > > > > >       2. Blocking Shuffle:
> > > > > > > > >     For blocking shuffle, execution of downstream tasks
> must
> > > wait
> > > > > for
> > > > > > > all upstream tasks to finish, despite there might be more
> > resources
> > > > > > > available. The sequential execution of upstream and downstream
> > > tasks
> > > > > > > significantly increase the job finishing time, and the disk IO
> > > > workload
> > > > > > for
> > > > > > > spilling and loading full intermediate data also affects the
> > > > > performance.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > We believe the root cause of the above problems is that
> > shuffle
> > > > > > > implementations put unnecessary constraints on task scheduling.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > To solve this problem, Xintong Song and I propose to
> > introduce
> > > > > hybrid
> > > > > > > shuffle to minimize the scheduling constraints. With Hybrid
> > > Shuffle,
> > > > > > Flink
> > > > > > > should:
> > > > > > > > >
> > > > > > > > >    1. Make best use of available resources.
> > > > > > > > >     Ideally, we want Flink to always make progress if
> > possible.
> > > > > That
> > > > > > > is to say, it should always execute a pending task if there are
> > > > > resources
> > > > > > > available for that task.
> > > > > > > > >    2. Minimize disk IO load.
> > > > > > > > >     In-flight data should be consumed directly from memory
> as
> > > > much
> > > > > as
> > > > > > > possible. Only data that is not consumed timely should be
> spilled
> > > to
> > > > > > disk.
> > > > > > > > >
> > > > > > > > > You can find more details in FLIP-235. Looking forward to
> > your
> > > > > > > feedback.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > >
> > > > > > > > > Weijie
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Aitozi <gj...@gmail.com>.
Hi Xintong
    Thanks for your detailed explanation, I misunderstand the spill
behavior at first glance,
I get your point now. I think it will be a good addition to the current
execution mode.
Looking forward to it :)

Best,
Aitozi

Xintong Song <to...@gmail.com> 于2022年5月20日周五 12:26写道:

> Hi Aitozi,
>
> In which case we can use the hybrid shuffle mode
>
> Just to directly answer this question, in addition to
> Weijie's explanations. For batch workload, if you want the workload to take
> advantage of as many resources as available, which ranges from a single
> slot to as many slots as the total tasks, you may consider hybrid shuffle
> mode. Admittedly, this may not always be wanted, e.g., users may not want
> to execute a job if there's too few resources available, or may not want a
> job taking too many of the cluster resources. That's why we propose hybrid
> shuffle as an additional option for batch users, rather than a replacement
> for Pipelined or Blocking mode.
>
> So you mean the hybrid shuffle mode will limit its usage to the bounded
> > source, Right ?
> >
> Yes.
>
> One more question, with the bounded data and partly of the stage is running
> > in the Pipelined shuffle mode, what will be the behavior of the task
> > failure, Is the checkpoint enabled for these running stages or will it
> > re-run after the failure?
> >
> There's no checkpoints. The failover behavior depends on the spilling
> strategy.
> - In the first version, we only consider a selective spilling strategy,
> which means spill data as little as possible to the disk, which means in
> case of failover upstream tasks need to be restarted to reproduce the
> complete intermediate results.
> - An alternative strategy we may introduce in future if needed is to spill
> the complete intermediate results. That avoids restarting upstream tasks in
> case of failover, because the produced intermediate results can be
> re-consumed, at the cost of more disk IO load.
> With both strategies, the trade-off between failover cost and IO load is
> for the user to decide. This is also discussed in the MemoryDataManager
> section of the FLIP.
>
> Best,
>
> Xintong
>
>
>
> On Fri, May 20, 2022 at 12:10 PM Aitozi <gj...@gmail.com> wrote:
>
> > Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> > limit
> > its usage to the bounded source, Right ?
> > One more question, with the bounded data and partly of the stage is
> running
> > in the Pipelined shuffle mode, what will be the behavior of the task
> > failure, Is the
> > checkpoint enabled for these running stages or will it re-run after the
> > failure?
> >
> > Best,
> > Aitozi
> >
> > weijie guo <gu...@gmail.com> 于2022年5月20日周五 10:45写道:
> >
> > > Hi, Aitozi:
> > >
> > > Thank you for the feedback!
> > > Here are some of my thoughts on your question
> > >
> > > >>> 1.If there is an unbounded data source, but only have resource to
> > > schedule the first stage, will it bring the big burden to the
> > disk/shuffle
> > > service which will occupy all the resource I think.
> > > First of all, Hybrid Shuffle Mode is oriented to the batch job
> scenario,
> > so
> > > there is no problem of unbounded data sources. Secondly, if you
> consider
> > > the stream scenario, I think Pipelined Shuffle should still be the best
> > > choice at present. For an unbounded data stream, it is not meaningful
> to
> > > only run some stages.
> > >
> > > >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> > > other words, In which case we can use the hybrid shuffle mode:
> > > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > > shuffle mode can effectively utilize cluster resources and avoid some
> > > unnecessary disk IO overhead. For OLAP scenarios, which are
> characterized
> > > by a large number of concurrently submitted short batch jobs, hybrid
> > > shuffle can solve the scheduling deadlock problem of pipelined shuffle
> > and
> > > achieve similar performance.
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
> > >
> > > > Hi Weijie:
> > > >
> > > >      Thanks for the nice FLIP, I have couple questions about this:
> > > >
> > > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > > resource.
> > > > If there
> > > > is an unbounded data source, but only have resource to schedule the
> > first
> > > > stage, will it
> > > > bring the big burden to the disk/shuffle service which will occupy
> all
> > > the
> > > > resource I think.
> > > >
> > > > 2) Which kind of job will benefit from the hybrid shuffle mode. In
> > other
> > > > words, In which
> > > > case we can use the hybrid shuffle mode:
> > > > - For batch job want to use more resource to reduce the e2e time ?
> > > > - Or for streaming job which may lack of resource temporarily ?
> > > > - Or for OLAP job which will try to make best use of available
> > resources
> > > as
> > > > you mentioned to finish the query?
> > > > Just want to know the typical use case for the Hybrid shuffle mode :)
> > > >
> > > >
> > > > Best,
> > > > Aitozi.
> > > >
> > > > weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
> > > >
> > > > > Yangze, Thank you for the feedback!
> > > > > Here's my thoughts for your questions:
> > > > >
> > > > > >>> How do we decide the size of the buffer pool in
> MemoryDataManager
> > > and
> > > > > the read buffers in FileDataManager?
> > > > > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > > > > ResultPartition, and the size is the same as the current
> > implementation
> > > > of
> > > > > sort-merge shuffle. In other words, the minimum value of BufferPool
> > is
> > > a
> > > > > configurable fixed value, and the maximum value is Math.max(min, 4
> *
> > > > > numSubpartitions). The default value can be determined by running
> the
> > > > > TPC-DS tests.
> > > > > Read buffers in FileDataManager are requested from the
> > > > > BatchShuffleReadBufferPool shared by TaskManager, it's size
> > controlled
> > > by
> > > > > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the
> > default
> > > > > value is 32M, which is consistent with the current sort-merge
> shuffle
> > > > > logic.
> > > > >
> > > > > >>> Is there an upper limit for the sum of them? If there is, how
> > does
> > > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > > The buffers of the MemoryDataManager are limited by the size of the
> > > > > LocalBufferPool, and the upper limit is the size of the Network
> > Memory.
> > > > The
> > > > > buffers of the FileDataManager are directly requested from
> > > > > UnpooledOffHeapMemory, and are also limited by the size of the
> > > framework
> > > > > off-heap memory. I think there should be no need for additional
> > > > > synchronization mechanisms.
> > > > >
> > > > > >>> How do you disable the slot sharing? If user configures both
> the
> > > slot
> > > > > sharing group and hybrid shuffle, what will happen to that job?
> > > > > I think we can print a warning log when Hybrid Shuffle is enabled
> and
> > > SSG
> > > > > is configured during the JobGraph compilation stage, and fallback
> to
> > > the
> > > > > region slot sharing group by default. Of course, it will be
> > emphasized
> > > in
> > > > > the document that we do not currently support SSG, If configured,
> it
> > > will
> > > > > fall back to the default.
> > > > >
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Weijie
> > > > >
> > > > >
> > > > > Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
> > > > >
> > > > > > Thanks for driving this. Xintong and Weijie.
> > > > > >
> > > > > > I believe this feature will make Flink a better batch/OLAP
> engine.
> > +1
> > > > > > for the overall design.
> > > > > >
> > > > > > Some questions:
> > > > > > 1. How do we decide the size of the buffer pool in
> > MemoryDataManager
> > > > > > and the read buffers in FileDataManager?
> > > > > > 2. Is there an upper limit for the sum of them? If there is, how
> > does
> > > > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > > > 3. How do you disable the slot sharing? If user configures both
> the
> > > > > > slot sharing group and hybrid shuffle, what will happen to that
> > job?
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > > On Thu, May 19, 2022 at 2:41 PM Xintong Song <
> > tonysong820@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Thanks for preparing this FLIP, Weijie.
> > > > > > >
> > > > > > > I think this is a good improvement on batch resource
> elasticity.
> > > > > Looking
> > > > > > > forward to the community feedback.
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > Xintong
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Thu, May 19, 2022 at 2:31 PM weijie guo <
> > > > guoweijiereswqa@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > >
> > > > > > > > I’d like to start a discussion about FLIP-235[1], which
> > > introduce a
> > > > > > new shuffle mode
> > > > > > > >  can overcome some of the problems of Pipelined Shuffle and
> > > > Blocking
> > > > > > Shuffle in batch scenarios.
> > > > > > > >
> > > > > > > >
> > > > > > > > Currently in Flink, task scheduling is more or less
> constrained
> > > by
> > > > > the
> > > > > > shuffle implementations.
> > > > > > > > This will bring the following disadvantages:
> > > > > > > >
> > > > > > > >    1. Pipelined Shuffle:
> > > > > > > >     For pipelined shuffle, the upstream and downstream tasks
> > are
> > > > > > required to be deployed at the same time, to avoid upstream tasks
> > > being
> > > > > > blocked forever. This is fine when there are enough resources for
> > > both
> > > > > > upstream and downstream tasks to run simultaneously, but will
> cause
> > > the
> > > > > > following problems otherwise:
> > > > > > > >    1.
> > > > > > > >       Pipelined shuffle connected tasks (i.e., a pipelined
> > > region)
> > > > > > cannot be executed until obtaining resources for all of them,
> > > resulting
> > > > > in
> > > > > > longer job finishing time and poorer resource efficiency due to
> > > holding
> > > > > > part of the resources idle while waiting for the rest.
> > > > > > > >       2.
> > > > > > > >       More severely, if multiple jobs each hold part of the
> > > cluster
> > > > > > resources and are waiting for more, a deadlock would occur. The
> > > chance
> > > > is
> > > > > > not trivial, especially for scenarios such as OLAP where
> concurrent
> > > job
> > > > > > submissions are frequent.
> > > > > > > >       2. Blocking Shuffle:
> > > > > > > >     For blocking shuffle, execution of downstream tasks must
> > wait
> > > > for
> > > > > > all upstream tasks to finish, despite there might be more
> resources
> > > > > > available. The sequential execution of upstream and downstream
> > tasks
> > > > > > significantly increase the job finishing time, and the disk IO
> > > workload
> > > > > for
> > > > > > spilling and loading full intermediate data also affects the
> > > > performance.
> > > > > > > >
> > > > > > > >
> > > > > > > > We believe the root cause of the above problems is that
> shuffle
> > > > > > implementations put unnecessary constraints on task scheduling.
> > > > > > > >
> > > > > > > >
> > > > > > > > To solve this problem, Xintong Song and I propose to
> introduce
> > > > hybrid
> > > > > > shuffle to minimize the scheduling constraints. With Hybrid
> > Shuffle,
> > > > > Flink
> > > > > > should:
> > > > > > > >
> > > > > > > >    1. Make best use of available resources.
> > > > > > > >     Ideally, we want Flink to always make progress if
> possible.
> > > > That
> > > > > > is to say, it should always execute a pending task if there are
> > > > resources
> > > > > > available for that task.
> > > > > > > >    2. Minimize disk IO load.
> > > > > > > >     In-flight data should be consumed directly from memory as
> > > much
> > > > as
> > > > > > possible. Only data that is not consumed timely should be spilled
> > to
> > > > > disk.
> > > > > > > >
> > > > > > > > You can find more details in FLIP-235. Looking forward to
> your
> > > > > > feedback.
> > > > > > > >
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Weijie
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Xintong Song <to...@gmail.com>.
Hi Aitozi,

In which case we can use the hybrid shuffle mode

Just to directly answer this question, in addition to
Weijie's explanations. For batch workload, if you want the workload to take
advantage of as many resources as available, which ranges from a single
slot to as many slots as the total tasks, you may consider hybrid shuffle
mode. Admittedly, this may not always be wanted, e.g., users may not want
to execute a job if there's too few resources available, or may not want a
job taking too many of the cluster resources. That's why we propose hybrid
shuffle as an additional option for batch users, rather than a replacement
for Pipelined or Blocking mode.

So you mean the hybrid shuffle mode will limit its usage to the bounded
> source, Right ?
>
Yes.

One more question, with the bounded data and partly of the stage is running
> in the Pipelined shuffle mode, what will be the behavior of the task
> failure, Is the checkpoint enabled for these running stages or will it
> re-run after the failure?
>
There's no checkpoints. The failover behavior depends on the spilling
strategy.
- In the first version, we only consider a selective spilling strategy,
which means spill data as little as possible to the disk, which means in
case of failover upstream tasks need to be restarted to reproduce the
complete intermediate results.
- An alternative strategy we may introduce in future if needed is to spill
the complete intermediate results. That avoids restarting upstream tasks in
case of failover, because the produced intermediate results can be
re-consumed, at the cost of more disk IO load.
With both strategies, the trade-off between failover cost and IO load is
for the user to decide. This is also discussed in the MemoryDataManager
section of the FLIP.

Best,

Xintong



On Fri, May 20, 2022 at 12:10 PM Aitozi <gj...@gmail.com> wrote:

> Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> limit
> its usage to the bounded source, Right ?
> One more question, with the bounded data and partly of the stage is running
> in the Pipelined shuffle mode, what will be the behavior of the task
> failure, Is the
> checkpoint enabled for these running stages or will it re-run after the
> failure?
>
> Best,
> Aitozi
>
> weijie guo <gu...@gmail.com> 于2022年5月20日周五 10:45写道:
>
> > Hi, Aitozi:
> >
> > Thank you for the feedback!
> > Here are some of my thoughts on your question
> >
> > >>> 1.If there is an unbounded data source, but only have resource to
> > schedule the first stage, will it bring the big burden to the
> disk/shuffle
> > service which will occupy all the resource I think.
> > First of all, Hybrid Shuffle Mode is oriented to the batch job scenario,
> so
> > there is no problem of unbounded data sources. Secondly, if you consider
> > the stream scenario, I think Pipelined Shuffle should still be the best
> > choice at present. For an unbounded data stream, it is not meaningful to
> > only run some stages.
> >
> > >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> > other words, In which case we can use the hybrid shuffle mode:
> > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > shuffle mode can effectively utilize cluster resources and avoid some
> > unnecessary disk IO overhead. For OLAP scenarios, which are characterized
> > by a large number of concurrently submitted short batch jobs, hybrid
> > shuffle can solve the scheduling deadlock problem of pipelined shuffle
> and
> > achieve similar performance.
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
> >
> > > Hi Weijie:
> > >
> > >      Thanks for the nice FLIP, I have couple questions about this:
> > >
> > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > resource.
> > > If there
> > > is an unbounded data source, but only have resource to schedule the
> first
> > > stage, will it
> > > bring the big burden to the disk/shuffle service which will occupy all
> > the
> > > resource I think.
> > >
> > > 2) Which kind of job will benefit from the hybrid shuffle mode. In
> other
> > > words, In which
> > > case we can use the hybrid shuffle mode:
> > > - For batch job want to use more resource to reduce the e2e time ?
> > > - Or for streaming job which may lack of resource temporarily ?
> > > - Or for OLAP job which will try to make best use of available
> resources
> > as
> > > you mentioned to finish the query?
> > > Just want to know the typical use case for the Hybrid shuffle mode :)
> > >
> > >
> > > Best,
> > > Aitozi.
> > >
> > > weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
> > >
> > > > Yangze, Thank you for the feedback!
> > > > Here's my thoughts for your questions:
> > > >
> > > > >>> How do we decide the size of the buffer pool in MemoryDataManager
> > and
> > > > the read buffers in FileDataManager?
> > > > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > > > ResultPartition, and the size is the same as the current
> implementation
> > > of
> > > > sort-merge shuffle. In other words, the minimum value of BufferPool
> is
> > a
> > > > configurable fixed value, and the maximum value is Math.max(min, 4 *
> > > > numSubpartitions). The default value can be determined by running the
> > > > TPC-DS tests.
> > > > Read buffers in FileDataManager are requested from the
> > > > BatchShuffleReadBufferPool shared by TaskManager, it's size
> controlled
> > by
> > > > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the
> default
> > > > value is 32M, which is consistent with the current sort-merge shuffle
> > > > logic.
> > > >
> > > > >>> Is there an upper limit for the sum of them? If there is, how
> does
> > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > The buffers of the MemoryDataManager are limited by the size of the
> > > > LocalBufferPool, and the upper limit is the size of the Network
> Memory.
> > > The
> > > > buffers of the FileDataManager are directly requested from
> > > > UnpooledOffHeapMemory, and are also limited by the size of the
> > framework
> > > > off-heap memory. I think there should be no need for additional
> > > > synchronization mechanisms.
> > > >
> > > > >>> How do you disable the slot sharing? If user configures both the
> > slot
> > > > sharing group and hybrid shuffle, what will happen to that job?
> > > > I think we can print a warning log when Hybrid Shuffle is enabled and
> > SSG
> > > > is configured during the JobGraph compilation stage, and fallback to
> > the
> > > > region slot sharing group by default. Of course, it will be
> emphasized
> > in
> > > > the document that we do not currently support SSG, If configured, it
> > will
> > > > fall back to the default.
> > > >
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
> > > >
> > > > > Thanks for driving this. Xintong and Weijie.
> > > > >
> > > > > I believe this feature will make Flink a better batch/OLAP engine.
> +1
> > > > > for the overall design.
> > > > >
> > > > > Some questions:
> > > > > 1. How do we decide the size of the buffer pool in
> MemoryDataManager
> > > > > and the read buffers in FileDataManager?
> > > > > 2. Is there an upper limit for the sum of them? If there is, how
> does
> > > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > > 3. How do you disable the slot sharing? If user configures both the
> > > > > slot sharing group and hybrid shuffle, what will happen to that
> job?
> > > > >
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > > On Thu, May 19, 2022 at 2:41 PM Xintong Song <
> tonysong820@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Thanks for preparing this FLIP, Weijie.
> > > > > >
> > > > > > I think this is a good improvement on batch resource elasticity.
> > > > Looking
> > > > > > forward to the community feedback.
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Xintong
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, May 19, 2022 at 2:31 PM weijie guo <
> > > guoweijiereswqa@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > >
> > > > > > > I’d like to start a discussion about FLIP-235[1], which
> > introduce a
> > > > > new shuffle mode
> > > > > > >  can overcome some of the problems of Pipelined Shuffle and
> > > Blocking
> > > > > Shuffle in batch scenarios.
> > > > > > >
> > > > > > >
> > > > > > > Currently in Flink, task scheduling is more or less constrained
> > by
> > > > the
> > > > > shuffle implementations.
> > > > > > > This will bring the following disadvantages:
> > > > > > >
> > > > > > >    1. Pipelined Shuffle:
> > > > > > >     For pipelined shuffle, the upstream and downstream tasks
> are
> > > > > required to be deployed at the same time, to avoid upstream tasks
> > being
> > > > > blocked forever. This is fine when there are enough resources for
> > both
> > > > > upstream and downstream tasks to run simultaneously, but will cause
> > the
> > > > > following problems otherwise:
> > > > > > >    1.
> > > > > > >       Pipelined shuffle connected tasks (i.e., a pipelined
> > region)
> > > > > cannot be executed until obtaining resources for all of them,
> > resulting
> > > > in
> > > > > longer job finishing time and poorer resource efficiency due to
> > holding
> > > > > part of the resources idle while waiting for the rest.
> > > > > > >       2.
> > > > > > >       More severely, if multiple jobs each hold part of the
> > cluster
> > > > > resources and are waiting for more, a deadlock would occur. The
> > chance
> > > is
> > > > > not trivial, especially for scenarios such as OLAP where concurrent
> > job
> > > > > submissions are frequent.
> > > > > > >       2. Blocking Shuffle:
> > > > > > >     For blocking shuffle, execution of downstream tasks must
> wait
> > > for
> > > > > all upstream tasks to finish, despite there might be more resources
> > > > > available. The sequential execution of upstream and downstream
> tasks
> > > > > significantly increase the job finishing time, and the disk IO
> > workload
> > > > for
> > > > > spilling and loading full intermediate data also affects the
> > > performance.
> > > > > > >
> > > > > > >
> > > > > > > We believe the root cause of the above problems is that shuffle
> > > > > implementations put unnecessary constraints on task scheduling.
> > > > > > >
> > > > > > >
> > > > > > > To solve this problem, Xintong Song and I propose to introduce
> > > hybrid
> > > > > shuffle to minimize the scheduling constraints. With Hybrid
> Shuffle,
> > > > Flink
> > > > > should:
> > > > > > >
> > > > > > >    1. Make best use of available resources.
> > > > > > >     Ideally, we want Flink to always make progress if possible.
> > > That
> > > > > is to say, it should always execute a pending task if there are
> > > resources
> > > > > available for that task.
> > > > > > >    2. Minimize disk IO load.
> > > > > > >     In-flight data should be consumed directly from memory as
> > much
> > > as
> > > > > possible. Only data that is not consumed timely should be spilled
> to
> > > > disk.
> > > > > > >
> > > > > > > You can find more details in FLIP-235. Looking forward to your
> > > > > feedback.
> > > > > > >
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > > >
> > > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Weijie
> > > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Aitozi <gj...@gmail.com>.
Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
limit
its usage to the bounded source, Right ?
One more question, with the bounded data and partly of the stage is running
in the Pipelined shuffle mode, what will be the behavior of the task
failure, Is the
checkpoint enabled for these running stages or will it re-run after the
failure?

Best,
Aitozi

weijie guo <gu...@gmail.com> 于2022年5月20日周五 10:45写道:

> Hi, Aitozi:
>
> Thank you for the feedback!
> Here are some of my thoughts on your question
>
> >>> 1.If there is an unbounded data source, but only have resource to
> schedule the first stage, will it bring the big burden to the disk/shuffle
> service which will occupy all the resource I think.
> First of all, Hybrid Shuffle Mode is oriented to the batch job scenario, so
> there is no problem of unbounded data sources. Secondly, if you consider
> the stream scenario, I think Pipelined Shuffle should still be the best
> choice at present. For an unbounded data stream, it is not meaningful to
> only run some stages.
>
> >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> other words, In which case we can use the hybrid shuffle mode:
> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> shuffle mode can effectively utilize cluster resources and avoid some
> unnecessary disk IO overhead. For OLAP scenarios, which are characterized
> by a large number of concurrently submitted short batch jobs, hybrid
> shuffle can solve the scheduling deadlock problem of pipelined shuffle and
> achieve similar performance.
>
> Best regards,
>
> Weijie
>
>
> Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:
>
> > Hi Weijie:
> >
> >      Thanks for the nice FLIP, I have couple questions about this:
> >
> > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> resource.
> > If there
> > is an unbounded data source, but only have resource to schedule the first
> > stage, will it
> > bring the big burden to the disk/shuffle service which will occupy all
> the
> > resource I think.
> >
> > 2) Which kind of job will benefit from the hybrid shuffle mode. In other
> > words, In which
> > case we can use the hybrid shuffle mode:
> > - For batch job want to use more resource to reduce the e2e time ?
> > - Or for streaming job which may lack of resource temporarily ?
> > - Or for OLAP job which will try to make best use of available resources
> as
> > you mentioned to finish the query?
> > Just want to know the typical use case for the Hybrid shuffle mode :)
> >
> >
> > Best,
> > Aitozi.
> >
> > weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
> >
> > > Yangze, Thank you for the feedback!
> > > Here's my thoughts for your questions:
> > >
> > > >>> How do we decide the size of the buffer pool in MemoryDataManager
> and
> > > the read buffers in FileDataManager?
> > > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > > ResultPartition, and the size is the same as the current implementation
> > of
> > > sort-merge shuffle. In other words, the minimum value of BufferPool is
> a
> > > configurable fixed value, and the maximum value is Math.max(min, 4 *
> > > numSubpartitions). The default value can be determined by running the
> > > TPC-DS tests.
> > > Read buffers in FileDataManager are requested from the
> > > BatchShuffleReadBufferPool shared by TaskManager, it's size controlled
> by
> > > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the default
> > > value is 32M, which is consistent with the current sort-merge shuffle
> > > logic.
> > >
> > > >>> Is there an upper limit for the sum of them? If there is, how does
> > > MemoryDataManager and FileDataManager sync the memory usage?
> > > The buffers of the MemoryDataManager are limited by the size of the
> > > LocalBufferPool, and the upper limit is the size of the Network Memory.
> > The
> > > buffers of the FileDataManager are directly requested from
> > > UnpooledOffHeapMemory, and are also limited by the size of the
> framework
> > > off-heap memory. I think there should be no need for additional
> > > synchronization mechanisms.
> > >
> > > >>> How do you disable the slot sharing? If user configures both the
> slot
> > > sharing group and hybrid shuffle, what will happen to that job?
> > > I think we can print a warning log when Hybrid Shuffle is enabled and
> SSG
> > > is configured during the JobGraph compilation stage, and fallback to
> the
> > > region slot sharing group by default. Of course, it will be emphasized
> in
> > > the document that we do not currently support SSG, If configured, it
> will
> > > fall back to the default.
> > >
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
> > >
> > > > Thanks for driving this. Xintong and Weijie.
> > > >
> > > > I believe this feature will make Flink a better batch/OLAP engine. +1
> > > > for the overall design.
> > > >
> > > > Some questions:
> > > > 1. How do we decide the size of the buffer pool in MemoryDataManager
> > > > and the read buffers in FileDataManager?
> > > > 2. Is there an upper limit for the sum of them? If there is, how does
> > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > 3. How do you disable the slot sharing? If user configures both the
> > > > slot sharing group and hybrid shuffle, what will happen to that job?
> > > >
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Thu, May 19, 2022 at 2:41 PM Xintong Song <to...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Thanks for preparing this FLIP, Weijie.
> > > > >
> > > > > I think this is a good improvement on batch resource elasticity.
> > > Looking
> > > > > forward to the community feedback.
> > > > >
> > > > > Best,
> > > > >
> > > > > Xintong
> > > > >
> > > > >
> > > > >
> > > > > On Thu, May 19, 2022 at 2:31 PM weijie guo <
> > guoweijiereswqa@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > >
> > > > > > I’d like to start a discussion about FLIP-235[1], which
> introduce a
> > > > new shuffle mode
> > > > > >  can overcome some of the problems of Pipelined Shuffle and
> > Blocking
> > > > Shuffle in batch scenarios.
> > > > > >
> > > > > >
> > > > > > Currently in Flink, task scheduling is more or less constrained
> by
> > > the
> > > > shuffle implementations.
> > > > > > This will bring the following disadvantages:
> > > > > >
> > > > > >    1. Pipelined Shuffle:
> > > > > >     For pipelined shuffle, the upstream and downstream tasks are
> > > > required to be deployed at the same time, to avoid upstream tasks
> being
> > > > blocked forever. This is fine when there are enough resources for
> both
> > > > upstream and downstream tasks to run simultaneously, but will cause
> the
> > > > following problems otherwise:
> > > > > >    1.
> > > > > >       Pipelined shuffle connected tasks (i.e., a pipelined
> region)
> > > > cannot be executed until obtaining resources for all of them,
> resulting
> > > in
> > > > longer job finishing time and poorer resource efficiency due to
> holding
> > > > part of the resources idle while waiting for the rest.
> > > > > >       2.
> > > > > >       More severely, if multiple jobs each hold part of the
> cluster
> > > > resources and are waiting for more, a deadlock would occur. The
> chance
> > is
> > > > not trivial, especially for scenarios such as OLAP where concurrent
> job
> > > > submissions are frequent.
> > > > > >       2. Blocking Shuffle:
> > > > > >     For blocking shuffle, execution of downstream tasks must wait
> > for
> > > > all upstream tasks to finish, despite there might be more resources
> > > > available. The sequential execution of upstream and downstream tasks
> > > > significantly increase the job finishing time, and the disk IO
> workload
> > > for
> > > > spilling and loading full intermediate data also affects the
> > performance.
> > > > > >
> > > > > >
> > > > > > We believe the root cause of the above problems is that shuffle
> > > > implementations put unnecessary constraints on task scheduling.
> > > > > >
> > > > > >
> > > > > > To solve this problem, Xintong Song and I propose to introduce
> > hybrid
> > > > shuffle to minimize the scheduling constraints. With Hybrid Shuffle,
> > > Flink
> > > > should:
> > > > > >
> > > > > >    1. Make best use of available resources.
> > > > > >     Ideally, we want Flink to always make progress if possible.
> > That
> > > > is to say, it should always execute a pending task if there are
> > resources
> > > > available for that task.
> > > > > >    2. Minimize disk IO load.
> > > > > >     In-flight data should be consumed directly from memory as
> much
> > as
> > > > possible. Only data that is not consumed timely should be spilled to
> > > disk.
> > > > > >
> > > > > > You can find more details in FLIP-235. Looking forward to your
> > > > feedback.
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > > > >
> > > > > >
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Weijie
> > > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by weijie guo <gu...@gmail.com>.
Hi, Aitozi:

Thank you for the feedback!
Here are some of my thoughts on your question

>>> 1.If there is an unbounded data source, but only have resource to
schedule the first stage, will it bring the big burden to the disk/shuffle
service which will occupy all the resource I think.
First of all, Hybrid Shuffle Mode is oriented to the batch job scenario, so
there is no problem of unbounded data sources. Secondly, if you consider
the stream scenario, I think Pipelined Shuffle should still be the best
choice at present. For an unbounded data stream, it is not meaningful to
only run some stages.

>>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
other words, In which case we can use the hybrid shuffle mode:
Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
shuffle mode can effectively utilize cluster resources and avoid some
unnecessary disk IO overhead. For OLAP scenarios, which are characterized
by a large number of concurrently submitted short batch jobs, hybrid
shuffle can solve the scheduling deadlock problem of pipelined shuffle and
achieve similar performance.

Best regards,

Weijie


Aitozi <gj...@gmail.com> 于2022年5月20日周五 08:05写道:

> Hi Weijie:
>
>      Thanks for the nice FLIP, I have couple questions about this:
>
> 1) In the hybrid shuffle mode, the shuffle mode is decided by the resource.
> If there
> is an unbounded data source, but only have resource to schedule the first
> stage, will it
> bring the big burden to the disk/shuffle service which will occupy all the
> resource I think.
>
> 2) Which kind of job will benefit from the hybrid shuffle mode. In other
> words, In which
> case we can use the hybrid shuffle mode:
> - For batch job want to use more resource to reduce the e2e time ?
> - Or for streaming job which may lack of resource temporarily ?
> - Or for OLAP job which will try to make best use of available resources as
> you mentioned to finish the query?
> Just want to know the typical use case for the Hybrid shuffle mode :)
>
>
> Best,
> Aitozi.
>
> weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:
>
> > Yangze, Thank you for the feedback!
> > Here's my thoughts for your questions:
> >
> > >>> How do we decide the size of the buffer pool in MemoryDataManager and
> > the read buffers in FileDataManager?
> > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > ResultPartition, and the size is the same as the current implementation
> of
> > sort-merge shuffle. In other words, the minimum value of BufferPool is a
> > configurable fixed value, and the maximum value is Math.max(min, 4 *
> > numSubpartitions). The default value can be determined by running the
> > TPC-DS tests.
> > Read buffers in FileDataManager are requested from the
> > BatchShuffleReadBufferPool shared by TaskManager, it's size controlled by
> > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the default
> > value is 32M, which is consistent with the current sort-merge shuffle
> > logic.
> >
> > >>> Is there an upper limit for the sum of them? If there is, how does
> > MemoryDataManager and FileDataManager sync the memory usage?
> > The buffers of the MemoryDataManager are limited by the size of the
> > LocalBufferPool, and the upper limit is the size of the Network Memory.
> The
> > buffers of the FileDataManager are directly requested from
> > UnpooledOffHeapMemory, and are also limited by the size of the framework
> > off-heap memory. I think there should be no need for additional
> > synchronization mechanisms.
> >
> > >>> How do you disable the slot sharing? If user configures both the slot
> > sharing group and hybrid shuffle, what will happen to that job?
> > I think we can print a warning log when Hybrid Shuffle is enabled and SSG
> > is configured during the JobGraph compilation stage, and fallback to the
> > region slot sharing group by default. Of course, it will be emphasized in
> > the document that we do not currently support SSG, If configured, it will
> > fall back to the default.
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
> >
> > > Thanks for driving this. Xintong and Weijie.
> > >
> > > I believe this feature will make Flink a better batch/OLAP engine. +1
> > > for the overall design.
> > >
> > > Some questions:
> > > 1. How do we decide the size of the buffer pool in MemoryDataManager
> > > and the read buffers in FileDataManager?
> > > 2. Is there an upper limit for the sum of them? If there is, how does
> > > MemoryDataManager and FileDataManager sync the memory usage?
> > > 3. How do you disable the slot sharing? If user configures both the
> > > slot sharing group and hybrid shuffle, what will happen to that job?
> > >
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Thu, May 19, 2022 at 2:41 PM Xintong Song <to...@gmail.com>
> > > wrote:
> > > >
> > > > Thanks for preparing this FLIP, Weijie.
> > > >
> > > > I think this is a good improvement on batch resource elasticity.
> > Looking
> > > > forward to the community feedback.
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > >
> > > >
> > > > On Thu, May 19, 2022 at 2:31 PM weijie guo <
> guoweijiereswqa@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > >
> > > > > I’d like to start a discussion about FLIP-235[1], which introduce a
> > > new shuffle mode
> > > > >  can overcome some of the problems of Pipelined Shuffle and
> Blocking
> > > Shuffle in batch scenarios.
> > > > >
> > > > >
> > > > > Currently in Flink, task scheduling is more or less constrained by
> > the
> > > shuffle implementations.
> > > > > This will bring the following disadvantages:
> > > > >
> > > > >    1. Pipelined Shuffle:
> > > > >     For pipelined shuffle, the upstream and downstream tasks are
> > > required to be deployed at the same time, to avoid upstream tasks being
> > > blocked forever. This is fine when there are enough resources for both
> > > upstream and downstream tasks to run simultaneously, but will cause the
> > > following problems otherwise:
> > > > >    1.
> > > > >       Pipelined shuffle connected tasks (i.e., a pipelined region)
> > > cannot be executed until obtaining resources for all of them, resulting
> > in
> > > longer job finishing time and poorer resource efficiency due to holding
> > > part of the resources idle while waiting for the rest.
> > > > >       2.
> > > > >       More severely, if multiple jobs each hold part of the cluster
> > > resources and are waiting for more, a deadlock would occur. The chance
> is
> > > not trivial, especially for scenarios such as OLAP where concurrent job
> > > submissions are frequent.
> > > > >       2. Blocking Shuffle:
> > > > >     For blocking shuffle, execution of downstream tasks must wait
> for
> > > all upstream tasks to finish, despite there might be more resources
> > > available. The sequential execution of upstream and downstream tasks
> > > significantly increase the job finishing time, and the disk IO workload
> > for
> > > spilling and loading full intermediate data also affects the
> performance.
> > > > >
> > > > >
> > > > > We believe the root cause of the above problems is that shuffle
> > > implementations put unnecessary constraints on task scheduling.
> > > > >
> > > > >
> > > > > To solve this problem, Xintong Song and I propose to introduce
> hybrid
> > > shuffle to minimize the scheduling constraints. With Hybrid Shuffle,
> > Flink
> > > should:
> > > > >
> > > > >    1. Make best use of available resources.
> > > > >     Ideally, we want Flink to always make progress if possible.
> That
> > > is to say, it should always execute a pending task if there are
> resources
> > > available for that task.
> > > > >    2. Minimize disk IO load.
> > > > >     In-flight data should be consumed directly from memory as much
> as
> > > possible. Only data that is not consumed timely should be spilled to
> > disk.
> > > > >
> > > > > You can find more details in FLIP-235. Looking forward to your
> > > feedback.
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > > >
> > > > >
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Weijie
> > > > >
> > >
> >
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Aitozi <gj...@gmail.com>.
Hi Weijie:

     Thanks for the nice FLIP, I have couple questions about this:

1) In the hybrid shuffle mode, the shuffle mode is decided by the resource.
If there
is an unbounded data source, but only have resource to schedule the first
stage, will it
bring the big burden to the disk/shuffle service which will occupy all the
resource I think.

2) Which kind of job will benefit from the hybrid shuffle mode. In other
words, In which
case we can use the hybrid shuffle mode:
- For batch job want to use more resource to reduce the e2e time ?
- Or for streaming job which may lack of resource temporarily ?
- Or for OLAP job which will try to make best use of available resources as
you mentioned to finish the query?
Just want to know the typical use case for the Hybrid shuffle mode :)


Best,
Aitozi.

weijie guo <gu...@gmail.com> 于2022年5月19日周四 18:33写道:

> Yangze, Thank you for the feedback!
> Here's my thoughts for your questions:
>
> >>> How do we decide the size of the buffer pool in MemoryDataManager and
> the read buffers in FileDataManager?
> The BufferPool in MemoryDataManager is the LocalBufferPool used by
> ResultPartition, and the size is the same as the current implementation of
> sort-merge shuffle. In other words, the minimum value of BufferPool is a
> configurable fixed value, and the maximum value is Math.max(min, 4 *
> numSubpartitions). The default value can be determined by running the
> TPC-DS tests.
> Read buffers in FileDataManager are requested from the
> BatchShuffleReadBufferPool shared by TaskManager, it's size controlled by
> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the default
> value is 32M, which is consistent with the current sort-merge shuffle
> logic.
>
> >>> Is there an upper limit for the sum of them? If there is, how does
> MemoryDataManager and FileDataManager sync the memory usage?
> The buffers of the MemoryDataManager are limited by the size of the
> LocalBufferPool, and the upper limit is the size of the Network Memory. The
> buffers of the FileDataManager are directly requested from
> UnpooledOffHeapMemory, and are also limited by the size of the framework
> off-heap memory. I think there should be no need for additional
> synchronization mechanisms.
>
> >>> How do you disable the slot sharing? If user configures both the slot
> sharing group and hybrid shuffle, what will happen to that job?
> I think we can print a warning log when Hybrid Shuffle is enabled and SSG
> is configured during the JobGraph compilation stage, and fallback to the
> region slot sharing group by default. Of course, it will be emphasized in
> the document that we do not currently support SSG, If configured, it will
> fall back to the default.
>
>
> Best regards,
>
> Weijie
>
>
> Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:
>
> > Thanks for driving this. Xintong and Weijie.
> >
> > I believe this feature will make Flink a better batch/OLAP engine. +1
> > for the overall design.
> >
> > Some questions:
> > 1. How do we decide the size of the buffer pool in MemoryDataManager
> > and the read buffers in FileDataManager?
> > 2. Is there an upper limit for the sum of them? If there is, how does
> > MemoryDataManager and FileDataManager sync the memory usage?
> > 3. How do you disable the slot sharing? If user configures both the
> > slot sharing group and hybrid shuffle, what will happen to that job?
> >
> >
> > Best,
> > Yangze Guo
> >
> > On Thu, May 19, 2022 at 2:41 PM Xintong Song <to...@gmail.com>
> > wrote:
> > >
> > > Thanks for preparing this FLIP, Weijie.
> > >
> > > I think this is a good improvement on batch resource elasticity.
> Looking
> > > forward to the community feedback.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Thu, May 19, 2022 at 2:31 PM weijie guo <gu...@gmail.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > >
> > > > I’d like to start a discussion about FLIP-235[1], which introduce a
> > new shuffle mode
> > > >  can overcome some of the problems of Pipelined Shuffle and Blocking
> > Shuffle in batch scenarios.
> > > >
> > > >
> > > > Currently in Flink, task scheduling is more or less constrained by
> the
> > shuffle implementations.
> > > > This will bring the following disadvantages:
> > > >
> > > >    1. Pipelined Shuffle:
> > > >     For pipelined shuffle, the upstream and downstream tasks are
> > required to be deployed at the same time, to avoid upstream tasks being
> > blocked forever. This is fine when there are enough resources for both
> > upstream and downstream tasks to run simultaneously, but will cause the
> > following problems otherwise:
> > > >    1.
> > > >       Pipelined shuffle connected tasks (i.e., a pipelined region)
> > cannot be executed until obtaining resources for all of them, resulting
> in
> > longer job finishing time and poorer resource efficiency due to holding
> > part of the resources idle while waiting for the rest.
> > > >       2.
> > > >       More severely, if multiple jobs each hold part of the cluster
> > resources and are waiting for more, a deadlock would occur. The chance is
> > not trivial, especially for scenarios such as OLAP where concurrent job
> > submissions are frequent.
> > > >       2. Blocking Shuffle:
> > > >     For blocking shuffle, execution of downstream tasks must wait for
> > all upstream tasks to finish, despite there might be more resources
> > available. The sequential execution of upstream and downstream tasks
> > significantly increase the job finishing time, and the disk IO workload
> for
> > spilling and loading full intermediate data also affects the performance.
> > > >
> > > >
> > > > We believe the root cause of the above problems is that shuffle
> > implementations put unnecessary constraints on task scheduling.
> > > >
> > > >
> > > > To solve this problem, Xintong Song and I propose to introduce hybrid
> > shuffle to minimize the scheduling constraints. With Hybrid Shuffle,
> Flink
> > should:
> > > >
> > > >    1. Make best use of available resources.
> > > >     Ideally, we want Flink to always make progress if possible. That
> > is to say, it should always execute a pending task if there are resources
> > available for that task.
> > > >    2. Minimize disk IO load.
> > > >     In-flight data should be consumed directly from memory as much as
> > possible. Only data that is not consumed timely should be spilled to
> disk.
> > > >
> > > > You can find more details in FLIP-235. Looking forward to your
> > feedback.
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > >
> > > >
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> >
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by weijie guo <gu...@gmail.com>.
Yangze, Thank you for the feedback!
Here's my thoughts for your questions:

>>> How do we decide the size of the buffer pool in MemoryDataManager and
the read buffers in FileDataManager?
The BufferPool in MemoryDataManager is the LocalBufferPool used by
ResultPartition, and the size is the same as the current implementation of
sort-merge shuffle. In other words, the minimum value of BufferPool is a
configurable fixed value, and the maximum value is Math.max(min, 4 *
numSubpartitions). The default value can be determined by running the
TPC-DS tests.
Read buffers in FileDataManager are requested from the
BatchShuffleReadBufferPool shared by TaskManager, it's size controlled by
*taskmanager.memory.framework.off-heap.batch-shuffle.size*, the default
value is 32M, which is consistent with the current sort-merge shuffle logic.

>>> Is there an upper limit for the sum of them? If there is, how does
MemoryDataManager and FileDataManager sync the memory usage?
The buffers of the MemoryDataManager are limited by the size of the
LocalBufferPool, and the upper limit is the size of the Network Memory. The
buffers of the FileDataManager are directly requested from
UnpooledOffHeapMemory, and are also limited by the size of the framework
off-heap memory. I think there should be no need for additional
synchronization mechanisms.

>>> How do you disable the slot sharing? If user configures both the slot
sharing group and hybrid shuffle, what will happen to that job?
I think we can print a warning log when Hybrid Shuffle is enabled and SSG
is configured during the JobGraph compilation stage, and fallback to the
region slot sharing group by default. Of course, it will be emphasized in
the document that we do not currently support SSG, If configured, it will
fall back to the default.


Best regards,

Weijie


Yangze Guo <ka...@gmail.com> 于2022年5月19日周四 16:25写道:

> Thanks for driving this. Xintong and Weijie.
>
> I believe this feature will make Flink a better batch/OLAP engine. +1
> for the overall design.
>
> Some questions:
> 1. How do we decide the size of the buffer pool in MemoryDataManager
> and the read buffers in FileDataManager?
> 2. Is there an upper limit for the sum of them? If there is, how does
> MemoryDataManager and FileDataManager sync the memory usage?
> 3. How do you disable the slot sharing? If user configures both the
> slot sharing group and hybrid shuffle, what will happen to that job?
>
>
> Best,
> Yangze Guo
>
> On Thu, May 19, 2022 at 2:41 PM Xintong Song <to...@gmail.com>
> wrote:
> >
> > Thanks for preparing this FLIP, Weijie.
> >
> > I think this is a good improvement on batch resource elasticity. Looking
> > forward to the community feedback.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Thu, May 19, 2022 at 2:31 PM weijie guo <gu...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > >
> > > I’d like to start a discussion about FLIP-235[1], which introduce a
> new shuffle mode
> > >  can overcome some of the problems of Pipelined Shuffle and Blocking
> Shuffle in batch scenarios.
> > >
> > >
> > > Currently in Flink, task scheduling is more or less constrained by the
> shuffle implementations.
> > > This will bring the following disadvantages:
> > >
> > >    1. Pipelined Shuffle:
> > >     For pipelined shuffle, the upstream and downstream tasks are
> required to be deployed at the same time, to avoid upstream tasks being
> blocked forever. This is fine when there are enough resources for both
> upstream and downstream tasks to run simultaneously, but will cause the
> following problems otherwise:
> > >    1.
> > >       Pipelined shuffle connected tasks (i.e., a pipelined region)
> cannot be executed until obtaining resources for all of them, resulting in
> longer job finishing time and poorer resource efficiency due to holding
> part of the resources idle while waiting for the rest.
> > >       2.
> > >       More severely, if multiple jobs each hold part of the cluster
> resources and are waiting for more, a deadlock would occur. The chance is
> not trivial, especially for scenarios such as OLAP where concurrent job
> submissions are frequent.
> > >       2. Blocking Shuffle:
> > >     For blocking shuffle, execution of downstream tasks must wait for
> all upstream tasks to finish, despite there might be more resources
> available. The sequential execution of upstream and downstream tasks
> significantly increase the job finishing time, and the disk IO workload for
> spilling and loading full intermediate data also affects the performance.
> > >
> > >
> > > We believe the root cause of the above problems is that shuffle
> implementations put unnecessary constraints on task scheduling.
> > >
> > >
> > > To solve this problem, Xintong Song and I propose to introduce hybrid
> shuffle to minimize the scheduling constraints. With Hybrid Shuffle, Flink
> should:
> > >
> > >    1. Make best use of available resources.
> > >     Ideally, we want Flink to always make progress if possible. That
> is to say, it should always execute a pending task if there are resources
> available for that task.
> > >    2. Minimize disk IO load.
> > >     In-flight data should be consumed directly from memory as much as
> possible. Only data that is not consumed timely should be spilled to disk.
> > >
> > > You can find more details in FLIP-235. Looking forward to your
> feedback.
> > >
> > >
> > > [1]
> > >
> > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > >
> > >
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
>

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Yangze Guo <ka...@gmail.com>.
Thanks for driving this. Xintong and Weijie.

I believe this feature will make Flink a better batch/OLAP engine. +1
for the overall design.

Some questions:
1. How do we decide the size of the buffer pool in MemoryDataManager
and the read buffers in FileDataManager?
2. Is there an upper limit for the sum of them? If there is, how does
MemoryDataManager and FileDataManager sync the memory usage?
3. How do you disable the slot sharing? If user configures both the
slot sharing group and hybrid shuffle, what will happen to that job?


Best,
Yangze Guo

On Thu, May 19, 2022 at 2:41 PM Xintong Song <to...@gmail.com> wrote:
>
> Thanks for preparing this FLIP, Weijie.
>
> I think this is a good improvement on batch resource elasticity. Looking
> forward to the community feedback.
>
> Best,
>
> Xintong
>
>
>
> On Thu, May 19, 2022 at 2:31 PM weijie guo <gu...@gmail.com>
> wrote:
>
> > Hi all,
> >
> >
> > I’d like to start a discussion about FLIP-235[1], which introduce a new shuffle mode
> >  can overcome some of the problems of Pipelined Shuffle and Blocking Shuffle in batch scenarios.
> >
> >
> > Currently in Flink, task scheduling is more or less constrained by the shuffle implementations.
> > This will bring the following disadvantages:
> >
> >    1. Pipelined Shuffle:
> >     For pipelined shuffle, the upstream and downstream tasks are required to be deployed at the same time, to avoid upstream tasks being blocked forever. This is fine when there are enough resources for both upstream and downstream tasks to run simultaneously, but will cause the following problems otherwise:
> >    1.
> >       Pipelined shuffle connected tasks (i.e., a pipelined region) cannot be executed until obtaining resources for all of them, resulting in longer job finishing time and poorer resource efficiency due to holding part of the resources idle while waiting for the rest.
> >       2.
> >       More severely, if multiple jobs each hold part of the cluster resources and are waiting for more, a deadlock would occur. The chance is not trivial, especially for scenarios such as OLAP where concurrent job submissions are frequent.
> >       2. Blocking Shuffle:
> >     For blocking shuffle, execution of downstream tasks must wait for all upstream tasks to finish, despite there might be more resources available. The sequential execution of upstream and downstream tasks significantly increase the job finishing time, and the disk IO workload for spilling and loading full intermediate data also affects the performance.
> >
> >
> > We believe the root cause of the above problems is that shuffle implementations put unnecessary constraints on task scheduling.
> >
> >
> > To solve this problem, Xintong Song and I propose to introduce hybrid shuffle to minimize the scheduling constraints. With Hybrid Shuffle, Flink should:
> >
> >    1. Make best use of available resources.
> >     Ideally, we want Flink to always make progress if possible. That is to say, it should always execute a pending task if there are resources available for that task.
> >    2. Minimize disk IO load.
> >     In-flight data should be consumed directly from memory as much as possible. Only data that is not consumed timely should be spilled to disk.
> >
> > You can find more details in FLIP-235. Looking forward to your feedback.
> >
> >
> > [1]
> >
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >
> >
> >
> > Best regards,
> >
> > Weijie
> >

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

Posted by Xintong Song <to...@gmail.com>.
Thanks for preparing this FLIP, Weijie.

I think this is a good improvement on batch resource elasticity. Looking
forward to the community feedback.

Best,

Xintong



On Thu, May 19, 2022 at 2:31 PM weijie guo <gu...@gmail.com>
wrote:

> Hi all,
>
>
> I’d like to start a discussion about FLIP-235[1], which introduce a new shuffle mode
>  can overcome some of the problems of Pipelined Shuffle and Blocking Shuffle in batch scenarios.
>
>
> Currently in Flink, task scheduling is more or less constrained by the shuffle implementations.
> This will bring the following disadvantages:
>
>    1. Pipelined Shuffle:
>     For pipelined shuffle, the upstream and downstream tasks are required to be deployed at the same time, to avoid upstream tasks being blocked forever. This is fine when there are enough resources for both upstream and downstream tasks to run simultaneously, but will cause the following problems otherwise:
>    1.
>       Pipelined shuffle connected tasks (i.e., a pipelined region) cannot be executed until obtaining resources for all of them, resulting in longer job finishing time and poorer resource efficiency due to holding part of the resources idle while waiting for the rest.
>       2.
>       More severely, if multiple jobs each hold part of the cluster resources and are waiting for more, a deadlock would occur. The chance is not trivial, especially for scenarios such as OLAP where concurrent job submissions are frequent.
>       2. Blocking Shuffle:
>     For blocking shuffle, execution of downstream tasks must wait for all upstream tasks to finish, despite there might be more resources available. The sequential execution of upstream and downstream tasks significantly increase the job finishing time, and the disk IO workload for spilling and loading full intermediate data also affects the performance.
>
>
> We believe the root cause of the above problems is that shuffle implementations put unnecessary constraints on task scheduling.
>
>
> To solve this problem, Xintong Song and I propose to introduce hybrid shuffle to minimize the scheduling constraints. With Hybrid Shuffle, Flink should:
>
>    1. Make best use of available resources.
>     Ideally, we want Flink to always make progress if possible. That is to say, it should always execute a pending task if there are resources available for that task.
>    2. Minimize disk IO load.
>     In-flight data should be consumed directly from memory as much as possible. Only data that is not consumed timely should be spilled to disk.
>
> You can find more details in FLIP-235. Looking forward to your feedback.
>
>
> [1]
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>
>
>
> Best regards,
>
> Weijie
>