You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Yingjie Cao <ke...@gmail.com> on 2022/01/04 09:34:27 UTC

Re: [DISCUSS] Change some default config values of blocking shuffle

Hi all,

After running some tests with the proposed default value (
taskmanager.network.sort-shuffle.min-parallelism: 1,
taskmanager.network.sort-shuffle.min-buffers: 512,
taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
share some test results.

1. TPC-DS performance and stability test (I the TPC-DS benchmark using 512
default parallelism and several different settings multiple times):
1) Stability:
Compared to the current default values, the proposed default values can
improve the TPC-DS query stability a lot. With the current default, there
are many queries suffering from blocking shuffle relevant failures. With
the proposed default values, only three queries fail because of the
"Insufficient number of network buffers:" error. With 512 parallelism, the
current default configuration will incur the same issue. Part of the reason
is that the network buffer consumed by InputGate is  proportional to
parallelism and can use 32M network memory by default and many tasks has
several InputGate but we only has 128M network memory per TaskManager by
default.
2) Performance:
Compared to the current default values, the proposed default values can
improve the TPC-DS query performance a lot. Except for those queries of
small shuffle data amount which consume really short time, the proposed
default values can bring 2-10 times performance gain. About the default
value of taskmanager.network.sort-shuffle.min-parallelism  proposed by Yun,
I tested both 1 and 128 and 1 is better for performance which is as
expected.

2. Flink pre-commit stability test:
I have run all Flink tests with the proposed default value for more than 20
times. The only instability is the "Insufficient number of network
buffers:" error for batch several test cases. This error occurs because
some tests have really limited network buffers and the proposed default
config values may increase the network buffer consumption for cases. After
increase the total network size for these test cases, the issue can be
solved.

Summary:
1. The proposed default value can improve both the performance and
stability of Flink batch shuffle a lot.
2. Some batch jobs may fail because of the "Insufficient number of network
buffers:" error for this default value change will increase the network
buffer consumption a little for jobs less than 512 parallelism (for jobs
more than 512 parallelism network buffer consumption will be reduced).
3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has better
performance than setting that to 128, both settings may incur the
"Insufficient number of network buffers:" error.
4. After changing the default value and fixing several test cases, all
Flink tests (except for those known unstable cases) can run stably.

Personally, I am +1 to make the change. Though the change may cause some
batch jobs fail because of the "Insufficient number of network buffers:",
the possibility is small enough (only 3 TPC-DS out of about 100 queries
fails, these queries will also fail with the current default configuration
because it is the InputGate which takes the most network buffers and cost
the error). Compared to this small regression, the performance and
stability gains are big. Any feedbacks especially those from Flink batch
users are highly appreciated.

BTW, aside from the above tests, I also tries to tune some more config
options to try to make the TPC-DS test faster. I copied these tuned config
options from our daily TPC-DS settings. The results show that the optimized
configuration can improve the TPC-DS performance about 30%. Though these
settings may not the best, they really help compared to the default value.
I attached some settings in this may, I guess some Flink batch users may be
interested in this. Based on my limited knowledge, I guess that increasing
the total TaskManager size and network memory size is important for
performance, because more memory (managed and network) can make operators
and shuffle faster.

Best,
Yingjie



Yingjie Cao <ke...@gmail.com> 于2021年12月15日周三 12:19写道:

> Hi Till,
>
> Thanks for the suggestion. I think it makes a lot of sense to also extend
> the documentation for the sort shuffle to include a tuning guide.
>
> Best,
> Yingjie
>
> Till Rohrmann <tr...@apache.org> 于2021年12月14日周二 18:57写道:
>
>> As part of this FLIP, does it make sense to also extend the documentation
>> for the sort shuffle [1] to include a tuning guide? I am thinking of a more
>> in depth description of what things you might observe and how to influence
>> them with the configuration options.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>
>> Cheers,
>> Till
>>
>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <ji...@gmail.com>
>> wrote:
>>
>>> Hi Yingjie,
>>>
>>> Thanks for your explanation. I have no more questions. +1
>>>
>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <ke...@gmail.com>
>>> wrote:
>>> >
>>> > Hi Jingsong,
>>> >
>>> > Thanks for your feedback.
>>> >
>>> > >>> My question is, what is the maximum parallelism a job can have
>>> with the default configuration? (Does this break out of the box)
>>> >
>>> > Yes, you are right, these two options are related to network memory
>>> and framework off-heap memory. Generally, these changes will not break out
>>> of the box experience, but for some extreme cases, for example, there are
>>> too many ResultPartitions per task, users may need to increase network
>>> memory to avoid "insufficient network buffer" error. For framework
>>> off-head, I believe that user do not need to change the default value.
>>> >
>>> > In fact, I have a basic goal when changing these config values: when
>>> running TPCDS of medium parallelism with the default value, all queries
>>> must pass without any error and at the same time, the performance can be
>>> improved. I think if we achieve this goal, most common use cases can be
>>> covered.
>>> >
>>> > Currently, for the default configuration, the exclusive buffers
>>> required at input gate side is still parallelism relevant (though since
>>> 1.14, we can decouple the network buffer consumption from parallelism by
>>> setting a config value, it has slight performance influence on streaming
>>> jobs), which means that no large parallelism can be supported by the
>>> default configuration. Roughly, I would say the default value can support
>>> jobs of several hundreds of parallelism.
>>> >
>>> > >>> I do feel that this correspondence is a bit difficult to control
>>> at the moment, and it would be best if a rough table could be provided.
>>> >
>>> > I think this is a good suggestion, we can provide those suggestions in
>>> the document.
>>> >
>>> > Best,
>>> > Yingjie
>>> >
>>> > Jingsong Li <ji...@gmail.com> 于2021年12月14日周二 14:39写道:
>>> >>
>>> >> Hi  Yingjie,
>>> >>
>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>>> >> of batch jobs.
>>> >>
>>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>>> >> network memory and framework.off-heap.size.
>>> >>
>>> >> My question is, what is the maximum parallelism a job can have with
>>> >> the default configuration? (Does this break out of the box)
>>> >>
>>> >> How much network memory and framework.off-heap.size are required for
>>> >> how much parallelism in the default configuration?
>>> >>
>>> >> I do feel that this correspondence is a bit difficult to control at
>>> >> the moment, and it would be best if a rough table could be provided.
>>> >>
>>> >> Best,
>>> >> Jingsong
>>> >>
>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <ke...@gmail.com>
>>> wrote:
>>> >> >
>>> >> > Hi Jiangang,
>>> >> >
>>> >> > Thanks for your suggestion.
>>> >> >
>>> >> > >>> The config can affect the memory usage. Will the related memory
>>> configs be changed?
>>> >> >
>>> >> > I think we will not change the default network memory settings. My
>>> best expectation is that the default value can work for most cases (though
>>> may not the best) and for other cases, user may need to tune the memory
>>> settings.
>>> >> >
>>> >> > >>> Can you share the tpcds results for different configs? Although
>>> we change the default values, it is helpful to change them for different
>>> users. In this case, the experience can help a lot.
>>> >> >
>>> >> > I did not keep all previous TPCDS results, but from the results, I
>>> can tell that on HDD, always using the sort-shuffle is a good choice. For
>>> small jobs, using sort-shuffle may not bring much performance gain, this
>>> may because that all shuffle data can be cached in memory (page cache),
>>> this is the case if the cluster have enough resources. However, if the
>>> whole cluster is under heavy burden or you are running large scale jobs,
>>> the performance of those small jobs can also be influenced. For large-scale
>>> jobs, the configurations suggested to be tuned are
>>> taskmanager.network.sort-shuffle.min-buffers and
>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase
>>> these values for large-scale batch jobs.
>>> >> >
>>> >> > BTW, I am still running TPCDS tests these days and I can share
>>> these results soon.
>>> >> >
>>> >> > Best,
>>> >> > Yingjie
>>> >> >
>>> >> > 刘建刚 <li...@gmail.com> 于2021年12月10日周五 18:30写道:
>>> >> >>
>>> >> >> Glad to see the suggestion. In our test, we found that small jobs
>>> with the changing configs can not improve the performance much just as your
>>> test. I have some suggestions:
>>> >> >>
>>> >> >> The config can affect the memory usage. Will the related memory
>>> configs be changed?
>>> >> >> Can you share the tpcds results for different configs? Although we
>>> change the default values, it is helpful to change them for different
>>> users. In this case, the experience can help a lot.
>>> >> >>
>>> >> >> Best,
>>> >> >> Liu Jiangang
>>> >> >>
>>> >> >> Yun Gao <yu...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
>>> >> >>>
>>> >> >>> Hi Yingjie,
>>> >> >>>
>>> >> >>> Very thanks for drafting the FLIP and initiating the discussion!
>>> >> >>>
>>> >> >>> May I have a double confirmation for
>>> taskmanager.network.sort-shuffle.min-parallelism that
>>> >> >>> since other frameworks like Spark have used sort-based shuffle
>>> for all the cases, does our
>>> >> >>> current circumstance still have difference with them?
>>> >> >>>
>>> >> >>> Best,
>>> >> >>> Yun
>>> >> >>>
>>> >> >>>
>>> >> >>>
>>> >> >>>
>>> >> >>> ------------------------------------------------------------------
>>> >> >>> From:Yingjie Cao <ke...@gmail.com>
>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>> >> >>> To:dev <de...@flink.apache.org>; user <us...@flink.apache.org>;
>>> user-zh <us...@flink.apache.org>
>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
>>> blocking shuffle
>>> >> >>>
>>> >> >>> Hi dev & users:
>>> >> >>>
>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
>>> appreciated.
>>> >> >>>
>>> >> >>> Best,
>>> >> >>> Yingjie
>>> >> >>>
>>> >> >>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>> >> >>> Yingjie Cao <ke...@gmail.com> 于2021年12月3日周五 17:02写道:
>>> >> >>>
>>> >> >>> Hi dev & users,
>>> >> >>>
>>> >> >>> We propose to change some default values of blocking shuffle to
>>> improve the user out-of-box experience (not influence streaming). The
>>> default values we want to change are as follows:
>>> >> >>>
>>> >> >>> 1. Data compression
>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>>> default value is 'false'.  Usually, data compression can reduce both disk
>>> and network IO which is good for performance. At the same time, it can save
>>> storage space. We propose to change the default value to true.
>>> >> >>>
>>> >> >>> 2. Default shuffle implementation
>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>>> both stability and performance. So we propose to reduce the default value
>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>>> 1024 with a tpc-ds and 128 is the best one.)
>>> >> >>>
>>> >> >>> 3. Read buffer of sort-shuffle
>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>>> default value is '32M'. Previously, when choosing the default value, both
>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>>> way. However, recently, it is reported in the mailing list that the default
>>> value is not enough which caused a buffer request timeout issue. We already
>>> created a ticket to improve the behavior. At the same time, we propose to
>>> increase this default value to '64M' which can also help.
>>> >> >>>
>>> >> >>> 4. Sort buffer size of sort-shuffle
>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>>> value is '64' which means '64' network buffers (32k per buffer by default).
>>> This default value is quite modest and the performance can be influenced.
>>> We propose to increase this value to a larger one, for example, 512 (the
>>> default TM and network buffer configuration can serve more than 10 result
>>> partitions concurrently).
>>> >> >>>
>>> >> >>> We already tested these default values together with tpc-ds
>>> benchmark in a cluster and both the performance and stability improved a
>>> lot. These changes can help to improve the out-of-box experience of
>>> blocking shuffle. What do you think about these changes? Is there any
>>> concern? If there are no objections, I will make these changes soon.
>>> >> >>>
>>> >> >>> Best,
>>> >> >>> Yingjie
>>> >> >>>
>>> >>
>>> >>
>>> >> --
>>> >> Best, Jingsong Lee
>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>

Re: [DISCUSS] Change some default config values of blocking shuffle

Posted by Yingjie Cao <ke...@gmail.com>.
Hi all,

Thanks very much for all of the feedbacks. It seems that we have reached a
consensus. I will start a vote soon.

Best,
Yingjie

Yun Gao <yu...@aliyun.com> 于2022年1月5日周三 16:08写道:

> Very thanks @Yingjie for completing the experiments!
>
> Also +1 for changing the default config values. From the experiments,
> Changing the default config values would largely increase the open box
> experience of the flink batch, thus it seems worth changing from my side
> even if it would cause some compatibility issue under some scenarios. In
> addition, if we finally have to break compatibility, we might do it early
> to
> avoid affecting more users.
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From:刘建刚 <li...@gmail.com>
> Send Time:2022 Jan. 4 (Tue.) 20:43
> To:user-zh <us...@flink.apache.org>
> Cc:dev <de...@flink.apache.org>; user <us...@flink.apache.org>
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Thanks for the experiment. +1 for the changes.
>
> Yingjie Cao <ke...@gmail.com> 于2022年1月4日周二 17:35写道:
>
> > Hi all,
> >
> > After running some tests with the proposed default value (
> > taskmanager.network.sort-shuffle.min-parallelism: 1,
> > taskmanager.network.sort-shuffle.min-buffers: 512,
> > taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> > taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
> > share some test results.
> >
> > 1. TPC-DS performance and stability test (I the TPC-DS benchmark using
> 512
> > default parallelism and several different settings multiple times):
> > 1) Stability:
> > Compared to the current default values, the proposed default values can
> > improve the TPC-DS query stability a lot. With the current default, there
> > are many queries suffering from blocking shuffle relevant failures. With
> > the proposed default values, only three queries fail because of the
> > "Insufficient number of network buffers:" error. With 512 parallelism,
> the
> > current default configuration will incur the same issue. Part of the
> reason
> > is that the network buffer consumed by InputGate is  proportional to
> > parallelism and can use 32M network memory by default and many tasks has
> > several InputGate but we only has 128M network memory per TaskManager by
> > default.
> > 2) Performance:
> > Compared to the current default values, the proposed default values can
> > improve the TPC-DS query performance a lot. Except for those queries of
> > small shuffle data amount which consume really short time, the proposed
> > default values can bring 2-10 times performance gain. About the default
> > value of taskmanager.network.sort-shuffle.min-parallelism  proposed by
> > Yun, I tested both 1 and 128 and 1 is better for performance which is as
> > expected.
> >
> > 2. Flink pre-commit stability test:
> > I have run all Flink tests with the proposed default value for more than
> > 20 times. The only instability is the "Insufficient number of network
> > buffers:" error for batch several test cases. This error occurs because
> > some tests have really limited network buffers and the proposed default
> > config values may increase the network buffer consumption for cases.
> After
> > increase the total network size for these test cases, the issue can be
> > solved.
> >
> > Summary:
> > 1. The proposed default value can improve both the performance and
> > stability of Flink batch shuffle a lot.
> > 2. Some batch jobs may fail because of the "Insufficient number of
> network
> > buffers:" error for this default value change will increase the network
> > buffer consumption a little for jobs less than 512 parallelism (for jobs
> > more than 512 parallelism network buffer consumption will be reduced).
> > 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> > better performance than setting that to 128, both settings may incur the
> > "Insufficient number of network buffers:" error.
> > 4. After changing the default value and fixing several test cases, all
> > Flink tests (except for those known unstable cases) can run stably.
> >
> > Personally, I am +1 to make the change. Though the change may cause some
> > batch jobs fail because of the "Insufficient number of network buffers:",
> > the possibility is small enough (only 3 TPC-DS out of about 100 queries
> > fails, these queries will also fail with the current default
> configuration
> > because it is the InputGate which takes the most network buffers and cost
> > the error). Compared to this small regression, the performance and
> > stability gains are big. Any feedbacks especially those from Flink batch
> > users are highly appreciated.
> >
> > BTW, aside from the above tests, I also tries to tune some more config
> > options to try to make the TPC-DS test faster. I copied these tuned
> config
> > options from our daily TPC-DS settings. The results show that the
> optimized
> > configuration can improve the TPC-DS performance about 30%. Though these
> > settings may not the best, they really help compared to the default
> value.
> > I attached some settings in this may, I guess some Flink batch users may
> be
> > interested in this. Based on my limited knowledge, I guess that
> increasing
> > the total TaskManager size and network memory size is important for
> > performance, because more memory (managed and network) can make operators
> > and shuffle faster.
> >
> > Best,
> > Yingjie
> >
> >
> >
> > Yingjie Cao <ke...@gmail.com> 于2021年12月15日周三 12:19写道:
> >
> >> Hi Till,
> >>
> >> Thanks for the suggestion. I think it makes a lot of sense to also
> extend
> >> the documentation for the sort shuffle to include a tuning guide.
> >>
> >> Best,
> >> Yingjie
> >>
> >> Till Rohrmann <tr...@apache.org> 于2021年12月14日周二 18:57写道:
> >>
> >>> As part of this FLIP, does it make sense to also extend the
> >>> documentation for the sort shuffle [1] to include a tuning guide? I am
> >>> thinking of a more in depth description of what things you might
> observe
> >>> and how to influence them with the configuration options.
> >>>
> >>> [1]
> >>>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <ji...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Yingjie,
> >>>>
> >>>> Thanks for your explanation. I have no more questions. +1
> >>>>
> >>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <ke...@gmail.com>
> >>>> wrote:
> >>>> >
> >>>> > Hi Jingsong,
> >>>> >
> >>>> > Thanks for your feedback.
> >>>> >
> >>>> > >>> My question is, what is the maximum parallelism a job can have
> >>>> with the default configuration? (Does this break out of the box)
> >>>> >
> >>>> > Yes, you are right, these two options are related to network memory
> >>>> and framework off-heap memory. Generally, these changes will not
> break out
> >>>> of the box experience, but for some extreme cases, for example, there
> are
> >>>> too many ResultPartitions per task, users may need to increase network
> >>>> memory to avoid "insufficient network buffer" error. For framework
> >>>> off-head, I believe that user do not need to change the default value.
> >>>> >
> >>>> > In fact, I have a basic goal when changing these config values: when
> >>>> running TPCDS of medium parallelism with the default value, all
> queries
> >>>> must pass without any error and at the same time, the performance can
> be
> >>>> improved. I think if we achieve this goal, most common use cases can
> be
> >>>> covered.
> >>>> >
> >>>> > Currently, for the default configuration, the exclusive buffers
> >>>> required at input gate side is still parallelism relevant (though
> since
> >>>> 1.14, we can decouple the network buffer consumption from parallelism
> by
> >>>> setting a config value, it has slight performance influence on
> streaming
> >>>> jobs), which means that no large parallelism can be supported by the
> >>>> default configuration. Roughly, I would say the default value can
> support
> >>>> jobs of several hundreds of parallelism.
> >>>> >
> >>>> > >>> I do feel that this correspondence is a bit difficult to control
> >>>> at the moment, and it would be best if a rough table could be
> provided.
> >>>> >
> >>>> > I think this is a good suggestion, we can provide those suggestions
> >>>> in the document.
> >>>> >
> >>>> > Best,
> >>>> > Yingjie
> >>>> >
> >>>> > Jingsong Li <ji...@gmail.com> 于2021年12月14日周二 14:39写道:
> >>>> >>
> >>>> >> Hi  Yingjie,
> >>>> >>
> >>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the
> ease
> >>>> >> of batch jobs.
> >>>> >>
> >>>> >> Looks like
> "taskmanager.memory.framework.off-heap.batch-shuffle.size"
> >>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
> >>>> >> network memory and framework.off-heap.size.
> >>>> >>
> >>>> >> My question is, what is the maximum parallelism a job can have with
> >>>> >> the default configuration? (Does this break out of the box)
> >>>> >>
> >>>> >> How much network memory and framework.off-heap.size are required
> for
> >>>> >> how much parallelism in the default configuration?
> >>>> >>
> >>>> >> I do feel that this correspondence is a bit difficult to control at
> >>>> >> the moment, and it would be best if a rough table could be
> provided.
> >>>> >>
> >>>> >> Best,
> >>>> >> Jingsong
> >>>> >>
> >>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <
> kevin.yingjie@gmail.com>
> >>>> wrote:
> >>>> >> >
> >>>> >> > Hi Jiangang,
> >>>> >> >
> >>>> >> > Thanks for your suggestion.
> >>>> >> >
> >>>> >> > >>> The config can affect the memory usage. Will the related
> >>>> memory configs be changed?
> >>>> >> >
> >>>> >> > I think we will not change the default network memory settings.
> My
> >>>> best expectation is that the default value can work for most cases
> (though
> >>>> may not the best) and for other cases, user may need to tune the
> memory
> >>>> settings.
> >>>> >> >
> >>>> >> > >>> Can you share the tpcds results for different configs?
> >>>> Although we change the default values, it is helpful to change them
> for
> >>>> different users. In this case, the experience can help a lot.
> >>>> >> >
> >>>> >> > I did not keep all previous TPCDS results, but from the results,
> I
> >>>> can tell that on HDD, always using the sort-shuffle is a good choice.
> For
> >>>> small jobs, using sort-shuffle may not bring much performance gain,
> this
> >>>> may because that all shuffle data can be cached in memory (page
> cache),
> >>>> this is the case if the cluster have enough resources. However, if the
> >>>> whole cluster is under heavy burden or you are running large scale
> jobs,
> >>>> the performance of those small jobs can also be influenced. For
> large-scale
> >>>> jobs, the configurations suggested to be tuned are
> >>>> taskmanager.network.sort-shuffle.min-buffers and
> >>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can
> increase
> >>>> these values for large-scale batch jobs.
> >>>> >> >
> >>>> >> > BTW, I am still running TPCDS tests these days and I can share
> >>>> these results soon.
> >>>> >> >
> >>>> >> > Best,
> >>>> >> > Yingjie
> >>>> >> >
> >>>> >> > 刘建刚 <li...@gmail.com> 于2021年12月10日周五 18:30写道:
> >>>> >> >>
> >>>> >> >> Glad to see the suggestion. In our test, we found that small
> jobs
> >>>> with the changing configs can not improve the performance much just
> as your
> >>>> test. I have some suggestions:
> >>>> >> >>
> >>>> >> >> The config can affect the memory usage. Will the related memory
> >>>> configs be changed?
> >>>> >> >> Can you share the tpcds results for different configs? Although
> >>>> we change the default values, it is helpful to change them for
> different
> >>>> users. In this case, the experience can help a lot.
> >>>> >> >>
> >>>> >> >> Best,
> >>>> >> >> Liu Jiangang
> >>>> >> >>
> >>>> >> >> Yun Gao <yu...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
> >>>> >> >>>
> >>>> >> >>> Hi Yingjie,
> >>>> >> >>>
> >>>> >> >>> Very thanks for drafting the FLIP and initiating the
> discussion!
> >>>> >> >>>
> >>>> >> >>> May I have a double confirmation for
> >>>> taskmanager.network.sort-shuffle.min-parallelism that
> >>>> >> >>> since other frameworks like Spark have used sort-based shuffle
> >>>> for all the cases, does our
> >>>> >> >>> current circumstance still have difference with them?
> >>>> >> >>>
> >>>> >> >>> Best,
> >>>> >> >>> Yun
> >>>> >> >>>
> >>>> >> >>>
> >>>> >> >>>
> >>>> >> >>>
> >>>> >> >>>
> >>>> ------------------------------------------------------------------
> >>>> >> >>> From:Yingjie Cao <ke...@gmail.com>
> >>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
> >>>> >> >>> To:dev <de...@flink.apache.org>; user <us...@flink.apache.org>;
> >>>> user-zh <us...@flink.apache.org>
> >>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
> >>>> blocking shuffle
> >>>> >> >>>
> >>>> >> >>> Hi dev & users:
> >>>> >> >>>
> >>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
> >>>> appreciated.
> >>>> >> >>>
> >>>> >> >>> Best,
> >>>> >> >>> Yingjie
> >>>> >> >>>
> >>>> >> >>> [1]
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
> >>>> >> >>> Yingjie Cao <ke...@gmail.com> 于2021年12月3日周五 17:02写道:
> >>>> >> >>>
> >>>> >> >>> Hi dev & users,
> >>>> >> >>>
> >>>> >> >>> We propose to change some default values of blocking shuffle to
> >>>> improve the user out-of-box experience (not influence streaming). The
> >>>> default values we want to change are as follows:
> >>>> >> >>>
> >>>> >> >>> 1. Data compression
> >>>> (taskmanager.network.blocking-shuffle.compression.enabled):
> Currently, the
> >>>> default value is 'false'.  Usually, data compression can reduce both
> disk
> >>>> and network IO which is good for performance. At the same time, it
> can save
> >>>> storage space. We propose to change the default value to true.
> >>>> >> >>>
> >>>> >> >>> 2. Default shuffle implementation
> >>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the
> default
> >>>> value is 'Integer.MAX', which means by default, Flink jobs will
> always use
> >>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better
> for
> >>>> both stability and performance. So we propose to reduce the default
> value
> >>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512
> and
> >>>> 1024 with a tpc-ds and 128 is the best one.)
> >>>> >> >>>
> >>>> >> >>> 3. Read buffer of sort-shuffle
> >>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size):
> Currently, the
> >>>> default value is '32M'. Previously, when choosing the default value,
> both
> >>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a
> cautious
> >>>> way. However, recently, it is reported in the mailing list that the
> default
> >>>> value is not enough which caused a buffer request timeout issue. We
> already
> >>>> created a ticket to improve the behavior. At the same time, we
> propose to
> >>>> increase this default value to '64M' which can also help.
> >>>> >> >>>
> >>>> >> >>> 4. Sort buffer size of sort-shuffle
> >>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> >>>> value is '64' which means '64' network buffers (32k per buffer by
> default).
> >>>> This default value is quite modest and the performance can be
> influenced.
> >>>> We propose to increase this value to a larger one, for example, 512
> (the
> >>>> default TM and network buffer configuration can serve more than 10
> result
> >>>> partitions concurrently).
> >>>> >> >>>
> >>>> >> >>> We already tested these default values together with tpc-ds
> >>>> benchmark in a cluster and both the performance and stability
> improved a
> >>>> lot. These changes can help to improve the out-of-box experience of
> >>>> blocking shuffle. What do you think about these changes? Is there any
> >>>> concern? If there are no objections, I will make these changes soon.
> >>>> >> >>>
> >>>> >> >>> Best,
> >>>> >> >>> Yingjie
> >>>> >> >>>
> >>>> >>
> >>>> >>
> >>>> >> --
> >>>> >> Best, Jingsong Lee
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Best, Jingsong Lee
> >>>>
> >>>
>
>
>

Re: [DISCUSS] Change some default config values of blocking shuffle

Posted by Yingjie Cao <ke...@gmail.com>.
Hi all,

Thanks very much for all of the feedbacks. It seems that we have reached a
consensus. I will start a vote soon.

Best,
Yingjie

Yun Gao <yu...@aliyun.com> 于2022年1月5日周三 16:08写道:

> Very thanks @Yingjie for completing the experiments!
>
> Also +1 for changing the default config values. From the experiments,
> Changing the default config values would largely increase the open box
> experience of the flink batch, thus it seems worth changing from my side
> even if it would cause some compatibility issue under some scenarios. In
> addition, if we finally have to break compatibility, we might do it early
> to
> avoid affecting more users.
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From:刘建刚 <li...@gmail.com>
> Send Time:2022 Jan. 4 (Tue.) 20:43
> To:user-zh <us...@flink.apache.org>
> Cc:dev <de...@flink.apache.org>; user <us...@flink.apache.org>
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Thanks for the experiment. +1 for the changes.
>
> Yingjie Cao <ke...@gmail.com> 于2022年1月4日周二 17:35写道:
>
> > Hi all,
> >
> > After running some tests with the proposed default value (
> > taskmanager.network.sort-shuffle.min-parallelism: 1,
> > taskmanager.network.sort-shuffle.min-buffers: 512,
> > taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> > taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
> > share some test results.
> >
> > 1. TPC-DS performance and stability test (I the TPC-DS benchmark using
> 512
> > default parallelism and several different settings multiple times):
> > 1) Stability:
> > Compared to the current default values, the proposed default values can
> > improve the TPC-DS query stability a lot. With the current default, there
> > are many queries suffering from blocking shuffle relevant failures. With
> > the proposed default values, only three queries fail because of the
> > "Insufficient number of network buffers:" error. With 512 parallelism,
> the
> > current default configuration will incur the same issue. Part of the
> reason
> > is that the network buffer consumed by InputGate is  proportional to
> > parallelism and can use 32M network memory by default and many tasks has
> > several InputGate but we only has 128M network memory per TaskManager by
> > default.
> > 2) Performance:
> > Compared to the current default values, the proposed default values can
> > improve the TPC-DS query performance a lot. Except for those queries of
> > small shuffle data amount which consume really short time, the proposed
> > default values can bring 2-10 times performance gain. About the default
> > value of taskmanager.network.sort-shuffle.min-parallelism  proposed by
> > Yun, I tested both 1 and 128 and 1 is better for performance which is as
> > expected.
> >
> > 2. Flink pre-commit stability test:
> > I have run all Flink tests with the proposed default value for more than
> > 20 times. The only instability is the "Insufficient number of network
> > buffers:" error for batch several test cases. This error occurs because
> > some tests have really limited network buffers and the proposed default
> > config values may increase the network buffer consumption for cases.
> After
> > increase the total network size for these test cases, the issue can be
> > solved.
> >
> > Summary:
> > 1. The proposed default value can improve both the performance and
> > stability of Flink batch shuffle a lot.
> > 2. Some batch jobs may fail because of the "Insufficient number of
> network
> > buffers:" error for this default value change will increase the network
> > buffer consumption a little for jobs less than 512 parallelism (for jobs
> > more than 512 parallelism network buffer consumption will be reduced).
> > 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> > better performance than setting that to 128, both settings may incur the
> > "Insufficient number of network buffers:" error.
> > 4. After changing the default value and fixing several test cases, all
> > Flink tests (except for those known unstable cases) can run stably.
> >
> > Personally, I am +1 to make the change. Though the change may cause some
> > batch jobs fail because of the "Insufficient number of network buffers:",
> > the possibility is small enough (only 3 TPC-DS out of about 100 queries
> > fails, these queries will also fail with the current default
> configuration
> > because it is the InputGate which takes the most network buffers and cost
> > the error). Compared to this small regression, the performance and
> > stability gains are big. Any feedbacks especially those from Flink batch
> > users are highly appreciated.
> >
> > BTW, aside from the above tests, I also tries to tune some more config
> > options to try to make the TPC-DS test faster. I copied these tuned
> config
> > options from our daily TPC-DS settings. The results show that the
> optimized
> > configuration can improve the TPC-DS performance about 30%. Though these
> > settings may not the best, they really help compared to the default
> value.
> > I attached some settings in this may, I guess some Flink batch users may
> be
> > interested in this. Based on my limited knowledge, I guess that
> increasing
> > the total TaskManager size and network memory size is important for
> > performance, because more memory (managed and network) can make operators
> > and shuffle faster.
> >
> > Best,
> > Yingjie
> >
> >
> >
> > Yingjie Cao <ke...@gmail.com> 于2021年12月15日周三 12:19写道:
> >
> >> Hi Till,
> >>
> >> Thanks for the suggestion. I think it makes a lot of sense to also
> extend
> >> the documentation for the sort shuffle to include a tuning guide.
> >>
> >> Best,
> >> Yingjie
> >>
> >> Till Rohrmann <tr...@apache.org> 于2021年12月14日周二 18:57写道:
> >>
> >>> As part of this FLIP, does it make sense to also extend the
> >>> documentation for the sort shuffle [1] to include a tuning guide? I am
> >>> thinking of a more in depth description of what things you might
> observe
> >>> and how to influence them with the configuration options.
> >>>
> >>> [1]
> >>>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <ji...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Yingjie,
> >>>>
> >>>> Thanks for your explanation. I have no more questions. +1
> >>>>
> >>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <ke...@gmail.com>
> >>>> wrote:
> >>>> >
> >>>> > Hi Jingsong,
> >>>> >
> >>>> > Thanks for your feedback.
> >>>> >
> >>>> > >>> My question is, what is the maximum parallelism a job can have
> >>>> with the default configuration? (Does this break out of the box)
> >>>> >
> >>>> > Yes, you are right, these two options are related to network memory
> >>>> and framework off-heap memory. Generally, these changes will not
> break out
> >>>> of the box experience, but for some extreme cases, for example, there
> are
> >>>> too many ResultPartitions per task, users may need to increase network
> >>>> memory to avoid "insufficient network buffer" error. For framework
> >>>> off-head, I believe that user do not need to change the default value.
> >>>> >
> >>>> > In fact, I have a basic goal when changing these config values: when
> >>>> running TPCDS of medium parallelism with the default value, all
> queries
> >>>> must pass without any error and at the same time, the performance can
> be
> >>>> improved. I think if we achieve this goal, most common use cases can
> be
> >>>> covered.
> >>>> >
> >>>> > Currently, for the default configuration, the exclusive buffers
> >>>> required at input gate side is still parallelism relevant (though
> since
> >>>> 1.14, we can decouple the network buffer consumption from parallelism
> by
> >>>> setting a config value, it has slight performance influence on
> streaming
> >>>> jobs), which means that no large parallelism can be supported by the
> >>>> default configuration. Roughly, I would say the default value can
> support
> >>>> jobs of several hundreds of parallelism.
> >>>> >
> >>>> > >>> I do feel that this correspondence is a bit difficult to control
> >>>> at the moment, and it would be best if a rough table could be
> provided.
> >>>> >
> >>>> > I think this is a good suggestion, we can provide those suggestions
> >>>> in the document.
> >>>> >
> >>>> > Best,
> >>>> > Yingjie
> >>>> >
> >>>> > Jingsong Li <ji...@gmail.com> 于2021年12月14日周二 14:39写道:
> >>>> >>
> >>>> >> Hi  Yingjie,
> >>>> >>
> >>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the
> ease
> >>>> >> of batch jobs.
> >>>> >>
> >>>> >> Looks like
> "taskmanager.memory.framework.off-heap.batch-shuffle.size"
> >>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
> >>>> >> network memory and framework.off-heap.size.
> >>>> >>
> >>>> >> My question is, what is the maximum parallelism a job can have with
> >>>> >> the default configuration? (Does this break out of the box)
> >>>> >>
> >>>> >> How much network memory and framework.off-heap.size are required
> for
> >>>> >> how much parallelism in the default configuration?
> >>>> >>
> >>>> >> I do feel that this correspondence is a bit difficult to control at
> >>>> >> the moment, and it would be best if a rough table could be
> provided.
> >>>> >>
> >>>> >> Best,
> >>>> >> Jingsong
> >>>> >>
> >>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <
> kevin.yingjie@gmail.com>
> >>>> wrote:
> >>>> >> >
> >>>> >> > Hi Jiangang,
> >>>> >> >
> >>>> >> > Thanks for your suggestion.
> >>>> >> >
> >>>> >> > >>> The config can affect the memory usage. Will the related
> >>>> memory configs be changed?
> >>>> >> >
> >>>> >> > I think we will not change the default network memory settings.
> My
> >>>> best expectation is that the default value can work for most cases
> (though
> >>>> may not the best) and for other cases, user may need to tune the
> memory
> >>>> settings.
> >>>> >> >
> >>>> >> > >>> Can you share the tpcds results for different configs?
> >>>> Although we change the default values, it is helpful to change them
> for
> >>>> different users. In this case, the experience can help a lot.
> >>>> >> >
> >>>> >> > I did not keep all previous TPCDS results, but from the results,
> I
> >>>> can tell that on HDD, always using the sort-shuffle is a good choice.
> For
> >>>> small jobs, using sort-shuffle may not bring much performance gain,
> this
> >>>> may because that all shuffle data can be cached in memory (page
> cache),
> >>>> this is the case if the cluster have enough resources. However, if the
> >>>> whole cluster is under heavy burden or you are running large scale
> jobs,
> >>>> the performance of those small jobs can also be influenced. For
> large-scale
> >>>> jobs, the configurations suggested to be tuned are
> >>>> taskmanager.network.sort-shuffle.min-buffers and
> >>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can
> increase
> >>>> these values for large-scale batch jobs.
> >>>> >> >
> >>>> >> > BTW, I am still running TPCDS tests these days and I can share
> >>>> these results soon.
> >>>> >> >
> >>>> >> > Best,
> >>>> >> > Yingjie
> >>>> >> >
> >>>> >> > 刘建刚 <li...@gmail.com> 于2021年12月10日周五 18:30写道:
> >>>> >> >>
> >>>> >> >> Glad to see the suggestion. In our test, we found that small
> jobs
> >>>> with the changing configs can not improve the performance much just
> as your
> >>>> test. I have some suggestions:
> >>>> >> >>
> >>>> >> >> The config can affect the memory usage. Will the related memory
> >>>> configs be changed?
> >>>> >> >> Can you share the tpcds results for different configs? Although
> >>>> we change the default values, it is helpful to change them for
> different
> >>>> users. In this case, the experience can help a lot.
> >>>> >> >>
> >>>> >> >> Best,
> >>>> >> >> Liu Jiangang
> >>>> >> >>
> >>>> >> >> Yun Gao <yu...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
> >>>> >> >>>
> >>>> >> >>> Hi Yingjie,
> >>>> >> >>>
> >>>> >> >>> Very thanks for drafting the FLIP and initiating the
> discussion!
> >>>> >> >>>
> >>>> >> >>> May I have a double confirmation for
> >>>> taskmanager.network.sort-shuffle.min-parallelism that
> >>>> >> >>> since other frameworks like Spark have used sort-based shuffle
> >>>> for all the cases, does our
> >>>> >> >>> current circumstance still have difference with them?
> >>>> >> >>>
> >>>> >> >>> Best,
> >>>> >> >>> Yun
> >>>> >> >>>
> >>>> >> >>>
> >>>> >> >>>
> >>>> >> >>>
> >>>> >> >>>
> >>>> ------------------------------------------------------------------
> >>>> >> >>> From:Yingjie Cao <ke...@gmail.com>
> >>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
> >>>> >> >>> To:dev <de...@flink.apache.org>; user <us...@flink.apache.org>;
> >>>> user-zh <us...@flink.apache.org>
> >>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
> >>>> blocking shuffle
> >>>> >> >>>
> >>>> >> >>> Hi dev & users:
> >>>> >> >>>
> >>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
> >>>> appreciated.
> >>>> >> >>>
> >>>> >> >>> Best,
> >>>> >> >>> Yingjie
> >>>> >> >>>
> >>>> >> >>> [1]
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
> >>>> >> >>> Yingjie Cao <ke...@gmail.com> 于2021年12月3日周五 17:02写道:
> >>>> >> >>>
> >>>> >> >>> Hi dev & users,
> >>>> >> >>>
> >>>> >> >>> We propose to change some default values of blocking shuffle to
> >>>> improve the user out-of-box experience (not influence streaming). The
> >>>> default values we want to change are as follows:
> >>>> >> >>>
> >>>> >> >>> 1. Data compression
> >>>> (taskmanager.network.blocking-shuffle.compression.enabled):
> Currently, the
> >>>> default value is 'false'.  Usually, data compression can reduce both
> disk
> >>>> and network IO which is good for performance. At the same time, it
> can save
> >>>> storage space. We propose to change the default value to true.
> >>>> >> >>>
> >>>> >> >>> 2. Default shuffle implementation
> >>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the
> default
> >>>> value is 'Integer.MAX', which means by default, Flink jobs will
> always use
> >>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better
> for
> >>>> both stability and performance. So we propose to reduce the default
> value
> >>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512
> and
> >>>> 1024 with a tpc-ds and 128 is the best one.)
> >>>> >> >>>
> >>>> >> >>> 3. Read buffer of sort-shuffle
> >>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size):
> Currently, the
> >>>> default value is '32M'. Previously, when choosing the default value,
> both
> >>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a
> cautious
> >>>> way. However, recently, it is reported in the mailing list that the
> default
> >>>> value is not enough which caused a buffer request timeout issue. We
> already
> >>>> created a ticket to improve the behavior. At the same time, we
> propose to
> >>>> increase this default value to '64M' which can also help.
> >>>> >> >>>
> >>>> >> >>> 4. Sort buffer size of sort-shuffle
> >>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> >>>> value is '64' which means '64' network buffers (32k per buffer by
> default).
> >>>> This default value is quite modest and the performance can be
> influenced.
> >>>> We propose to increase this value to a larger one, for example, 512
> (the
> >>>> default TM and network buffer configuration can serve more than 10
> result
> >>>> partitions concurrently).
> >>>> >> >>>
> >>>> >> >>> We already tested these default values together with tpc-ds
> >>>> benchmark in a cluster and both the performance and stability
> improved a
> >>>> lot. These changes can help to improve the out-of-box experience of
> >>>> blocking shuffle. What do you think about these changes? Is there any
> >>>> concern? If there are no objections, I will make these changes soon.
> >>>> >> >>>
> >>>> >> >>> Best,
> >>>> >> >>> Yingjie
> >>>> >> >>>
> >>>> >>
> >>>> >>
> >>>> >> --
> >>>> >> Best, Jingsong Lee
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Best, Jingsong Lee
> >>>>
> >>>
>
>
>

Re: [DISCUSS] Change some default config values of blocking shuffle

Posted by Yingjie Cao <ke...@gmail.com>.
Hi all,

Thanks very much for all of the feedbacks. It seems that we have reached a
consensus. I will start a vote soon.

Best,
Yingjie

Yun Gao <yu...@aliyun.com> 于2022年1月5日周三 16:08写道:

> Very thanks @Yingjie for completing the experiments!
>
> Also +1 for changing the default config values. From the experiments,
> Changing the default config values would largely increase the open box
> experience of the flink batch, thus it seems worth changing from my side
> even if it would cause some compatibility issue under some scenarios. In
> addition, if we finally have to break compatibility, we might do it early
> to
> avoid affecting more users.
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From:刘建刚 <li...@gmail.com>
> Send Time:2022 Jan. 4 (Tue.) 20:43
> To:user-zh <us...@flink.apache.org>
> Cc:dev <de...@flink.apache.org>; user <us...@flink.apache.org>
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Thanks for the experiment. +1 for the changes.
>
> Yingjie Cao <ke...@gmail.com> 于2022年1月4日周二 17:35写道:
>
> > Hi all,
> >
> > After running some tests with the proposed default value (
> > taskmanager.network.sort-shuffle.min-parallelism: 1,
> > taskmanager.network.sort-shuffle.min-buffers: 512,
> > taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> > taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
> > share some test results.
> >
> > 1. TPC-DS performance and stability test (I the TPC-DS benchmark using
> 512
> > default parallelism and several different settings multiple times):
> > 1) Stability:
> > Compared to the current default values, the proposed default values can
> > improve the TPC-DS query stability a lot. With the current default, there
> > are many queries suffering from blocking shuffle relevant failures. With
> > the proposed default values, only three queries fail because of the
> > "Insufficient number of network buffers:" error. With 512 parallelism,
> the
> > current default configuration will incur the same issue. Part of the
> reason
> > is that the network buffer consumed by InputGate is  proportional to
> > parallelism and can use 32M network memory by default and many tasks has
> > several InputGate but we only has 128M network memory per TaskManager by
> > default.
> > 2) Performance:
> > Compared to the current default values, the proposed default values can
> > improve the TPC-DS query performance a lot. Except for those queries of
> > small shuffle data amount which consume really short time, the proposed
> > default values can bring 2-10 times performance gain. About the default
> > value of taskmanager.network.sort-shuffle.min-parallelism  proposed by
> > Yun, I tested both 1 and 128 and 1 is better for performance which is as
> > expected.
> >
> > 2. Flink pre-commit stability test:
> > I have run all Flink tests with the proposed default value for more than
> > 20 times. The only instability is the "Insufficient number of network
> > buffers:" error for batch several test cases. This error occurs because
> > some tests have really limited network buffers and the proposed default
> > config values may increase the network buffer consumption for cases.
> After
> > increase the total network size for these test cases, the issue can be
> > solved.
> >
> > Summary:
> > 1. The proposed default value can improve both the performance and
> > stability of Flink batch shuffle a lot.
> > 2. Some batch jobs may fail because of the "Insufficient number of
> network
> > buffers:" error for this default value change will increase the network
> > buffer consumption a little for jobs less than 512 parallelism (for jobs
> > more than 512 parallelism network buffer consumption will be reduced).
> > 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> > better performance than setting that to 128, both settings may incur the
> > "Insufficient number of network buffers:" error.
> > 4. After changing the default value and fixing several test cases, all
> > Flink tests (except for those known unstable cases) can run stably.
> >
> > Personally, I am +1 to make the change. Though the change may cause some
> > batch jobs fail because of the "Insufficient number of network buffers:",
> > the possibility is small enough (only 3 TPC-DS out of about 100 queries
> > fails, these queries will also fail with the current default
> configuration
> > because it is the InputGate which takes the most network buffers and cost
> > the error). Compared to this small regression, the performance and
> > stability gains are big. Any feedbacks especially those from Flink batch
> > users are highly appreciated.
> >
> > BTW, aside from the above tests, I also tries to tune some more config
> > options to try to make the TPC-DS test faster. I copied these tuned
> config
> > options from our daily TPC-DS settings. The results show that the
> optimized
> > configuration can improve the TPC-DS performance about 30%. Though these
> > settings may not the best, they really help compared to the default
> value.
> > I attached some settings in this may, I guess some Flink batch users may
> be
> > interested in this. Based on my limited knowledge, I guess that
> increasing
> > the total TaskManager size and network memory size is important for
> > performance, because more memory (managed and network) can make operators
> > and shuffle faster.
> >
> > Best,
> > Yingjie
> >
> >
> >
> > Yingjie Cao <ke...@gmail.com> 于2021年12月15日周三 12:19写道:
> >
> >> Hi Till,
> >>
> >> Thanks for the suggestion. I think it makes a lot of sense to also
> extend
> >> the documentation for the sort shuffle to include a tuning guide.
> >>
> >> Best,
> >> Yingjie
> >>
> >> Till Rohrmann <tr...@apache.org> 于2021年12月14日周二 18:57写道:
> >>
> >>> As part of this FLIP, does it make sense to also extend the
> >>> documentation for the sort shuffle [1] to include a tuning guide? I am
> >>> thinking of a more in depth description of what things you might
> observe
> >>> and how to influence them with the configuration options.
> >>>
> >>> [1]
> >>>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <ji...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Yingjie,
> >>>>
> >>>> Thanks for your explanation. I have no more questions. +1
> >>>>
> >>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <ke...@gmail.com>
> >>>> wrote:
> >>>> >
> >>>> > Hi Jingsong,
> >>>> >
> >>>> > Thanks for your feedback.
> >>>> >
> >>>> > >>> My question is, what is the maximum parallelism a job can have
> >>>> with the default configuration? (Does this break out of the box)
> >>>> >
> >>>> > Yes, you are right, these two options are related to network memory
> >>>> and framework off-heap memory. Generally, these changes will not
> break out
> >>>> of the box experience, but for some extreme cases, for example, there
> are
> >>>> too many ResultPartitions per task, users may need to increase network
> >>>> memory to avoid "insufficient network buffer" error. For framework
> >>>> off-head, I believe that user do not need to change the default value.
> >>>> >
> >>>> > In fact, I have a basic goal when changing these config values: when
> >>>> running TPCDS of medium parallelism with the default value, all
> queries
> >>>> must pass without any error and at the same time, the performance can
> be
> >>>> improved. I think if we achieve this goal, most common use cases can
> be
> >>>> covered.
> >>>> >
> >>>> > Currently, for the default configuration, the exclusive buffers
> >>>> required at input gate side is still parallelism relevant (though
> since
> >>>> 1.14, we can decouple the network buffer consumption from parallelism
> by
> >>>> setting a config value, it has slight performance influence on
> streaming
> >>>> jobs), which means that no large parallelism can be supported by the
> >>>> default configuration. Roughly, I would say the default value can
> support
> >>>> jobs of several hundreds of parallelism.
> >>>> >
> >>>> > >>> I do feel that this correspondence is a bit difficult to control
> >>>> at the moment, and it would be best if a rough table could be
> provided.
> >>>> >
> >>>> > I think this is a good suggestion, we can provide those suggestions
> >>>> in the document.
> >>>> >
> >>>> > Best,
> >>>> > Yingjie
> >>>> >
> >>>> > Jingsong Li <ji...@gmail.com> 于2021年12月14日周二 14:39写道:
> >>>> >>
> >>>> >> Hi  Yingjie,
> >>>> >>
> >>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the
> ease
> >>>> >> of batch jobs.
> >>>> >>
> >>>> >> Looks like
> "taskmanager.memory.framework.off-heap.batch-shuffle.size"
> >>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
> >>>> >> network memory and framework.off-heap.size.
> >>>> >>
> >>>> >> My question is, what is the maximum parallelism a job can have with
> >>>> >> the default configuration? (Does this break out of the box)
> >>>> >>
> >>>> >> How much network memory and framework.off-heap.size are required
> for
> >>>> >> how much parallelism in the default configuration?
> >>>> >>
> >>>> >> I do feel that this correspondence is a bit difficult to control at
> >>>> >> the moment, and it would be best if a rough table could be
> provided.
> >>>> >>
> >>>> >> Best,
> >>>> >> Jingsong
> >>>> >>
> >>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <
> kevin.yingjie@gmail.com>
> >>>> wrote:
> >>>> >> >
> >>>> >> > Hi Jiangang,
> >>>> >> >
> >>>> >> > Thanks for your suggestion.
> >>>> >> >
> >>>> >> > >>> The config can affect the memory usage. Will the related
> >>>> memory configs be changed?
> >>>> >> >
> >>>> >> > I think we will not change the default network memory settings.
> My
> >>>> best expectation is that the default value can work for most cases
> (though
> >>>> may not the best) and for other cases, user may need to tune the
> memory
> >>>> settings.
> >>>> >> >
> >>>> >> > >>> Can you share the tpcds results for different configs?
> >>>> Although we change the default values, it is helpful to change them
> for
> >>>> different users. In this case, the experience can help a lot.
> >>>> >> >
> >>>> >> > I did not keep all previous TPCDS results, but from the results,
> I
> >>>> can tell that on HDD, always using the sort-shuffle is a good choice.
> For
> >>>> small jobs, using sort-shuffle may not bring much performance gain,
> this
> >>>> may because that all shuffle data can be cached in memory (page
> cache),
> >>>> this is the case if the cluster have enough resources. However, if the
> >>>> whole cluster is under heavy burden or you are running large scale
> jobs,
> >>>> the performance of those small jobs can also be influenced. For
> large-scale
> >>>> jobs, the configurations suggested to be tuned are
> >>>> taskmanager.network.sort-shuffle.min-buffers and
> >>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can
> increase
> >>>> these values for large-scale batch jobs.
> >>>> >> >
> >>>> >> > BTW, I am still running TPCDS tests these days and I can share
> >>>> these results soon.
> >>>> >> >
> >>>> >> > Best,
> >>>> >> > Yingjie
> >>>> >> >
> >>>> >> > 刘建刚 <li...@gmail.com> 于2021年12月10日周五 18:30写道:
> >>>> >> >>
> >>>> >> >> Glad to see the suggestion. In our test, we found that small
> jobs
> >>>> with the changing configs can not improve the performance much just
> as your
> >>>> test. I have some suggestions:
> >>>> >> >>
> >>>> >> >> The config can affect the memory usage. Will the related memory
> >>>> configs be changed?
> >>>> >> >> Can you share the tpcds results for different configs? Although
> >>>> we change the default values, it is helpful to change them for
> different
> >>>> users. In this case, the experience can help a lot.
> >>>> >> >>
> >>>> >> >> Best,
> >>>> >> >> Liu Jiangang
> >>>> >> >>
> >>>> >> >> Yun Gao <yu...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
> >>>> >> >>>
> >>>> >> >>> Hi Yingjie,
> >>>> >> >>>
> >>>> >> >>> Very thanks for drafting the FLIP and initiating the
> discussion!
> >>>> >> >>>
> >>>> >> >>> May I have a double confirmation for
> >>>> taskmanager.network.sort-shuffle.min-parallelism that
> >>>> >> >>> since other frameworks like Spark have used sort-based shuffle
> >>>> for all the cases, does our
> >>>> >> >>> current circumstance still have difference with them?
> >>>> >> >>>
> >>>> >> >>> Best,
> >>>> >> >>> Yun
> >>>> >> >>>
> >>>> >> >>>
> >>>> >> >>>
> >>>> >> >>>
> >>>> >> >>>
> >>>> ------------------------------------------------------------------
> >>>> >> >>> From:Yingjie Cao <ke...@gmail.com>
> >>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
> >>>> >> >>> To:dev <de...@flink.apache.org>; user <us...@flink.apache.org>;
> >>>> user-zh <us...@flink.apache.org>
> >>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
> >>>> blocking shuffle
> >>>> >> >>>
> >>>> >> >>> Hi dev & users:
> >>>> >> >>>
> >>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
> >>>> appreciated.
> >>>> >> >>>
> >>>> >> >>> Best,
> >>>> >> >>> Yingjie
> >>>> >> >>>
> >>>> >> >>> [1]
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
> >>>> >> >>> Yingjie Cao <ke...@gmail.com> 于2021年12月3日周五 17:02写道:
> >>>> >> >>>
> >>>> >> >>> Hi dev & users,
> >>>> >> >>>
> >>>> >> >>> We propose to change some default values of blocking shuffle to
> >>>> improve the user out-of-box experience (not influence streaming). The
> >>>> default values we want to change are as follows:
> >>>> >> >>>
> >>>> >> >>> 1. Data compression
> >>>> (taskmanager.network.blocking-shuffle.compression.enabled):
> Currently, the
> >>>> default value is 'false'.  Usually, data compression can reduce both
> disk
> >>>> and network IO which is good for performance. At the same time, it
> can save
> >>>> storage space. We propose to change the default value to true.
> >>>> >> >>>
> >>>> >> >>> 2. Default shuffle implementation
> >>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the
> default
> >>>> value is 'Integer.MAX', which means by default, Flink jobs will
> always use
> >>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better
> for
> >>>> both stability and performance. So we propose to reduce the default
> value
> >>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512
> and
> >>>> 1024 with a tpc-ds and 128 is the best one.)
> >>>> >> >>>
> >>>> >> >>> 3. Read buffer of sort-shuffle
> >>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size):
> Currently, the
> >>>> default value is '32M'. Previously, when choosing the default value,
> both
> >>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a
> cautious
> >>>> way. However, recently, it is reported in the mailing list that the
> default
> >>>> value is not enough which caused a buffer request timeout issue. We
> already
> >>>> created a ticket to improve the behavior. At the same time, we
> propose to
> >>>> increase this default value to '64M' which can also help.
> >>>> >> >>>
> >>>> >> >>> 4. Sort buffer size of sort-shuffle
> >>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> >>>> value is '64' which means '64' network buffers (32k per buffer by
> default).
> >>>> This default value is quite modest and the performance can be
> influenced.
> >>>> We propose to increase this value to a larger one, for example, 512
> (the
> >>>> default TM and network buffer configuration can serve more than 10
> result
> >>>> partitions concurrently).
> >>>> >> >>>
> >>>> >> >>> We already tested these default values together with tpc-ds
> >>>> benchmark in a cluster and both the performance and stability
> improved a
> >>>> lot. These changes can help to improve the out-of-box experience of
> >>>> blocking shuffle. What do you think about these changes? Is there any
> >>>> concern? If there are no objections, I will make these changes soon.
> >>>> >> >>>
> >>>> >> >>> Best,
> >>>> >> >>> Yingjie
> >>>> >> >>>
> >>>> >>
> >>>> >>
> >>>> >> --
> >>>> >> Best, Jingsong Lee
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Best, Jingsong Lee
> >>>>
> >>>
>
>
>

Re: [DISCUSS] Change some default config values of blocking shuffle

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Very thanks @Yingjie for completing the experiments! 

Also +1 for changing the default config values. From the experiments, 
Changing the default config values would largely increase the open box
experience of the flink batch, thus it seems worth changing from my side
even if it would cause some compatibility issue under some scenarios. In
addition, if we finally have to break compatibility, we might do it early to 
avoid affecting more users. 

Best,
Yun


------------------------------------------------------------------
From:刘建刚 <li...@gmail.com>
Send Time:2022 Jan. 4 (Tue.) 20:43
To:user-zh <us...@flink.apache.org>
Cc:dev <de...@flink.apache.org>; user <us...@flink.apache.org>
Subject:Re: [DISCUSS] Change some default config values of blocking shuffle

Thanks for the experiment. +1 for the changes.

Yingjie Cao <ke...@gmail.com> 于2022年1月4日周二 17:35写道:

> Hi all,
>
> After running some tests with the proposed default value (
> taskmanager.network.sort-shuffle.min-parallelism: 1,
> taskmanager.network.sort-shuffle.min-buffers: 512,
> taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
> share some test results.
>
> 1. TPC-DS performance and stability test (I the TPC-DS benchmark using 512
> default parallelism and several different settings multiple times):
> 1) Stability:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query stability a lot. With the current default, there
> are many queries suffering from blocking shuffle relevant failures. With
> the proposed default values, only three queries fail because of the
> "Insufficient number of network buffers:" error. With 512 parallelism, the
> current default configuration will incur the same issue. Part of the reason
> is that the network buffer consumed by InputGate is  proportional to
> parallelism and can use 32M network memory by default and many tasks has
> several InputGate but we only has 128M network memory per TaskManager by
> default.
> 2) Performance:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query performance a lot. Except for those queries of
> small shuffle data amount which consume really short time, the proposed
> default values can bring 2-10 times performance gain. About the default
> value of taskmanager.network.sort-shuffle.min-parallelism  proposed by
> Yun, I tested both 1 and 128 and 1 is better for performance which is as
> expected.
>
> 2. Flink pre-commit stability test:
> I have run all Flink tests with the proposed default value for more than
> 20 times. The only instability is the "Insufficient number of network
> buffers:" error for batch several test cases. This error occurs because
> some tests have really limited network buffers and the proposed default
> config values may increase the network buffer consumption for cases. After
> increase the total network size for these test cases, the issue can be
> solved.
>
> Summary:
> 1. The proposed default value can improve both the performance and
> stability of Flink batch shuffle a lot.
> 2. Some batch jobs may fail because of the "Insufficient number of network
> buffers:" error for this default value change will increase the network
> buffer consumption a little for jobs less than 512 parallelism (for jobs
> more than 512 parallelism network buffer consumption will be reduced).
> 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> better performance than setting that to 128, both settings may incur the
> "Insufficient number of network buffers:" error.
> 4. After changing the default value and fixing several test cases, all
> Flink tests (except for those known unstable cases) can run stably.
>
> Personally, I am +1 to make the change. Though the change may cause some
> batch jobs fail because of the "Insufficient number of network buffers:",
> the possibility is small enough (only 3 TPC-DS out of about 100 queries
> fails, these queries will also fail with the current default configuration
> because it is the InputGate which takes the most network buffers and cost
> the error). Compared to this small regression, the performance and
> stability gains are big. Any feedbacks especially those from Flink batch
> users are highly appreciated.
>
> BTW, aside from the above tests, I also tries to tune some more config
> options to try to make the TPC-DS test faster. I copied these tuned config
> options from our daily TPC-DS settings. The results show that the optimized
> configuration can improve the TPC-DS performance about 30%. Though these
> settings may not the best, they really help compared to the default value.
> I attached some settings in this may, I guess some Flink batch users may be
> interested in this. Based on my limited knowledge, I guess that increasing
> the total TaskManager size and network memory size is important for
> performance, because more memory (managed and network) can make operators
> and shuffle faster.
>
> Best,
> Yingjie
>
>
>
> Yingjie Cao <ke...@gmail.com> 于2021年12月15日周三 12:19写道:
>
>> Hi Till,
>>
>> Thanks for the suggestion. I think it makes a lot of sense to also extend
>> the documentation for the sort shuffle to include a tuning guide.
>>
>> Best,
>> Yingjie
>>
>> Till Rohrmann <tr...@apache.org> 于2021年12月14日周二 18:57写道:
>>
>>> As part of this FLIP, does it make sense to also extend the
>>> documentation for the sort shuffle [1] to include a tuning guide? I am
>>> thinking of a more in depth description of what things you might observe
>>> and how to influence them with the configuration options.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <ji...@gmail.com>
>>> wrote:
>>>
>>>> Hi Yingjie,
>>>>
>>>> Thanks for your explanation. I have no more questions. +1
>>>>
>>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hi Jingsong,
>>>> >
>>>> > Thanks for your feedback.
>>>> >
>>>> > >>> My question is, what is the maximum parallelism a job can have
>>>> with the default configuration? (Does this break out of the box)
>>>> >
>>>> > Yes, you are right, these two options are related to network memory
>>>> and framework off-heap memory. Generally, these changes will not break out
>>>> of the box experience, but for some extreme cases, for example, there are
>>>> too many ResultPartitions per task, users may need to increase network
>>>> memory to avoid "insufficient network buffer" error. For framework
>>>> off-head, I believe that user do not need to change the default value.
>>>> >
>>>> > In fact, I have a basic goal when changing these config values: when
>>>> running TPCDS of medium parallelism with the default value, all queries
>>>> must pass without any error and at the same time, the performance can be
>>>> improved. I think if we achieve this goal, most common use cases can be
>>>> covered.
>>>> >
>>>> > Currently, for the default configuration, the exclusive buffers
>>>> required at input gate side is still parallelism relevant (though since
>>>> 1.14, we can decouple the network buffer consumption from parallelism by
>>>> setting a config value, it has slight performance influence on streaming
>>>> jobs), which means that no large parallelism can be supported by the
>>>> default configuration. Roughly, I would say the default value can support
>>>> jobs of several hundreds of parallelism.
>>>> >
>>>> > >>> I do feel that this correspondence is a bit difficult to control
>>>> at the moment, and it would be best if a rough table could be provided.
>>>> >
>>>> > I think this is a good suggestion, we can provide those suggestions
>>>> in the document.
>>>> >
>>>> > Best,
>>>> > Yingjie
>>>> >
>>>> > Jingsong Li <ji...@gmail.com> 于2021年12月14日周二 14:39写道:
>>>> >>
>>>> >> Hi  Yingjie,
>>>> >>
>>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>>>> >> of batch jobs.
>>>> >>
>>>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>>>> >> network memory and framework.off-heap.size.
>>>> >>
>>>> >> My question is, what is the maximum parallelism a job can have with
>>>> >> the default configuration? (Does this break out of the box)
>>>> >>
>>>> >> How much network memory and framework.off-heap.size are required for
>>>> >> how much parallelism in the default configuration?
>>>> >>
>>>> >> I do feel that this correspondence is a bit difficult to control at
>>>> >> the moment, and it would be best if a rough table could be provided.
>>>> >>
>>>> >> Best,
>>>> >> Jingsong
>>>> >>
>>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > Hi Jiangang,
>>>> >> >
>>>> >> > Thanks for your suggestion.
>>>> >> >
>>>> >> > >>> The config can affect the memory usage. Will the related
>>>> memory configs be changed?
>>>> >> >
>>>> >> > I think we will not change the default network memory settings. My
>>>> best expectation is that the default value can work for most cases (though
>>>> may not the best) and for other cases, user may need to tune the memory
>>>> settings.
>>>> >> >
>>>> >> > >>> Can you share the tpcds results for different configs?
>>>> Although we change the default values, it is helpful to change them for
>>>> different users. In this case, the experience can help a lot.
>>>> >> >
>>>> >> > I did not keep all previous TPCDS results, but from the results, I
>>>> can tell that on HDD, always using the sort-shuffle is a good choice. For
>>>> small jobs, using sort-shuffle may not bring much performance gain, this
>>>> may because that all shuffle data can be cached in memory (page cache),
>>>> this is the case if the cluster have enough resources. However, if the
>>>> whole cluster is under heavy burden or you are running large scale jobs,
>>>> the performance of those small jobs can also be influenced. For large-scale
>>>> jobs, the configurations suggested to be tuned are
>>>> taskmanager.network.sort-shuffle.min-buffers and
>>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase
>>>> these values for large-scale batch jobs.
>>>> >> >
>>>> >> > BTW, I am still running TPCDS tests these days and I can share
>>>> these results soon.
>>>> >> >
>>>> >> > Best,
>>>> >> > Yingjie
>>>> >> >
>>>> >> > 刘建刚 <li...@gmail.com> 于2021年12月10日周五 18:30写道:
>>>> >> >>
>>>> >> >> Glad to see the suggestion. In our test, we found that small jobs
>>>> with the changing configs can not improve the performance much just as your
>>>> test. I have some suggestions:
>>>> >> >>
>>>> >> >> The config can affect the memory usage. Will the related memory
>>>> configs be changed?
>>>> >> >> Can you share the tpcds results for different configs? Although
>>>> we change the default values, it is helpful to change them for different
>>>> users. In this case, the experience can help a lot.
>>>> >> >>
>>>> >> >> Best,
>>>> >> >> Liu Jiangang
>>>> >> >>
>>>> >> >> Yun Gao <yu...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
>>>> >> >>>
>>>> >> >>> Hi Yingjie,
>>>> >> >>>
>>>> >> >>> Very thanks for drafting the FLIP and initiating the discussion!
>>>> >> >>>
>>>> >> >>> May I have a double confirmation for
>>>> taskmanager.network.sort-shuffle.min-parallelism that
>>>> >> >>> since other frameworks like Spark have used sort-based shuffle
>>>> for all the cases, does our
>>>> >> >>> current circumstance still have difference with them?
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yun
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> ------------------------------------------------------------------
>>>> >> >>> From:Yingjie Cao <ke...@gmail.com>
>>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>>> >> >>> To:dev <de...@flink.apache.org>; user <us...@flink.apache.org>;
>>>> user-zh <us...@flink.apache.org>
>>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
>>>> blocking shuffle
>>>> >> >>>
>>>> >> >>> Hi dev & users:
>>>> >> >>>
>>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
>>>> appreciated.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >> >>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>>> >> >>> Yingjie Cao <ke...@gmail.com> 于2021年12月3日周五 17:02写道:
>>>> >> >>>
>>>> >> >>> Hi dev & users,
>>>> >> >>>
>>>> >> >>> We propose to change some default values of blocking shuffle to
>>>> improve the user out-of-box experience (not influence streaming). The
>>>> default values we want to change are as follows:
>>>> >> >>>
>>>> >> >>> 1. Data compression
>>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>>>> default value is 'false'.  Usually, data compression can reduce both disk
>>>> and network IO which is good for performance. At the same time, it can save
>>>> storage space. We propose to change the default value to true.
>>>> >> >>>
>>>> >> >>> 2. Default shuffle implementation
>>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>>>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>>>> both stability and performance. So we propose to reduce the default value
>>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>>>> 1024 with a tpc-ds and 128 is the best one.)
>>>> >> >>>
>>>> >> >>> 3. Read buffer of sort-shuffle
>>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>>>> default value is '32M'. Previously, when choosing the default value, both
>>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>>>> way. However, recently, it is reported in the mailing list that the default
>>>> value is not enough which caused a buffer request timeout issue. We already
>>>> created a ticket to improve the behavior. At the same time, we propose to
>>>> increase this default value to '64M' which can also help.
>>>> >> >>>
>>>> >> >>> 4. Sort buffer size of sort-shuffle
>>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>>>> value is '64' which means '64' network buffers (32k per buffer by default).
>>>> This default value is quite modest and the performance can be influenced.
>>>> We propose to increase this value to a larger one, for example, 512 (the
>>>> default TM and network buffer configuration can serve more than 10 result
>>>> partitions concurrently).
>>>> >> >>>
>>>> >> >>> We already tested these default values together with tpc-ds
>>>> benchmark in a cluster and both the performance and stability improved a
>>>> lot. These changes can help to improve the out-of-box experience of
>>>> blocking shuffle. What do you think about these changes? Is there any
>>>> concern? If there are no objections, I will make these changes soon.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Best, Jingsong Lee
>>>>
>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>


Re: [DISCUSS] Change some default config values of blocking shuffle

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Very thanks @Yingjie for completing the experiments! 

Also +1 for changing the default config values. From the experiments, 
Changing the default config values would largely increase the open box
experience of the flink batch, thus it seems worth changing from my side
even if it would cause some compatibility issue under some scenarios. In
addition, if we finally have to break compatibility, we might do it early to 
avoid affecting more users. 

Best,
Yun


------------------------------------------------------------------
From:刘建刚 <li...@gmail.com>
Send Time:2022 Jan. 4 (Tue.) 20:43
To:user-zh <us...@flink.apache.org>
Cc:dev <de...@flink.apache.org>; user <us...@flink.apache.org>
Subject:Re: [DISCUSS] Change some default config values of blocking shuffle

Thanks for the experiment. +1 for the changes.

Yingjie Cao <ke...@gmail.com> 于2022年1月4日周二 17:35写道:

> Hi all,
>
> After running some tests with the proposed default value (
> taskmanager.network.sort-shuffle.min-parallelism: 1,
> taskmanager.network.sort-shuffle.min-buffers: 512,
> taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
> share some test results.
>
> 1. TPC-DS performance and stability test (I the TPC-DS benchmark using 512
> default parallelism and several different settings multiple times):
> 1) Stability:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query stability a lot. With the current default, there
> are many queries suffering from blocking shuffle relevant failures. With
> the proposed default values, only three queries fail because of the
> "Insufficient number of network buffers:" error. With 512 parallelism, the
> current default configuration will incur the same issue. Part of the reason
> is that the network buffer consumed by InputGate is  proportional to
> parallelism and can use 32M network memory by default and many tasks has
> several InputGate but we only has 128M network memory per TaskManager by
> default.
> 2) Performance:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query performance a lot. Except for those queries of
> small shuffle data amount which consume really short time, the proposed
> default values can bring 2-10 times performance gain. About the default
> value of taskmanager.network.sort-shuffle.min-parallelism  proposed by
> Yun, I tested both 1 and 128 and 1 is better for performance which is as
> expected.
>
> 2. Flink pre-commit stability test:
> I have run all Flink tests with the proposed default value for more than
> 20 times. The only instability is the "Insufficient number of network
> buffers:" error for batch several test cases. This error occurs because
> some tests have really limited network buffers and the proposed default
> config values may increase the network buffer consumption for cases. After
> increase the total network size for these test cases, the issue can be
> solved.
>
> Summary:
> 1. The proposed default value can improve both the performance and
> stability of Flink batch shuffle a lot.
> 2. Some batch jobs may fail because of the "Insufficient number of network
> buffers:" error for this default value change will increase the network
> buffer consumption a little for jobs less than 512 parallelism (for jobs
> more than 512 parallelism network buffer consumption will be reduced).
> 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> better performance than setting that to 128, both settings may incur the
> "Insufficient number of network buffers:" error.
> 4. After changing the default value and fixing several test cases, all
> Flink tests (except for those known unstable cases) can run stably.
>
> Personally, I am +1 to make the change. Though the change may cause some
> batch jobs fail because of the "Insufficient number of network buffers:",
> the possibility is small enough (only 3 TPC-DS out of about 100 queries
> fails, these queries will also fail with the current default configuration
> because it is the InputGate which takes the most network buffers and cost
> the error). Compared to this small regression, the performance and
> stability gains are big. Any feedbacks especially those from Flink batch
> users are highly appreciated.
>
> BTW, aside from the above tests, I also tries to tune some more config
> options to try to make the TPC-DS test faster. I copied these tuned config
> options from our daily TPC-DS settings. The results show that the optimized
> configuration can improve the TPC-DS performance about 30%. Though these
> settings may not the best, they really help compared to the default value.
> I attached some settings in this may, I guess some Flink batch users may be
> interested in this. Based on my limited knowledge, I guess that increasing
> the total TaskManager size and network memory size is important for
> performance, because more memory (managed and network) can make operators
> and shuffle faster.
>
> Best,
> Yingjie
>
>
>
> Yingjie Cao <ke...@gmail.com> 于2021年12月15日周三 12:19写道:
>
>> Hi Till,
>>
>> Thanks for the suggestion. I think it makes a lot of sense to also extend
>> the documentation for the sort shuffle to include a tuning guide.
>>
>> Best,
>> Yingjie
>>
>> Till Rohrmann <tr...@apache.org> 于2021年12月14日周二 18:57写道:
>>
>>> As part of this FLIP, does it make sense to also extend the
>>> documentation for the sort shuffle [1] to include a tuning guide? I am
>>> thinking of a more in depth description of what things you might observe
>>> and how to influence them with the configuration options.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <ji...@gmail.com>
>>> wrote:
>>>
>>>> Hi Yingjie,
>>>>
>>>> Thanks for your explanation. I have no more questions. +1
>>>>
>>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hi Jingsong,
>>>> >
>>>> > Thanks for your feedback.
>>>> >
>>>> > >>> My question is, what is the maximum parallelism a job can have
>>>> with the default configuration? (Does this break out of the box)
>>>> >
>>>> > Yes, you are right, these two options are related to network memory
>>>> and framework off-heap memory. Generally, these changes will not break out
>>>> of the box experience, but for some extreme cases, for example, there are
>>>> too many ResultPartitions per task, users may need to increase network
>>>> memory to avoid "insufficient network buffer" error. For framework
>>>> off-head, I believe that user do not need to change the default value.
>>>> >
>>>> > In fact, I have a basic goal when changing these config values: when
>>>> running TPCDS of medium parallelism with the default value, all queries
>>>> must pass without any error and at the same time, the performance can be
>>>> improved. I think if we achieve this goal, most common use cases can be
>>>> covered.
>>>> >
>>>> > Currently, for the default configuration, the exclusive buffers
>>>> required at input gate side is still parallelism relevant (though since
>>>> 1.14, we can decouple the network buffer consumption from parallelism by
>>>> setting a config value, it has slight performance influence on streaming
>>>> jobs), which means that no large parallelism can be supported by the
>>>> default configuration. Roughly, I would say the default value can support
>>>> jobs of several hundreds of parallelism.
>>>> >
>>>> > >>> I do feel that this correspondence is a bit difficult to control
>>>> at the moment, and it would be best if a rough table could be provided.
>>>> >
>>>> > I think this is a good suggestion, we can provide those suggestions
>>>> in the document.
>>>> >
>>>> > Best,
>>>> > Yingjie
>>>> >
>>>> > Jingsong Li <ji...@gmail.com> 于2021年12月14日周二 14:39写道:
>>>> >>
>>>> >> Hi  Yingjie,
>>>> >>
>>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>>>> >> of batch jobs.
>>>> >>
>>>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>>>> >> network memory and framework.off-heap.size.
>>>> >>
>>>> >> My question is, what is the maximum parallelism a job can have with
>>>> >> the default configuration? (Does this break out of the box)
>>>> >>
>>>> >> How much network memory and framework.off-heap.size are required for
>>>> >> how much parallelism in the default configuration?
>>>> >>
>>>> >> I do feel that this correspondence is a bit difficult to control at
>>>> >> the moment, and it would be best if a rough table could be provided.
>>>> >>
>>>> >> Best,
>>>> >> Jingsong
>>>> >>
>>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > Hi Jiangang,
>>>> >> >
>>>> >> > Thanks for your suggestion.
>>>> >> >
>>>> >> > >>> The config can affect the memory usage. Will the related
>>>> memory configs be changed?
>>>> >> >
>>>> >> > I think we will not change the default network memory settings. My
>>>> best expectation is that the default value can work for most cases (though
>>>> may not the best) and for other cases, user may need to tune the memory
>>>> settings.
>>>> >> >
>>>> >> > >>> Can you share the tpcds results for different configs?
>>>> Although we change the default values, it is helpful to change them for
>>>> different users. In this case, the experience can help a lot.
>>>> >> >
>>>> >> > I did not keep all previous TPCDS results, but from the results, I
>>>> can tell that on HDD, always using the sort-shuffle is a good choice. For
>>>> small jobs, using sort-shuffle may not bring much performance gain, this
>>>> may because that all shuffle data can be cached in memory (page cache),
>>>> this is the case if the cluster have enough resources. However, if the
>>>> whole cluster is under heavy burden or you are running large scale jobs,
>>>> the performance of those small jobs can also be influenced. For large-scale
>>>> jobs, the configurations suggested to be tuned are
>>>> taskmanager.network.sort-shuffle.min-buffers and
>>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase
>>>> these values for large-scale batch jobs.
>>>> >> >
>>>> >> > BTW, I am still running TPCDS tests these days and I can share
>>>> these results soon.
>>>> >> >
>>>> >> > Best,
>>>> >> > Yingjie
>>>> >> >
>>>> >> > 刘建刚 <li...@gmail.com> 于2021年12月10日周五 18:30写道:
>>>> >> >>
>>>> >> >> Glad to see the suggestion. In our test, we found that small jobs
>>>> with the changing configs can not improve the performance much just as your
>>>> test. I have some suggestions:
>>>> >> >>
>>>> >> >> The config can affect the memory usage. Will the related memory
>>>> configs be changed?
>>>> >> >> Can you share the tpcds results for different configs? Although
>>>> we change the default values, it is helpful to change them for different
>>>> users. In this case, the experience can help a lot.
>>>> >> >>
>>>> >> >> Best,
>>>> >> >> Liu Jiangang
>>>> >> >>
>>>> >> >> Yun Gao <yu...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
>>>> >> >>>
>>>> >> >>> Hi Yingjie,
>>>> >> >>>
>>>> >> >>> Very thanks for drafting the FLIP and initiating the discussion!
>>>> >> >>>
>>>> >> >>> May I have a double confirmation for
>>>> taskmanager.network.sort-shuffle.min-parallelism that
>>>> >> >>> since other frameworks like Spark have used sort-based shuffle
>>>> for all the cases, does our
>>>> >> >>> current circumstance still have difference with them?
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yun
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> ------------------------------------------------------------------
>>>> >> >>> From:Yingjie Cao <ke...@gmail.com>
>>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>>> >> >>> To:dev <de...@flink.apache.org>; user <us...@flink.apache.org>;
>>>> user-zh <us...@flink.apache.org>
>>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
>>>> blocking shuffle
>>>> >> >>>
>>>> >> >>> Hi dev & users:
>>>> >> >>>
>>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
>>>> appreciated.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >> >>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>>> >> >>> Yingjie Cao <ke...@gmail.com> 于2021年12月3日周五 17:02写道:
>>>> >> >>>
>>>> >> >>> Hi dev & users,
>>>> >> >>>
>>>> >> >>> We propose to change some default values of blocking shuffle to
>>>> improve the user out-of-box experience (not influence streaming). The
>>>> default values we want to change are as follows:
>>>> >> >>>
>>>> >> >>> 1. Data compression
>>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>>>> default value is 'false'.  Usually, data compression can reduce both disk
>>>> and network IO which is good for performance. At the same time, it can save
>>>> storage space. We propose to change the default value to true.
>>>> >> >>>
>>>> >> >>> 2. Default shuffle implementation
>>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>>>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>>>> both stability and performance. So we propose to reduce the default value
>>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>>>> 1024 with a tpc-ds and 128 is the best one.)
>>>> >> >>>
>>>> >> >>> 3. Read buffer of sort-shuffle
>>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>>>> default value is '32M'. Previously, when choosing the default value, both
>>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>>>> way. However, recently, it is reported in the mailing list that the default
>>>> value is not enough which caused a buffer request timeout issue. We already
>>>> created a ticket to improve the behavior. At the same time, we propose to
>>>> increase this default value to '64M' which can also help.
>>>> >> >>>
>>>> >> >>> 4. Sort buffer size of sort-shuffle
>>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>>>> value is '64' which means '64' network buffers (32k per buffer by default).
>>>> This default value is quite modest and the performance can be influenced.
>>>> We propose to increase this value to a larger one, for example, 512 (the
>>>> default TM and network buffer configuration can serve more than 10 result
>>>> partitions concurrently).
>>>> >> >>>
>>>> >> >>> We already tested these default values together with tpc-ds
>>>> benchmark in a cluster and both the performance and stability improved a
>>>> lot. These changes can help to improve the out-of-box experience of
>>>> blocking shuffle. What do you think about these changes? Is there any
>>>> concern? If there are no objections, I will make these changes soon.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Best, Jingsong Lee
>>>>
>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>


Re: [DISCUSS] Change some default config values of blocking shuffle

Posted by Yun Gao <yu...@aliyun.com>.
Very thanks @Yingjie for completing the experiments! 

Also +1 for changing the default config values. From the experiments, 
Changing the default config values would largely increase the open box
experience of the flink batch, thus it seems worth changing from my side
even if it would cause some compatibility issue under some scenarios. In
addition, if we finally have to break compatibility, we might do it early to 
avoid affecting more users. 

Best,
Yun


------------------------------------------------------------------
From:刘建刚 <li...@gmail.com>
Send Time:2022 Jan. 4 (Tue.) 20:43
To:user-zh <us...@flink.apache.org>
Cc:dev <de...@flink.apache.org>; user <us...@flink.apache.org>
Subject:Re: [DISCUSS] Change some default config values of blocking shuffle

Thanks for the experiment. +1 for the changes.

Yingjie Cao <ke...@gmail.com> 于2022年1月4日周二 17:35写道:

> Hi all,
>
> After running some tests with the proposed default value (
> taskmanager.network.sort-shuffle.min-parallelism: 1,
> taskmanager.network.sort-shuffle.min-buffers: 512,
> taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
> share some test results.
>
> 1. TPC-DS performance and stability test (I the TPC-DS benchmark using 512
> default parallelism and several different settings multiple times):
> 1) Stability:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query stability a lot. With the current default, there
> are many queries suffering from blocking shuffle relevant failures. With
> the proposed default values, only three queries fail because of the
> "Insufficient number of network buffers:" error. With 512 parallelism, the
> current default configuration will incur the same issue. Part of the reason
> is that the network buffer consumed by InputGate is  proportional to
> parallelism and can use 32M network memory by default and many tasks has
> several InputGate but we only has 128M network memory per TaskManager by
> default.
> 2) Performance:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query performance a lot. Except for those queries of
> small shuffle data amount which consume really short time, the proposed
> default values can bring 2-10 times performance gain. About the default
> value of taskmanager.network.sort-shuffle.min-parallelism  proposed by
> Yun, I tested both 1 and 128 and 1 is better for performance which is as
> expected.
>
> 2. Flink pre-commit stability test:
> I have run all Flink tests with the proposed default value for more than
> 20 times. The only instability is the "Insufficient number of network
> buffers:" error for batch several test cases. This error occurs because
> some tests have really limited network buffers and the proposed default
> config values may increase the network buffer consumption for cases. After
> increase the total network size for these test cases, the issue can be
> solved.
>
> Summary:
> 1. The proposed default value can improve both the performance and
> stability of Flink batch shuffle a lot.
> 2. Some batch jobs may fail because of the "Insufficient number of network
> buffers:" error for this default value change will increase the network
> buffer consumption a little for jobs less than 512 parallelism (for jobs
> more than 512 parallelism network buffer consumption will be reduced).
> 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> better performance than setting that to 128, both settings may incur the
> "Insufficient number of network buffers:" error.
> 4. After changing the default value and fixing several test cases, all
> Flink tests (except for those known unstable cases) can run stably.
>
> Personally, I am +1 to make the change. Though the change may cause some
> batch jobs fail because of the "Insufficient number of network buffers:",
> the possibility is small enough (only 3 TPC-DS out of about 100 queries
> fails, these queries will also fail with the current default configuration
> because it is the InputGate which takes the most network buffers and cost
> the error). Compared to this small regression, the performance and
> stability gains are big. Any feedbacks especially those from Flink batch
> users are highly appreciated.
>
> BTW, aside from the above tests, I also tries to tune some more config
> options to try to make the TPC-DS test faster. I copied these tuned config
> options from our daily TPC-DS settings. The results show that the optimized
> configuration can improve the TPC-DS performance about 30%. Though these
> settings may not the best, they really help compared to the default value.
> I attached some settings in this may, I guess some Flink batch users may be
> interested in this. Based on my limited knowledge, I guess that increasing
> the total TaskManager size and network memory size is important for
> performance, because more memory (managed and network) can make operators
> and shuffle faster.
>
> Best,
> Yingjie
>
>
>
> Yingjie Cao <ke...@gmail.com> 于2021年12月15日周三 12:19写道:
>
>> Hi Till,
>>
>> Thanks for the suggestion. I think it makes a lot of sense to also extend
>> the documentation for the sort shuffle to include a tuning guide.
>>
>> Best,
>> Yingjie
>>
>> Till Rohrmann <tr...@apache.org> 于2021年12月14日周二 18:57写道:
>>
>>> As part of this FLIP, does it make sense to also extend the
>>> documentation for the sort shuffle [1] to include a tuning guide? I am
>>> thinking of a more in depth description of what things you might observe
>>> and how to influence them with the configuration options.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <ji...@gmail.com>
>>> wrote:
>>>
>>>> Hi Yingjie,
>>>>
>>>> Thanks for your explanation. I have no more questions. +1
>>>>
>>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hi Jingsong,
>>>> >
>>>> > Thanks for your feedback.
>>>> >
>>>> > >>> My question is, what is the maximum parallelism a job can have
>>>> with the default configuration? (Does this break out of the box)
>>>> >
>>>> > Yes, you are right, these two options are related to network memory
>>>> and framework off-heap memory. Generally, these changes will not break out
>>>> of the box experience, but for some extreme cases, for example, there are
>>>> too many ResultPartitions per task, users may need to increase network
>>>> memory to avoid "insufficient network buffer" error. For framework
>>>> off-head, I believe that user do not need to change the default value.
>>>> >
>>>> > In fact, I have a basic goal when changing these config values: when
>>>> running TPCDS of medium parallelism with the default value, all queries
>>>> must pass without any error and at the same time, the performance can be
>>>> improved. I think if we achieve this goal, most common use cases can be
>>>> covered.
>>>> >
>>>> > Currently, for the default configuration, the exclusive buffers
>>>> required at input gate side is still parallelism relevant (though since
>>>> 1.14, we can decouple the network buffer consumption from parallelism by
>>>> setting a config value, it has slight performance influence on streaming
>>>> jobs), which means that no large parallelism can be supported by the
>>>> default configuration. Roughly, I would say the default value can support
>>>> jobs of several hundreds of parallelism.
>>>> >
>>>> > >>> I do feel that this correspondence is a bit difficult to control
>>>> at the moment, and it would be best if a rough table could be provided.
>>>> >
>>>> > I think this is a good suggestion, we can provide those suggestions
>>>> in the document.
>>>> >
>>>> > Best,
>>>> > Yingjie
>>>> >
>>>> > Jingsong Li <ji...@gmail.com> 于2021年12月14日周二 14:39写道:
>>>> >>
>>>> >> Hi  Yingjie,
>>>> >>
>>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>>>> >> of batch jobs.
>>>> >>
>>>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>>>> >> network memory and framework.off-heap.size.
>>>> >>
>>>> >> My question is, what is the maximum parallelism a job can have with
>>>> >> the default configuration? (Does this break out of the box)
>>>> >>
>>>> >> How much network memory and framework.off-heap.size are required for
>>>> >> how much parallelism in the default configuration?
>>>> >>
>>>> >> I do feel that this correspondence is a bit difficult to control at
>>>> >> the moment, and it would be best if a rough table could be provided.
>>>> >>
>>>> >> Best,
>>>> >> Jingsong
>>>> >>
>>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > Hi Jiangang,
>>>> >> >
>>>> >> > Thanks for your suggestion.
>>>> >> >
>>>> >> > >>> The config can affect the memory usage. Will the related
>>>> memory configs be changed?
>>>> >> >
>>>> >> > I think we will not change the default network memory settings. My
>>>> best expectation is that the default value can work for most cases (though
>>>> may not the best) and for other cases, user may need to tune the memory
>>>> settings.
>>>> >> >
>>>> >> > >>> Can you share the tpcds results for different configs?
>>>> Although we change the default values, it is helpful to change them for
>>>> different users. In this case, the experience can help a lot.
>>>> >> >
>>>> >> > I did not keep all previous TPCDS results, but from the results, I
>>>> can tell that on HDD, always using the sort-shuffle is a good choice. For
>>>> small jobs, using sort-shuffle may not bring much performance gain, this
>>>> may because that all shuffle data can be cached in memory (page cache),
>>>> this is the case if the cluster have enough resources. However, if the
>>>> whole cluster is under heavy burden or you are running large scale jobs,
>>>> the performance of those small jobs can also be influenced. For large-scale
>>>> jobs, the configurations suggested to be tuned are
>>>> taskmanager.network.sort-shuffle.min-buffers and
>>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase
>>>> these values for large-scale batch jobs.
>>>> >> >
>>>> >> > BTW, I am still running TPCDS tests these days and I can share
>>>> these results soon.
>>>> >> >
>>>> >> > Best,
>>>> >> > Yingjie
>>>> >> >
>>>> >> > 刘建刚 <li...@gmail.com> 于2021年12月10日周五 18:30写道:
>>>> >> >>
>>>> >> >> Glad to see the suggestion. In our test, we found that small jobs
>>>> with the changing configs can not improve the performance much just as your
>>>> test. I have some suggestions:
>>>> >> >>
>>>> >> >> The config can affect the memory usage. Will the related memory
>>>> configs be changed?
>>>> >> >> Can you share the tpcds results for different configs? Although
>>>> we change the default values, it is helpful to change them for different
>>>> users. In this case, the experience can help a lot.
>>>> >> >>
>>>> >> >> Best,
>>>> >> >> Liu Jiangang
>>>> >> >>
>>>> >> >> Yun Gao <yu...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
>>>> >> >>>
>>>> >> >>> Hi Yingjie,
>>>> >> >>>
>>>> >> >>> Very thanks for drafting the FLIP and initiating the discussion!
>>>> >> >>>
>>>> >> >>> May I have a double confirmation for
>>>> taskmanager.network.sort-shuffle.min-parallelism that
>>>> >> >>> since other frameworks like Spark have used sort-based shuffle
>>>> for all the cases, does our
>>>> >> >>> current circumstance still have difference with them?
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yun
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> ------------------------------------------------------------------
>>>> >> >>> From:Yingjie Cao <ke...@gmail.com>
>>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>>> >> >>> To:dev <de...@flink.apache.org>; user <us...@flink.apache.org>;
>>>> user-zh <us...@flink.apache.org>
>>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
>>>> blocking shuffle
>>>> >> >>>
>>>> >> >>> Hi dev & users:
>>>> >> >>>
>>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
>>>> appreciated.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >> >>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>>> >> >>> Yingjie Cao <ke...@gmail.com> 于2021年12月3日周五 17:02写道:
>>>> >> >>>
>>>> >> >>> Hi dev & users,
>>>> >> >>>
>>>> >> >>> We propose to change some default values of blocking shuffle to
>>>> improve the user out-of-box experience (not influence streaming). The
>>>> default values we want to change are as follows:
>>>> >> >>>
>>>> >> >>> 1. Data compression
>>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>>>> default value is 'false'.  Usually, data compression can reduce both disk
>>>> and network IO which is good for performance. At the same time, it can save
>>>> storage space. We propose to change the default value to true.
>>>> >> >>>
>>>> >> >>> 2. Default shuffle implementation
>>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>>>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>>>> both stability and performance. So we propose to reduce the default value
>>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>>>> 1024 with a tpc-ds and 128 is the best one.)
>>>> >> >>>
>>>> >> >>> 3. Read buffer of sort-shuffle
>>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>>>> default value is '32M'. Previously, when choosing the default value, both
>>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>>>> way. However, recently, it is reported in the mailing list that the default
>>>> value is not enough which caused a buffer request timeout issue. We already
>>>> created a ticket to improve the behavior. At the same time, we propose to
>>>> increase this default value to '64M' which can also help.
>>>> >> >>>
>>>> >> >>> 4. Sort buffer size of sort-shuffle
>>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>>>> value is '64' which means '64' network buffers (32k per buffer by default).
>>>> This default value is quite modest and the performance can be influenced.
>>>> We propose to increase this value to a larger one, for example, 512 (the
>>>> default TM and network buffer configuration can serve more than 10 result
>>>> partitions concurrently).
>>>> >> >>>
>>>> >> >>> We already tested these default values together with tpc-ds
>>>> benchmark in a cluster and both the performance and stability improved a
>>>> lot. These changes can help to improve the out-of-box experience of
>>>> blocking shuffle. What do you think about these changes? Is there any
>>>> concern? If there are no objections, I will make these changes soon.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Best, Jingsong Lee
>>>>
>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>


Re: [DISCUSS] Change some default config values of blocking shuffle

Posted by 刘建刚 <li...@gmail.com>.
Thanks for the experiment. +1 for the changes.

Yingjie Cao <ke...@gmail.com> 于2022年1月4日周二 17:35写道:

> Hi all,
>
> After running some tests with the proposed default value (
> taskmanager.network.sort-shuffle.min-parallelism: 1,
> taskmanager.network.sort-shuffle.min-buffers: 512,
> taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
> share some test results.
>
> 1. TPC-DS performance and stability test (I the TPC-DS benchmark using 512
> default parallelism and several different settings multiple times):
> 1) Stability:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query stability a lot. With the current default, there
> are many queries suffering from blocking shuffle relevant failures. With
> the proposed default values, only three queries fail because of the
> "Insufficient number of network buffers:" error. With 512 parallelism, the
> current default configuration will incur the same issue. Part of the reason
> is that the network buffer consumed by InputGate is  proportional to
> parallelism and can use 32M network memory by default and many tasks has
> several InputGate but we only has 128M network memory per TaskManager by
> default.
> 2) Performance:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query performance a lot. Except for those queries of
> small shuffle data amount which consume really short time, the proposed
> default values can bring 2-10 times performance gain. About the default
> value of taskmanager.network.sort-shuffle.min-parallelism  proposed by
> Yun, I tested both 1 and 128 and 1 is better for performance which is as
> expected.
>
> 2. Flink pre-commit stability test:
> I have run all Flink tests with the proposed default value for more than
> 20 times. The only instability is the "Insufficient number of network
> buffers:" error for batch several test cases. This error occurs because
> some tests have really limited network buffers and the proposed default
> config values may increase the network buffer consumption for cases. After
> increase the total network size for these test cases, the issue can be
> solved.
>
> Summary:
> 1. The proposed default value can improve both the performance and
> stability of Flink batch shuffle a lot.
> 2. Some batch jobs may fail because of the "Insufficient number of network
> buffers:" error for this default value change will increase the network
> buffer consumption a little for jobs less than 512 parallelism (for jobs
> more than 512 parallelism network buffer consumption will be reduced).
> 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> better performance than setting that to 128, both settings may incur the
> "Insufficient number of network buffers:" error.
> 4. After changing the default value and fixing several test cases, all
> Flink tests (except for those known unstable cases) can run stably.
>
> Personally, I am +1 to make the change. Though the change may cause some
> batch jobs fail because of the "Insufficient number of network buffers:",
> the possibility is small enough (only 3 TPC-DS out of about 100 queries
> fails, these queries will also fail with the current default configuration
> because it is the InputGate which takes the most network buffers and cost
> the error). Compared to this small regression, the performance and
> stability gains are big. Any feedbacks especially those from Flink batch
> users are highly appreciated.
>
> BTW, aside from the above tests, I also tries to tune some more config
> options to try to make the TPC-DS test faster. I copied these tuned config
> options from our daily TPC-DS settings. The results show that the optimized
> configuration can improve the TPC-DS performance about 30%. Though these
> settings may not the best, they really help compared to the default value.
> I attached some settings in this may, I guess some Flink batch users may be
> interested in this. Based on my limited knowledge, I guess that increasing
> the total TaskManager size and network memory size is important for
> performance, because more memory (managed and network) can make operators
> and shuffle faster.
>
> Best,
> Yingjie
>
>
>
> Yingjie Cao <ke...@gmail.com> 于2021年12月15日周三 12:19写道:
>
>> Hi Till,
>>
>> Thanks for the suggestion. I think it makes a lot of sense to also extend
>> the documentation for the sort shuffle to include a tuning guide.
>>
>> Best,
>> Yingjie
>>
>> Till Rohrmann <tr...@apache.org> 于2021年12月14日周二 18:57写道:
>>
>>> As part of this FLIP, does it make sense to also extend the
>>> documentation for the sort shuffle [1] to include a tuning guide? I am
>>> thinking of a more in depth description of what things you might observe
>>> and how to influence them with the configuration options.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <ji...@gmail.com>
>>> wrote:
>>>
>>>> Hi Yingjie,
>>>>
>>>> Thanks for your explanation. I have no more questions. +1
>>>>
>>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hi Jingsong,
>>>> >
>>>> > Thanks for your feedback.
>>>> >
>>>> > >>> My question is, what is the maximum parallelism a job can have
>>>> with the default configuration? (Does this break out of the box)
>>>> >
>>>> > Yes, you are right, these two options are related to network memory
>>>> and framework off-heap memory. Generally, these changes will not break out
>>>> of the box experience, but for some extreme cases, for example, there are
>>>> too many ResultPartitions per task, users may need to increase network
>>>> memory to avoid "insufficient network buffer" error. For framework
>>>> off-head, I believe that user do not need to change the default value.
>>>> >
>>>> > In fact, I have a basic goal when changing these config values: when
>>>> running TPCDS of medium parallelism with the default value, all queries
>>>> must pass without any error and at the same time, the performance can be
>>>> improved. I think if we achieve this goal, most common use cases can be
>>>> covered.
>>>> >
>>>> > Currently, for the default configuration, the exclusive buffers
>>>> required at input gate side is still parallelism relevant (though since
>>>> 1.14, we can decouple the network buffer consumption from parallelism by
>>>> setting a config value, it has slight performance influence on streaming
>>>> jobs), which means that no large parallelism can be supported by the
>>>> default configuration. Roughly, I would say the default value can support
>>>> jobs of several hundreds of parallelism.
>>>> >
>>>> > >>> I do feel that this correspondence is a bit difficult to control
>>>> at the moment, and it would be best if a rough table could be provided.
>>>> >
>>>> > I think this is a good suggestion, we can provide those suggestions
>>>> in the document.
>>>> >
>>>> > Best,
>>>> > Yingjie
>>>> >
>>>> > Jingsong Li <ji...@gmail.com> 于2021年12月14日周二 14:39写道:
>>>> >>
>>>> >> Hi  Yingjie,
>>>> >>
>>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>>>> >> of batch jobs.
>>>> >>
>>>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>>>> >> network memory and framework.off-heap.size.
>>>> >>
>>>> >> My question is, what is the maximum parallelism a job can have with
>>>> >> the default configuration? (Does this break out of the box)
>>>> >>
>>>> >> How much network memory and framework.off-heap.size are required for
>>>> >> how much parallelism in the default configuration?
>>>> >>
>>>> >> I do feel that this correspondence is a bit difficult to control at
>>>> >> the moment, and it would be best if a rough table could be provided.
>>>> >>
>>>> >> Best,
>>>> >> Jingsong
>>>> >>
>>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > Hi Jiangang,
>>>> >> >
>>>> >> > Thanks for your suggestion.
>>>> >> >
>>>> >> > >>> The config can affect the memory usage. Will the related
>>>> memory configs be changed?
>>>> >> >
>>>> >> > I think we will not change the default network memory settings. My
>>>> best expectation is that the default value can work for most cases (though
>>>> may not the best) and for other cases, user may need to tune the memory
>>>> settings.
>>>> >> >
>>>> >> > >>> Can you share the tpcds results for different configs?
>>>> Although we change the default values, it is helpful to change them for
>>>> different users. In this case, the experience can help a lot.
>>>> >> >
>>>> >> > I did not keep all previous TPCDS results, but from the results, I
>>>> can tell that on HDD, always using the sort-shuffle is a good choice. For
>>>> small jobs, using sort-shuffle may not bring much performance gain, this
>>>> may because that all shuffle data can be cached in memory (page cache),
>>>> this is the case if the cluster have enough resources. However, if the
>>>> whole cluster is under heavy burden or you are running large scale jobs,
>>>> the performance of those small jobs can also be influenced. For large-scale
>>>> jobs, the configurations suggested to be tuned are
>>>> taskmanager.network.sort-shuffle.min-buffers and
>>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase
>>>> these values for large-scale batch jobs.
>>>> >> >
>>>> >> > BTW, I am still running TPCDS tests these days and I can share
>>>> these results soon.
>>>> >> >
>>>> >> > Best,
>>>> >> > Yingjie
>>>> >> >
>>>> >> > 刘建刚 <li...@gmail.com> 于2021年12月10日周五 18:30写道:
>>>> >> >>
>>>> >> >> Glad to see the suggestion. In our test, we found that small jobs
>>>> with the changing configs can not improve the performance much just as your
>>>> test. I have some suggestions:
>>>> >> >>
>>>> >> >> The config can affect the memory usage. Will the related memory
>>>> configs be changed?
>>>> >> >> Can you share the tpcds results for different configs? Although
>>>> we change the default values, it is helpful to change them for different
>>>> users. In this case, the experience can help a lot.
>>>> >> >>
>>>> >> >> Best,
>>>> >> >> Liu Jiangang
>>>> >> >>
>>>> >> >> Yun Gao <yu...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
>>>> >> >>>
>>>> >> >>> Hi Yingjie,
>>>> >> >>>
>>>> >> >>> Very thanks for drafting the FLIP and initiating the discussion!
>>>> >> >>>
>>>> >> >>> May I have a double confirmation for
>>>> taskmanager.network.sort-shuffle.min-parallelism that
>>>> >> >>> since other frameworks like Spark have used sort-based shuffle
>>>> for all the cases, does our
>>>> >> >>> current circumstance still have difference with them?
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yun
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> ------------------------------------------------------------------
>>>> >> >>> From:Yingjie Cao <ke...@gmail.com>
>>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>>> >> >>> To:dev <de...@flink.apache.org>; user <us...@flink.apache.org>;
>>>> user-zh <us...@flink.apache.org>
>>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
>>>> blocking shuffle
>>>> >> >>>
>>>> >> >>> Hi dev & users:
>>>> >> >>>
>>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
>>>> appreciated.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >> >>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>>> >> >>> Yingjie Cao <ke...@gmail.com> 于2021年12月3日周五 17:02写道:
>>>> >> >>>
>>>> >> >>> Hi dev & users,
>>>> >> >>>
>>>> >> >>> We propose to change some default values of blocking shuffle to
>>>> improve the user out-of-box experience (not influence streaming). The
>>>> default values we want to change are as follows:
>>>> >> >>>
>>>> >> >>> 1. Data compression
>>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>>>> default value is 'false'.  Usually, data compression can reduce both disk
>>>> and network IO which is good for performance. At the same time, it can save
>>>> storage space. We propose to change the default value to true.
>>>> >> >>>
>>>> >> >>> 2. Default shuffle implementation
>>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>>>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>>>> both stability and performance. So we propose to reduce the default value
>>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>>>> 1024 with a tpc-ds and 128 is the best one.)
>>>> >> >>>
>>>> >> >>> 3. Read buffer of sort-shuffle
>>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>>>> default value is '32M'. Previously, when choosing the default value, both
>>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>>>> way. However, recently, it is reported in the mailing list that the default
>>>> value is not enough which caused a buffer request timeout issue. We already
>>>> created a ticket to improve the behavior. At the same time, we propose to
>>>> increase this default value to '64M' which can also help.
>>>> >> >>>
>>>> >> >>> 4. Sort buffer size of sort-shuffle
>>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>>>> value is '64' which means '64' network buffers (32k per buffer by default).
>>>> This default value is quite modest and the performance can be influenced.
>>>> We propose to increase this value to a larger one, for example, 512 (the
>>>> default TM and network buffer configuration can serve more than 10 result
>>>> partitions concurrently).
>>>> >> >>>
>>>> >> >>> We already tested these default values together with tpc-ds
>>>> benchmark in a cluster and both the performance and stability improved a
>>>> lot. These changes can help to improve the out-of-box experience of
>>>> blocking shuffle. What do you think about these changes? Is there any
>>>> concern? If there are no objections, I will make these changes soon.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Best, Jingsong Lee
>>>>
>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>

Re: [DISCUSS] Change some default config values of blocking shuffle

Posted by 刘建刚 <li...@gmail.com>.
Thanks for the experiment. +1 for the changes.

Yingjie Cao <ke...@gmail.com> 于2022年1月4日周二 17:35写道:

> Hi all,
>
> After running some tests with the proposed default value (
> taskmanager.network.sort-shuffle.min-parallelism: 1,
> taskmanager.network.sort-shuffle.min-buffers: 512,
> taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
> share some test results.
>
> 1. TPC-DS performance and stability test (I the TPC-DS benchmark using 512
> default parallelism and several different settings multiple times):
> 1) Stability:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query stability a lot. With the current default, there
> are many queries suffering from blocking shuffle relevant failures. With
> the proposed default values, only three queries fail because of the
> "Insufficient number of network buffers:" error. With 512 parallelism, the
> current default configuration will incur the same issue. Part of the reason
> is that the network buffer consumed by InputGate is  proportional to
> parallelism and can use 32M network memory by default and many tasks has
> several InputGate but we only has 128M network memory per TaskManager by
> default.
> 2) Performance:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query performance a lot. Except for those queries of
> small shuffle data amount which consume really short time, the proposed
> default values can bring 2-10 times performance gain. About the default
> value of taskmanager.network.sort-shuffle.min-parallelism  proposed by
> Yun, I tested both 1 and 128 and 1 is better for performance which is as
> expected.
>
> 2. Flink pre-commit stability test:
> I have run all Flink tests with the proposed default value for more than
> 20 times. The only instability is the "Insufficient number of network
> buffers:" error for batch several test cases. This error occurs because
> some tests have really limited network buffers and the proposed default
> config values may increase the network buffer consumption for cases. After
> increase the total network size for these test cases, the issue can be
> solved.
>
> Summary:
> 1. The proposed default value can improve both the performance and
> stability of Flink batch shuffle a lot.
> 2. Some batch jobs may fail because of the "Insufficient number of network
> buffers:" error for this default value change will increase the network
> buffer consumption a little for jobs less than 512 parallelism (for jobs
> more than 512 parallelism network buffer consumption will be reduced).
> 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> better performance than setting that to 128, both settings may incur the
> "Insufficient number of network buffers:" error.
> 4. After changing the default value and fixing several test cases, all
> Flink tests (except for those known unstable cases) can run stably.
>
> Personally, I am +1 to make the change. Though the change may cause some
> batch jobs fail because of the "Insufficient number of network buffers:",
> the possibility is small enough (only 3 TPC-DS out of about 100 queries
> fails, these queries will also fail with the current default configuration
> because it is the InputGate which takes the most network buffers and cost
> the error). Compared to this small regression, the performance and
> stability gains are big. Any feedbacks especially those from Flink batch
> users are highly appreciated.
>
> BTW, aside from the above tests, I also tries to tune some more config
> options to try to make the TPC-DS test faster. I copied these tuned config
> options from our daily TPC-DS settings. The results show that the optimized
> configuration can improve the TPC-DS performance about 30%. Though these
> settings may not the best, they really help compared to the default value.
> I attached some settings in this may, I guess some Flink batch users may be
> interested in this. Based on my limited knowledge, I guess that increasing
> the total TaskManager size and network memory size is important for
> performance, because more memory (managed and network) can make operators
> and shuffle faster.
>
> Best,
> Yingjie
>
>
>
> Yingjie Cao <ke...@gmail.com> 于2021年12月15日周三 12:19写道:
>
>> Hi Till,
>>
>> Thanks for the suggestion. I think it makes a lot of sense to also extend
>> the documentation for the sort shuffle to include a tuning guide.
>>
>> Best,
>> Yingjie
>>
>> Till Rohrmann <tr...@apache.org> 于2021年12月14日周二 18:57写道:
>>
>>> As part of this FLIP, does it make sense to also extend the
>>> documentation for the sort shuffle [1] to include a tuning guide? I am
>>> thinking of a more in depth description of what things you might observe
>>> and how to influence them with the configuration options.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <ji...@gmail.com>
>>> wrote:
>>>
>>>> Hi Yingjie,
>>>>
>>>> Thanks for your explanation. I have no more questions. +1
>>>>
>>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hi Jingsong,
>>>> >
>>>> > Thanks for your feedback.
>>>> >
>>>> > >>> My question is, what is the maximum parallelism a job can have
>>>> with the default configuration? (Does this break out of the box)
>>>> >
>>>> > Yes, you are right, these two options are related to network memory
>>>> and framework off-heap memory. Generally, these changes will not break out
>>>> of the box experience, but for some extreme cases, for example, there are
>>>> too many ResultPartitions per task, users may need to increase network
>>>> memory to avoid "insufficient network buffer" error. For framework
>>>> off-head, I believe that user do not need to change the default value.
>>>> >
>>>> > In fact, I have a basic goal when changing these config values: when
>>>> running TPCDS of medium parallelism with the default value, all queries
>>>> must pass without any error and at the same time, the performance can be
>>>> improved. I think if we achieve this goal, most common use cases can be
>>>> covered.
>>>> >
>>>> > Currently, for the default configuration, the exclusive buffers
>>>> required at input gate side is still parallelism relevant (though since
>>>> 1.14, we can decouple the network buffer consumption from parallelism by
>>>> setting a config value, it has slight performance influence on streaming
>>>> jobs), which means that no large parallelism can be supported by the
>>>> default configuration. Roughly, I would say the default value can support
>>>> jobs of several hundreds of parallelism.
>>>> >
>>>> > >>> I do feel that this correspondence is a bit difficult to control
>>>> at the moment, and it would be best if a rough table could be provided.
>>>> >
>>>> > I think this is a good suggestion, we can provide those suggestions
>>>> in the document.
>>>> >
>>>> > Best,
>>>> > Yingjie
>>>> >
>>>> > Jingsong Li <ji...@gmail.com> 于2021年12月14日周二 14:39写道:
>>>> >>
>>>> >> Hi  Yingjie,
>>>> >>
>>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>>>> >> of batch jobs.
>>>> >>
>>>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>>>> >> network memory and framework.off-heap.size.
>>>> >>
>>>> >> My question is, what is the maximum parallelism a job can have with
>>>> >> the default configuration? (Does this break out of the box)
>>>> >>
>>>> >> How much network memory and framework.off-heap.size are required for
>>>> >> how much parallelism in the default configuration?
>>>> >>
>>>> >> I do feel that this correspondence is a bit difficult to control at
>>>> >> the moment, and it would be best if a rough table could be provided.
>>>> >>
>>>> >> Best,
>>>> >> Jingsong
>>>> >>
>>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > Hi Jiangang,
>>>> >> >
>>>> >> > Thanks for your suggestion.
>>>> >> >
>>>> >> > >>> The config can affect the memory usage. Will the related
>>>> memory configs be changed?
>>>> >> >
>>>> >> > I think we will not change the default network memory settings. My
>>>> best expectation is that the default value can work for most cases (though
>>>> may not the best) and for other cases, user may need to tune the memory
>>>> settings.
>>>> >> >
>>>> >> > >>> Can you share the tpcds results for different configs?
>>>> Although we change the default values, it is helpful to change them for
>>>> different users. In this case, the experience can help a lot.
>>>> >> >
>>>> >> > I did not keep all previous TPCDS results, but from the results, I
>>>> can tell that on HDD, always using the sort-shuffle is a good choice. For
>>>> small jobs, using sort-shuffle may not bring much performance gain, this
>>>> may because that all shuffle data can be cached in memory (page cache),
>>>> this is the case if the cluster have enough resources. However, if the
>>>> whole cluster is under heavy burden or you are running large scale jobs,
>>>> the performance of those small jobs can also be influenced. For large-scale
>>>> jobs, the configurations suggested to be tuned are
>>>> taskmanager.network.sort-shuffle.min-buffers and
>>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase
>>>> these values for large-scale batch jobs.
>>>> >> >
>>>> >> > BTW, I am still running TPCDS tests these days and I can share
>>>> these results soon.
>>>> >> >
>>>> >> > Best,
>>>> >> > Yingjie
>>>> >> >
>>>> >> > 刘建刚 <li...@gmail.com> 于2021年12月10日周五 18:30写道:
>>>> >> >>
>>>> >> >> Glad to see the suggestion. In our test, we found that small jobs
>>>> with the changing configs can not improve the performance much just as your
>>>> test. I have some suggestions:
>>>> >> >>
>>>> >> >> The config can affect the memory usage. Will the related memory
>>>> configs be changed?
>>>> >> >> Can you share the tpcds results for different configs? Although
>>>> we change the default values, it is helpful to change them for different
>>>> users. In this case, the experience can help a lot.
>>>> >> >>
>>>> >> >> Best,
>>>> >> >> Liu Jiangang
>>>> >> >>
>>>> >> >> Yun Gao <yu...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
>>>> >> >>>
>>>> >> >>> Hi Yingjie,
>>>> >> >>>
>>>> >> >>> Very thanks for drafting the FLIP and initiating the discussion!
>>>> >> >>>
>>>> >> >>> May I have a double confirmation for
>>>> taskmanager.network.sort-shuffle.min-parallelism that
>>>> >> >>> since other frameworks like Spark have used sort-based shuffle
>>>> for all the cases, does our
>>>> >> >>> current circumstance still have difference with them?
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yun
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> ------------------------------------------------------------------
>>>> >> >>> From:Yingjie Cao <ke...@gmail.com>
>>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>>> >> >>> To:dev <de...@flink.apache.org>; user <us...@flink.apache.org>;
>>>> user-zh <us...@flink.apache.org>
>>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
>>>> blocking shuffle
>>>> >> >>>
>>>> >> >>> Hi dev & users:
>>>> >> >>>
>>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
>>>> appreciated.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >> >>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>>> >> >>> Yingjie Cao <ke...@gmail.com> 于2021年12月3日周五 17:02写道:
>>>> >> >>>
>>>> >> >>> Hi dev & users,
>>>> >> >>>
>>>> >> >>> We propose to change some default values of blocking shuffle to
>>>> improve the user out-of-box experience (not influence streaming). The
>>>> default values we want to change are as follows:
>>>> >> >>>
>>>> >> >>> 1. Data compression
>>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>>>> default value is 'false'.  Usually, data compression can reduce both disk
>>>> and network IO which is good for performance. At the same time, it can save
>>>> storage space. We propose to change the default value to true.
>>>> >> >>>
>>>> >> >>> 2. Default shuffle implementation
>>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>>>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>>>> both stability and performance. So we propose to reduce the default value
>>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>>>> 1024 with a tpc-ds and 128 is the best one.)
>>>> >> >>>
>>>> >> >>> 3. Read buffer of sort-shuffle
>>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>>>> default value is '32M'. Previously, when choosing the default value, both
>>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>>>> way. However, recently, it is reported in the mailing list that the default
>>>> value is not enough which caused a buffer request timeout issue. We already
>>>> created a ticket to improve the behavior. At the same time, we propose to
>>>> increase this default value to '64M' which can also help.
>>>> >> >>>
>>>> >> >>> 4. Sort buffer size of sort-shuffle
>>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>>>> value is '64' which means '64' network buffers (32k per buffer by default).
>>>> This default value is quite modest and the performance can be influenced.
>>>> We propose to increase this value to a larger one, for example, 512 (the
>>>> default TM and network buffer configuration can serve more than 10 result
>>>> partitions concurrently).
>>>> >> >>>
>>>> >> >>> We already tested these default values together with tpc-ds
>>>> benchmark in a cluster and both the performance and stability improved a
>>>> lot. These changes can help to improve the out-of-box experience of
>>>> blocking shuffle. What do you think about these changes? Is there any
>>>> concern? If there are no objections, I will make these changes soon.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Best, Jingsong Lee
>>>>
>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>

Re: [DISCUSS] Change some default config values of blocking shuffle

Posted by 刘建刚 <li...@gmail.com>.
Thanks for the experiment. +1 for the changes.

Yingjie Cao <ke...@gmail.com> 于2022年1月4日周二 17:35写道:

> Hi all,
>
> After running some tests with the proposed default value (
> taskmanager.network.sort-shuffle.min-parallelism: 1,
> taskmanager.network.sort-shuffle.min-buffers: 512,
> taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
> share some test results.
>
> 1. TPC-DS performance and stability test (I the TPC-DS benchmark using 512
> default parallelism and several different settings multiple times):
> 1) Stability:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query stability a lot. With the current default, there
> are many queries suffering from blocking shuffle relevant failures. With
> the proposed default values, only three queries fail because of the
> "Insufficient number of network buffers:" error. With 512 parallelism, the
> current default configuration will incur the same issue. Part of the reason
> is that the network buffer consumed by InputGate is  proportional to
> parallelism and can use 32M network memory by default and many tasks has
> several InputGate but we only has 128M network memory per TaskManager by
> default.
> 2) Performance:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query performance a lot. Except for those queries of
> small shuffle data amount which consume really short time, the proposed
> default values can bring 2-10 times performance gain. About the default
> value of taskmanager.network.sort-shuffle.min-parallelism  proposed by
> Yun, I tested both 1 and 128 and 1 is better for performance which is as
> expected.
>
> 2. Flink pre-commit stability test:
> I have run all Flink tests with the proposed default value for more than
> 20 times. The only instability is the "Insufficient number of network
> buffers:" error for batch several test cases. This error occurs because
> some tests have really limited network buffers and the proposed default
> config values may increase the network buffer consumption for cases. After
> increase the total network size for these test cases, the issue can be
> solved.
>
> Summary:
> 1. The proposed default value can improve both the performance and
> stability of Flink batch shuffle a lot.
> 2. Some batch jobs may fail because of the "Insufficient number of network
> buffers:" error for this default value change will increase the network
> buffer consumption a little for jobs less than 512 parallelism (for jobs
> more than 512 parallelism network buffer consumption will be reduced).
> 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> better performance than setting that to 128, both settings may incur the
> "Insufficient number of network buffers:" error.
> 4. After changing the default value and fixing several test cases, all
> Flink tests (except for those known unstable cases) can run stably.
>
> Personally, I am +1 to make the change. Though the change may cause some
> batch jobs fail because of the "Insufficient number of network buffers:",
> the possibility is small enough (only 3 TPC-DS out of about 100 queries
> fails, these queries will also fail with the current default configuration
> because it is the InputGate which takes the most network buffers and cost
> the error). Compared to this small regression, the performance and
> stability gains are big. Any feedbacks especially those from Flink batch
> users are highly appreciated.
>
> BTW, aside from the above tests, I also tries to tune some more config
> options to try to make the TPC-DS test faster. I copied these tuned config
> options from our daily TPC-DS settings. The results show that the optimized
> configuration can improve the TPC-DS performance about 30%. Though these
> settings may not the best, they really help compared to the default value.
> I attached some settings in this may, I guess some Flink batch users may be
> interested in this. Based on my limited knowledge, I guess that increasing
> the total TaskManager size and network memory size is important for
> performance, because more memory (managed and network) can make operators
> and shuffle faster.
>
> Best,
> Yingjie
>
>
>
> Yingjie Cao <ke...@gmail.com> 于2021年12月15日周三 12:19写道:
>
>> Hi Till,
>>
>> Thanks for the suggestion. I think it makes a lot of sense to also extend
>> the documentation for the sort shuffle to include a tuning guide.
>>
>> Best,
>> Yingjie
>>
>> Till Rohrmann <tr...@apache.org> 于2021年12月14日周二 18:57写道:
>>
>>> As part of this FLIP, does it make sense to also extend the
>>> documentation for the sort shuffle [1] to include a tuning guide? I am
>>> thinking of a more in depth description of what things you might observe
>>> and how to influence them with the configuration options.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <ji...@gmail.com>
>>> wrote:
>>>
>>>> Hi Yingjie,
>>>>
>>>> Thanks for your explanation. I have no more questions. +1
>>>>
>>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hi Jingsong,
>>>> >
>>>> > Thanks for your feedback.
>>>> >
>>>> > >>> My question is, what is the maximum parallelism a job can have
>>>> with the default configuration? (Does this break out of the box)
>>>> >
>>>> > Yes, you are right, these two options are related to network memory
>>>> and framework off-heap memory. Generally, these changes will not break out
>>>> of the box experience, but for some extreme cases, for example, there are
>>>> too many ResultPartitions per task, users may need to increase network
>>>> memory to avoid "insufficient network buffer" error. For framework
>>>> off-head, I believe that user do not need to change the default value.
>>>> >
>>>> > In fact, I have a basic goal when changing these config values: when
>>>> running TPCDS of medium parallelism with the default value, all queries
>>>> must pass without any error and at the same time, the performance can be
>>>> improved. I think if we achieve this goal, most common use cases can be
>>>> covered.
>>>> >
>>>> > Currently, for the default configuration, the exclusive buffers
>>>> required at input gate side is still parallelism relevant (though since
>>>> 1.14, we can decouple the network buffer consumption from parallelism by
>>>> setting a config value, it has slight performance influence on streaming
>>>> jobs), which means that no large parallelism can be supported by the
>>>> default configuration. Roughly, I would say the default value can support
>>>> jobs of several hundreds of parallelism.
>>>> >
>>>> > >>> I do feel that this correspondence is a bit difficult to control
>>>> at the moment, and it would be best if a rough table could be provided.
>>>> >
>>>> > I think this is a good suggestion, we can provide those suggestions
>>>> in the document.
>>>> >
>>>> > Best,
>>>> > Yingjie
>>>> >
>>>> > Jingsong Li <ji...@gmail.com> 于2021年12月14日周二 14:39写道:
>>>> >>
>>>> >> Hi  Yingjie,
>>>> >>
>>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>>>> >> of batch jobs.
>>>> >>
>>>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>>>> >> network memory and framework.off-heap.size.
>>>> >>
>>>> >> My question is, what is the maximum parallelism a job can have with
>>>> >> the default configuration? (Does this break out of the box)
>>>> >>
>>>> >> How much network memory and framework.off-heap.size are required for
>>>> >> how much parallelism in the default configuration?
>>>> >>
>>>> >> I do feel that this correspondence is a bit difficult to control at
>>>> >> the moment, and it would be best if a rough table could be provided.
>>>> >>
>>>> >> Best,
>>>> >> Jingsong
>>>> >>
>>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <ke...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > Hi Jiangang,
>>>> >> >
>>>> >> > Thanks for your suggestion.
>>>> >> >
>>>> >> > >>> The config can affect the memory usage. Will the related
>>>> memory configs be changed?
>>>> >> >
>>>> >> > I think we will not change the default network memory settings. My
>>>> best expectation is that the default value can work for most cases (though
>>>> may not the best) and for other cases, user may need to tune the memory
>>>> settings.
>>>> >> >
>>>> >> > >>> Can you share the tpcds results for different configs?
>>>> Although we change the default values, it is helpful to change them for
>>>> different users. In this case, the experience can help a lot.
>>>> >> >
>>>> >> > I did not keep all previous TPCDS results, but from the results, I
>>>> can tell that on HDD, always using the sort-shuffle is a good choice. For
>>>> small jobs, using sort-shuffle may not bring much performance gain, this
>>>> may because that all shuffle data can be cached in memory (page cache),
>>>> this is the case if the cluster have enough resources. However, if the
>>>> whole cluster is under heavy burden or you are running large scale jobs,
>>>> the performance of those small jobs can also be influenced. For large-scale
>>>> jobs, the configurations suggested to be tuned are
>>>> taskmanager.network.sort-shuffle.min-buffers and
>>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase
>>>> these values for large-scale batch jobs.
>>>> >> >
>>>> >> > BTW, I am still running TPCDS tests these days and I can share
>>>> these results soon.
>>>> >> >
>>>> >> > Best,
>>>> >> > Yingjie
>>>> >> >
>>>> >> > 刘建刚 <li...@gmail.com> 于2021年12月10日周五 18:30写道:
>>>> >> >>
>>>> >> >> Glad to see the suggestion. In our test, we found that small jobs
>>>> with the changing configs can not improve the performance much just as your
>>>> test. I have some suggestions:
>>>> >> >>
>>>> >> >> The config can affect the memory usage. Will the related memory
>>>> configs be changed?
>>>> >> >> Can you share the tpcds results for different configs? Although
>>>> we change the default values, it is helpful to change them for different
>>>> users. In this case, the experience can help a lot.
>>>> >> >>
>>>> >> >> Best,
>>>> >> >> Liu Jiangang
>>>> >> >>
>>>> >> >> Yun Gao <yu...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
>>>> >> >>>
>>>> >> >>> Hi Yingjie,
>>>> >> >>>
>>>> >> >>> Very thanks for drafting the FLIP and initiating the discussion!
>>>> >> >>>
>>>> >> >>> May I have a double confirmation for
>>>> taskmanager.network.sort-shuffle.min-parallelism that
>>>> >> >>> since other frameworks like Spark have used sort-based shuffle
>>>> for all the cases, does our
>>>> >> >>> current circumstance still have difference with them?
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yun
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> ------------------------------------------------------------------
>>>> >> >>> From:Yingjie Cao <ke...@gmail.com>
>>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>>> >> >>> To:dev <de...@flink.apache.org>; user <us...@flink.apache.org>;
>>>> user-zh <us...@flink.apache.org>
>>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
>>>> blocking shuffle
>>>> >> >>>
>>>> >> >>> Hi dev & users:
>>>> >> >>>
>>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
>>>> appreciated.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >> >>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>>> >> >>> Yingjie Cao <ke...@gmail.com> 于2021年12月3日周五 17:02写道:
>>>> >> >>>
>>>> >> >>> Hi dev & users,
>>>> >> >>>
>>>> >> >>> We propose to change some default values of blocking shuffle to
>>>> improve the user out-of-box experience (not influence streaming). The
>>>> default values we want to change are as follows:
>>>> >> >>>
>>>> >> >>> 1. Data compression
>>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>>>> default value is 'false'.  Usually, data compression can reduce both disk
>>>> and network IO which is good for performance. At the same time, it can save
>>>> storage space. We propose to change the default value to true.
>>>> >> >>>
>>>> >> >>> 2. Default shuffle implementation
>>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>>>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>>>> both stability and performance. So we propose to reduce the default value
>>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>>>> 1024 with a tpc-ds and 128 is the best one.)
>>>> >> >>>
>>>> >> >>> 3. Read buffer of sort-shuffle
>>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>>>> default value is '32M'. Previously, when choosing the default value, both
>>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>>>> way. However, recently, it is reported in the mailing list that the default
>>>> value is not enough which caused a buffer request timeout issue. We already
>>>> created a ticket to improve the behavior. At the same time, we propose to
>>>> increase this default value to '64M' which can also help.
>>>> >> >>>
>>>> >> >>> 4. Sort buffer size of sort-shuffle
>>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>>>> value is '64' which means '64' network buffers (32k per buffer by default).
>>>> This default value is quite modest and the performance can be influenced.
>>>> We propose to increase this value to a larger one, for example, 512 (the
>>>> default TM and network buffer configuration can serve more than 10 result
>>>> partitions concurrently).
>>>> >> >>>
>>>> >> >>> We already tested these default values together with tpc-ds
>>>> benchmark in a cluster and both the performance and stability improved a
>>>> lot. These changes can help to improve the out-of-box experience of
>>>> blocking shuffle. What do you think about these changes? Is there any
>>>> concern? If there are no objections, I will make these changes soon.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Best, Jingsong Lee
>>>>
>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>