Posted to dev@flink.apache.org by Yuxin Tan <ta...@gmail.com> on 2022/12/19 07:35:43 UTC

[DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Hi, devs,

I'd like to start a discussion about FLIP-266: Simplify network memory
configurations for TaskManager[1].

When using Flink, users may encounter the following issues that affect
usability.
1. The job may fail with an "Insufficient number of network buffers"
exception.
2. Flink network memory size adjustment is complex.
When encountering these issues, users can solve some problems by adding or
adjusting parameters. However, multiple memory config options should be
changed. The config option adjustment requires understanding the detailed
internal implementation, which is impractical for most users.

To simplify network memory configurations for TaskManager and improve Flink
usability, this FLIP proposed some optimization solutions for the issues.

Looking forward to your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager

Best regards,
Yuxin

Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Romit Mahanta <ro...@gmail.com>.
If this improves the performance, +1

On Sat, 24 Dec, 2022, 5:47 pm Guowei Ma, <gu...@gmail.com> wrote:


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Guowei Ma <gu...@gmail.com>.
Hi,
Thank you very much for driving this FLIP to improve usability.

I understand that a key goal of this FLIP is to adjust the memory
requirements of shuffle to a more reasonable range. Through this adaptive
range adjustment, memory efficiency can be improved without sacrificing
performance, thereby improving the user experience.

I have no problem with this goal, but I have a concern about the means of
implementation: should we introduce a _new_ non-orthogonal option
(`taskmanager.memory.network.required-buffer-per-gate.max`)? That is to
say, the option will affect both streaming and batch shuffle behavior at
the same time.

From the description in the FLIP, we can see that we do not want this value
to be the same in streaming and batch scenarios. But we still let the user
configure this parameter, and once it is configured, the shuffle behavior
of streaming and batch may be the same. In theory, there may be a
configuration that meets the requirements of batch shuffle but affects the
performance of streaming shuffle (for example, one that reduces the memory
overhead in batch scenarios at the cost of streaming shuffle performance).
In other words, do we really want to add a new option that exposes this
risk?

  Personally, I think there might be two ways:
    1. Modify the current implementation of streaming shuffle so that its
performance does not regress. In this way, this option will not couple
streaming shuffle and batch shuffle, which also avoids confusion for the
user. But I am not sure how to do it. :-)
    2. Introduce a pure batch read option, similar to the one introduced on
the batch write side.

BTW: It's better not to expose more implementation-related concepts to
users. For example, the "gate" is related to the internal implementation.
Relatively speaking, `shuffle.read/shuffle.client.read` may be more
general. After all, it can also avoid coupling with the topology structure
and scheduling units.

Best,
Guowei


On Fri, Dec 23, 2022 at 2:57 PM Lijie Wang <wa...@gmail.com> wrote:


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Lijie Wang <wa...@gmail.com>.
Hi,

Thanks for driving this FLIP, +1 for the proposed changes.

Limiting the maximum value of shuffle read memory is very useful when
using the adaptive batch scheduler. Currently, the adaptive batch
scheduler may cause a large number of input channels in a certain TM, so we
generally recommend that users configure
"taskmanager.network.memory.buffers-per-channel: 0" to decrease the
possibility of the “Insufficient number of network buffers” error. After
this FLIP, users no longer need to configure
"taskmanager.network.memory.buffers-per-channel".

So +1 from my side.

Best,
Lijie

Xintong Song <to...@gmail.com> wrote on Tue, Dec 20, 2022 at 10:04:


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Xintong Song <to...@gmail.com>.
Thanks for the proposal, Yuxin.

+1 for the proposed changes. I think these are indeed helpful usability
improvements.

Best,

Xintong



On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <ta...@gmail.com> wrote:


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Yuxin Tan <ta...@gmail.com>.
Hi, JasonLee

Thanks for the feedback.

> How can we determine how many RS and IG a DAG has?

Network memory is related to the parallelism and the complexity of the task
DAG, which I think is correct. However, this FLIP can only address part of
the overall issue, mainly focusing on memory optimization for shuffle
reading. We can only limit the total read buffers in one InputGate, but
cannot determine the number of RS and IG. The good news is that this is an
independent problem; maybe we can try to optimize and solve it later.


Best,
Yuxin


JasonLee <17...@163.com> wrote on Wed, Dec 28, 2022 at 19:18:

> Hi Yuxin
>
>
> Thanks for the proposal, big +1 for this FLIP.
>
>
>
> It is difficult for users to calculate the size of network memory. If the
> setting is too small, the task cannot be started. If the setting is too
> large, there may be a waste of resources. It would be great if the Flink
> framework could automatically set a reasonable value, but I have a small
> question: network memory is related not only to the parallelism of the
> task, but also to the complexity of the task DAG. The more complex a DAG
> is, the larger the buffers required by shuffle writes and shuffle reads.
> How can we determine how many RS and IG a DAG has?
>
>
>
> Best
> JasonLee
>
>
> On 12/28/2022 18:29, Yuxin Tan <ta...@gmail.com> wrote:
> Hi, Roman
>
> Thanks for the reply.
>
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> configurations, which are not calculated. I have described them in the FLIP
> motivation section.
>
> 3. Each gate requires at least one buffer...
> The timeout exception occurs when the ExclusiveBuffersPerChannel
> cannot be requested from the NetworkBufferPool, which is not caused by
> the changes of this FLIP. In addition, we can set
> ExclusiveBuffersPerChannel to 0 when using floating buffers, which also
> decreases the probability of this exception.
>
> 4. It would be great to have experimental results for jobs with different
> exchange types.
> Thanks for the suggestion. I have tested different exchange types,
> forward and rescale, and the results show no differences from the
> all-to-all type. This is understandable because the network memory usage
> is calculated with numChannels, independent of the edge type.
>
> Best,
> Yuxin
>
>
> Roman Khachatryan <ro...@apache.org> wrote on Wed, Dec 28, 2022 at 05:27:
>
> Hi everyone,
>
> Thanks for the proposal and the discussion.
>
> I couldn't find many details on how exactly the values of
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
> I guess that
> - the threshold evaluation is done on JM
> - floating buffers calculation is done on TM based on the current memory
> available; so it is not taking into account any future tasks submitted for
> that (or other) job
> Is that correct?
>
> If so, I see the following potential issues:
>
> 1. Each (sub)task might have different values because the actual
> available memory might be different. E.g. some tasks might use exclusive
> buffers and others only floating. That could lead to significant skew
> in processing speed, and in turn to issues with checkpoints and watermarks.
>
> 2. Re-deployment of a task (e.g. on job failure) might lead to a completely
> different memory configuration. That, coupled with different values per
> subtask and operator, makes the performance analysis more difficult.
>
> (Regardless of whether it's done on TM or JM):
> 3. Each gate requires at least one buffer [1]. So, when no memory is
> available, the TM will throw an allocation timeout exception instead of
> an insufficient-buffers exception immediately. A delay here (allocation
> timeout) seems like a regression.
> Besides that, the regression depends on how much memory is actually
> available and how much it is contended, doesn't it?
> Should there still be a lower threshold of available memory, below which
> the job (task) isn't accepted?
> 4. The same threshold for all types of shuffles will likely result in using
> exclusive buffers
> for point-wise connections and floating buffers for all-to-all ones. I'm
> not sure if that's always optimal. It would be great to have experimental
> results for jobs with different exchange types, WDYT?
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-24035
>
> Regards,
> Roman
>
>
> On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <ta...@gmail.com> wrote:
>
> Hi, Weihua
>
> Thanks for your suggestions.
>
> 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
> total buffer is not enough?
>
> I think it's a good idea. I will try it and check the results in the PoC.
> Before falling back to all-floating read buffers, I will try to use
> (ExclusiveBuffersPerChannel - i) buffers per channel first. For example,
> if the user has configured ExclusiveBuffersPerChannel to 4, it will check
> whether the read buffers are sufficient for each value from 4 down to 1.
> Only when ExclusiveBuffersPerChannel is 1 for all channels and the read
> buffers are still insufficient will all read buffers use floating
> buffers. If the test results prove better, the FLIP will use this method.
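>
> As a rough sketch of this fallback search (hypothetical names, not the
> final implementation):
>
>     // Try the configured count first, then fewer exclusive buffers per
>     // channel; return 0 (all read buffers floating) only if even 1 per
>     // channel cannot be satisfied by the available network buffers.
>     static int chooseExclusiveBuffersPerChannel(
>             int configured, int numChannels, long availableBuffers) {
>         for (int candidate = configured; candidate >= 1; candidate--) {
>             if ((long) candidate * numChannels <= availableBuffers) {
>                 return candidate;
>             }
>         }
>         return 0;
>     }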
>
> 2. Do we really need to change the default value of
> 'taskmanager.memory.network.max'?
>
> Changing taskmanager.memory.network.max will indeed affect some users,
> but a user is only affected when all three conditions are fulfilled:
> 1) The total TM memory is larger than 10g (because the network memory
> ratio is 0.1).
> 2) taskmanager.memory.network.max was not explicitly configured.
> 3) Other memory, such as managed memory or heap memory, is insufficient.
> I think the number of jobs fulfilling these conditions is small, because
> when a TM uses such a large amount of memory, the network memory
> requirement may also be large. And when the issue is encountered, the
> rollback is very simple: configure taskmanager.memory.network.max to 1g
> or another value.
> In addition, the reason for modifying the default value is to simplify
> the network configurations in most scenarios. This change does affect a
> few usage scenarios, but we should admit that setting the default to any
> value may not meet the requirements of all scenarios.
>
> Best,
> Yuxin
>
>
> Weihua Hu <hu...@gmail.com> wrote on Mon, Dec 26, 2022 at 20:35:
>
> Hi Yuxin,
> Thanks for the proposal.
>
> "Insufficient number of network buffers" exceptions also bother us.
> It's
> too hard for users to figure out
> how much network buffer they really need. It relates to partitioner
> type,
> parallelism, slots per taskmanager.
>
> Since streaming jobs are our primary scenario, I have some questions
> about streaming jobs.
>
> 1. In this FLIP, all read buffers will use floating buffers when the
> total buffer count is more than
> 'taskmanager.memory.network.read-required-buffer.max'. Competition in
> buffer allocation leads to performance regression.
> How about reducing ExclusiveBuffersPerChannel to 1 first when the total
> buffer is not enough?
> Will this reduce the performance regression in streaming?
>
> 2. Changing taskmanager.memory.network.max will affect user migration
> from lower versions.
> IMO, network buffer size should not increase with total memory,
> especially for streaming jobs in application mode.
> For example, some ETL jobs with a rescale partitioner only require a few
> network buffers.
> And we already have 'taskmanager.memory.network.read-required-buffer.max'
> to control maximum read network buffer usage.
> Do we really need to change the default value of
> 'taskmanager.memory.network.max'?
>
> Best,
> Weihua
>
>
> On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <ta...@gmail.com> wrote:
>
> Hi, all
> Thanks to everyone for the replies and feedback!
>
>
> After combining everyone's comments, the main concerns and
> corresponding adjustments are as follows.
>
>
> @Guowei Ma, thanks for your feedback.
> should we introduce a _new_ non-orthogonal
> option (`taskmanager.memory.network.required-buffer-per-gate.max`)? That
> is to say, the option will affect both streaming and batch shuffle
> behavior at the same time.
>
> 1. The default value can meet most requirements in both streaming and
> batch scenarios, and by design we do not want users to adjust this
> config option. The option is added only to preserve the possibility of
> modification for users.
> 2. In the few cases where users really want to adjust this option, they
> may not expect to adjust it according to streaming or batch, but rather,
> for example, according to the parallelism of the job.
> 3. Regarding streaming shuffle performance, the same problem of
> insufficient memory also exists for streaming jobs. We introduced this
> configuration to enable users to decouple memory and parallelism, though
> it may affect some performance. By default, the feature is disabled and
> does not affect performance. However, the added configuration enables
> users to choose to decouple memory usage and parallelism for streaming
> jobs.
>
> It's better not to expose more implementation-related concepts to
> users.
>
> Thanks for your suggestion. I will modify the option name to avoid
> exposing implementation-related concepts. I have changed it to
> `taskmanager.memory.network.read-required-buffer.max` in the FLIP.
>
>
>
> @Dong Lin, thanks for your reply.
> it might be helpful to add a dedicated public interface section to
> describe the config key and config semantics.
>
> Thanks for your suggestion. I have added a public interface section to
> describe the config key and config semantics clearly.
>
> This FLIP seems to add more configs without removing any config from
> Flink.
>
> This FLIP is meant to reduce the number of options to be adjusted when
> using Flink. After the FLIP, the default values can meet the requirements
> in most scenarios without modifying any config options
> (`taskmanager.network.memory.buffers-per-channel` and
> `taskmanager.network.memory.floating-buffers-per-gate`), which helps to
> improve the out-of-box usability. In the long run, these two parameters
> may indeed be deprecated to reduce user-facing parameters, but from the
> perspective of compatibility, we need to pay attention to users' feedback
> before deciding to deprecate the options.
>
>
>
> @Yanfei Lei, thanks for your feedback.
> 1. Though the option is cluster level, the default value is different
> according to the job type. In other words, by default, for batch jobs the
> config value is enabled with a value of 1000, and for streaming jobs the
> config value is not enabled.
>
> 2. I think this is a good point. The total floating buffers will not
> change with ExclusiveBuffersPerChannel
> (taskmanager.network.memory.buffers-per-channel) because this is the
> maximum memory threshold. But if the user explicitly specified
> ExclusiveBuffersPerChannel, the calculated result of
> ExclusiveBuffersPerChannel * numChannels will change with it.
>
>
> Thanks again for all feedback!
>
>
> Best,
> Yuxin
>
>
> Zhu Zhu <re...@gmail.com> wrote on Mon, Dec 26, 2022 at 17:18:
>
> Hi Yuxin,
>
> Thanks for creating this FLIP.
>
> It's good if Flink does not require users to set a very large network
> memory or tune the advanced (hard-to-understand) per-channel/per-gate
> buffer configs to avoid "Insufficient number of network buffers"
> exceptions, which can easily happen for large-scale jobs.
>
> Regarding the new config
> "taskmanager.memory.network.read-required-buffer.max",
> I think it's still an advanced config that users may find hard to tune.
> However, given that in most cases users will not need to set it, I
> think it's acceptable.
>
> So +1 for this FLIP.
>
> In the future, I think Flink should adaptively select whether to use
> exclusive buffers according to whether there are sufficient network
> buffers at runtime. Users then no longer need to understand the above
> configuration. This may require supporting transitions between exclusive
> buffers and floating buffers.
> A problem with all-floating buffers is that too few network buffers can
> result in task slowness that is hard for users to identify. So
> improvements to metrics and the web UI are also needed to expose such
> issues.
>
> Thanks,
> Zhu
>
> Yanfei Lei <fr...@gmail.com> wrote on Mon, Dec 26, 2022 at 11:13:
>
> Hi Yuxin,
>
> Thanks for the proposal!
>
> After reading the FLIP, I have some questions about the default value.
> This FLIP seems to introduce a *new* config option
> (taskmanager.memory.network.required-buffer-per-gate.max) to control
> the network memory usage.
> 1. Is this configuration at the job level or the cluster level? As the
> FLIP describes, the default values for batch jobs and streaming jobs are
> different. If an explicit value is set at the cluster level, will it
> affect all batch jobs and streaming jobs on the cluster?
>
> 2. The default value for batch jobs depends on the value of
> ExclusiveBuffersPerChannel
> (taskmanager.network.memory.buffers-per-channel); if the value of
> ExclusiveBuffersPerChannel changes, does
> "taskmanager.memory.network.required-buffer-per-gate.max" need to change
> with it?
>
>
> Best,
> Yanfei
>
> Dong Lin <li...@gmail.com> wrote on Sun, Dec 25, 2022 at 08:58:
>
> Hi Yuxin,
>
> Thanks for proposing the FLIP!
>
> The motivation section makes sense. But it seems that the proposed
> changes section mixes the proposed config with the evaluation results.
> It is a bit hard to understand what configs are proposed and how to
> describe these configs to users. Given that the configuration setting is
> part of the public interfaces, it might be helpful to add a dedicated
> public interface section to describe the config key and config
> semantics, as suggested in the FLIP template here
> <https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals>.
>
> This FLIP seems to add more configs without removing any config from
> Flink. Intuitively this can make the Flink configuration harder rather
> than simpler. Maybe we can get a better idea after we add a public
> interface section to clarify those configs.
>
> Thanks,
> Dong
>
>
> On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <tanyuxinwork@gmail.com> wrote:

Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Yuxin Tan <ta...@gmail.com>.
Thanks for your question, Roman

> wouldn't it be more clear to separate motivation from the
> proposed changes?
All the contents in the motivation section describe the current
implementation of shuffle reading. The FLIP has described where
ExclusiveBuffersPerChannel and FloatingBuffersPerGate come from; the
description is as follows.
    "ExclusiveBuffersPerChannel is determined by
    taskmanager.network.memory.buffers-per-channel, which is 2 by default.
    FloatingBuffersPerGate ranges from 1 to DefaultFloatingBuffersPerGate.
    DefaultFloatingBuffersPerGate is determined by
    taskmanager.network.memory.floating-buffers-per-gate, which is 8 by
    default."
This part is the current implementation of network memory, not a change
introduced by the FLIP.
ExclusiveBuffersPerChannel is used in
SingleInputGateFactory#createInputChannel, so it is the number of exclusive
buffers in one channel. In one gate, the floating buffers
(FloatingBuffersPerGate) are used in SingleInputGateFactory#create to
create the LocalBufferPool of the gate. The buffer number of the
LocalBufferPool ranges from 1 to DefaultFloatingBuffersPerGate, which can
be seen in SingleInputGateFactory#createBufferPoolFactory, where
NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate controls that range.
One gate may contain multiple channels; we call the number of channels
numChannels. So the total buffers are
ExclusiveBuffersPerChannel * numChannels + FloatingBuffersPerGate, where
FloatingBuffersPerGate in the LocalBufferPool ranges from 1 to
DefaultFloatingBuffersPerGate. Therefore,
    "the range of TotalBuffersPerGate for each InputGate is
    [ExclusiveBuffersPerChannel * numChannels + 1,
    ExclusiveBuffersPerChannel * numChannels +
    DefaultFloatingBuffersPerGate]",
which is described in the FLIP. All the above descriptions are the current
implementation, not changes from the FLIP. Therefore, the motivation
section describes the current implementation, while the proposed changes
section describes the changes, which separates motivation from the
proposed changes.
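
To make the arithmetic concrete, here is a small sketch using the default
values quoted above (2 exclusive buffers per channel, up to 8 floating
buffers per gate); numChannels = 100 is only an illustrative choice:

    public class GateBufferRange {
        public static void main(String[] args) {
            int exclusivePerChannel = 2; // buffers-per-channel default
            int floatingPerGate = 8;     // floating-buffers-per-gate default
            int numChannels = 100;       // example gate size
            int min = exclusivePerChannel * numChannels + 1;
            int max = exclusivePerChannel * numChannels + floatingPerGate;
            // Prints: TotalBuffersPerGate range = [201, 208]
            System.out.println(
                    "TotalBuffersPerGate range = [" + min + ", " + max + "]");
        }
    }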

> So my question is what value exactly in this range will it have and
> how and where will it be computed?
Based on the current implementation, a threshold is introduced,
taskmanager.memory.network.read-required-buffer.max. As described above,
when the number of exclusive buffers in a gate
(ExclusiveBuffersPerChannel * numChannels) is greater than the threshold,
all read buffers will use floating buffers, and in order to keep the
number of buffers consistent with that before the change, we will modify
the creation process of the LocalBufferPool.
This is an implementation detail and the change is as follows. The
floating buffers in the LocalBufferPool will be in the range of
[numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels +
DefaultFloatingBuffersPerGate], which means that when calling
SingleInputGateFactory#createBufferPoolFactory, the min buffers of the
LocalBufferPool are numFloatingBufferThreashold and the max buffers are
ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate.
The exact buffer count depends on whether the buffers in the
NetworkBufferPool are sufficient when the LocalBufferPool is used, as can
be seen in the current implementation of LocalBufferPool; the exact number
of floating buffers in the LocalBufferPool falls within the above range.
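
A minimal sketch of that decision, with hypothetical names (the actual
change would live around SingleInputGateFactory#createBufferPoolFactory):

    // Returns {minBuffers, maxBuffers} for a gate's LocalBufferPool.
    // If the exclusive buffers a gate would need exceed the threshold,
    // the gate switches to floating-only buffers, and the pool bounds are
    // widened so the total buffer count stays consistent with before.
    static int[] localBufferPoolBounds(
            int exclusivePerChannel,
            int numChannels,
            int defaultFloatingPerGate,
            int threshold) {
        int exclusiveTotal = exclusivePerChannel * numChannels;
        if (exclusiveTotal > threshold) {
            // All read buffers float:
            // [threshold, exclusiveTotal + defaultFloatingPerGate]
            return new int[] {threshold, exclusiveTotal + defaultFloatingPerGate};
        }
        // Unchanged behavior: the floating part is [1, defaultFloatingPerGate]
        return new int[] {1, defaultFloatingPerGate};
    }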

Best,
Yuxin


Roman Khachatryan <ro...@apache.org> wrote on Wed, Dec 28, 2022 at 20:10:

> Thanks for your reply Yuxin,
>
> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> > configurations, which are not calculated. I have described them in the
> FLIP
> > motivation section.
>
> The motivation section says about floating buffers:
> > FloatingBuffersPerGate is within the range of
> [numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels +
> DefaultFloatingBuffersPerGate] ...
> So my question is: what value exactly in this range will it have, and
> how and where will it be computed?
>
> As for the ExclusiveBuffersPerChannel, there was a proposal in the thread
> to calculate it dynamically (by linear search
> from taskmanager.network.memory.buffers-per-channel down to 0).
>
> Also, if the two configuration options are still in use, why does the FLIP
> propose to deprecate them?
>
> Besides that, wouldn't it be more clear to separate motivation from the
> proposed changes?
>
> Regards,
> Roman
>
>
> On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17...@163.com> wrote:

Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Yuxin Tan <ta...@gmail.com>.
Hi all,

Thanks for all the feedback so far.

The discussion has been going on for some time. If there are no
more new comments, we will start a vote today.

Best,
Yuxin


Yuxin Tan <ta...@gmail.com> wrote on Thu, Dec 29, 2022 at 17:37:

> Hi, everyone
>
> Thanks for the reply and the discussion.
>
> We discussed this with @Guowei Ma, @Dong Lin, and @Yanfei Lei
> offline, and reached a consensus on this FLIP. Based on the offline
> discussions and suggestions from @Weihua Hu, the following changes
> have been updated in the FLIP.
>
> 1. Changes in public interfaces.
> - Updated the descriptions of the newly added config to describe the
> option more clearly.
> - The new config will be marked as experimental in the first release,
> and we will revisit this in the next release based on user feedback.
> - In the long run, with the new config, we think the original two configs
> can be deprecated. At this stage, since the new config is still
> experimental, we will not immediately deprecate them.
> - Modified the config key name to
> taskmanager.memory.network.read-buffer.required-per-gate.max for more
> clarity.
> 2. Modified the floating buffer calculation method.
> - When the memory used reaches the threshold, the number of exclusive
> buffers is gradually reduced in a fine-grained manner, rather than
> directly reducing the number of exclusive buffers to 0.
>
> Best,
> Yuxin
>
>
> Yuxin Tan <ta...@gmail.com> wrote on Thu, Dec 29, 2022 at 14:48:
>
>> Hi, Roman
>>
>> Sorry that I missed one question just now.
>>
>> >  if the two configuration options are still in use, why does the FLIP
>> propose to deprecate them?
>> These two configs are usually used to avoid the memory issue, but
>> after introducing the improvement, generally, I think it is no longer
>> necessary to adjust these two configurations to avoid the issue. So
>> I propose to deprecate them in the future when the @Experimental
>> annotation of the newly added config is removed.
>>
>> Best,
>> Yuxin
>>
>>
>> Roman Khachatryan <ro...@apache.org> wrote on Wed, Dec 28, 2022 at 20:10:
>>> > buffer allocation led to preference regression.
>>> > How about reducing ExclusiveBuffersPerChannel to 1 first when the total
>>> > buffer is not enough?
>>> > Will this reduce performance regression in streaming?
>>> >
>>> > 2. Changing taskmanager.memory.network.max will affect user migration
>>> > from
>>> > the lower version.
>>> > IMO, network buffer size should not increase with total memory,
>>> > especially
>>> > for streaming jobs with application mode.
>>> > For example, some ETL jobs with rescale partitioner only require a few
>>> > network buffers.
>>> > And we already have
>>> > 'taskmanager.memory.network.read-required-buffer.max'
>>> > to control maximum read network buffer usage.
>>> > Do we really need to change the default value of
>>> > 'taskmanager.memory.network.max'?
>>> >
>>> > Best,
>>> > Weihua
>>> >
>>> >
>>> > On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <ta...@gmail.com>
>>> > wrote:
>>> >
>>> > Hi, all
>>> > Thanks for the reply and feedback for everyone!
>>> >
>>> >
>>> > After combining everyone's comments, the main concerns, and
>>> > corresponding
>>> > adjustments are as follows.
>>> >
>>> >
>>> > @Guowei Ma, Thanks for your feedback.
>>> > should we introduce a _new_ non-orthogonal
>>> > option(`taskmanager.memory.network.required-buffer-per-gate.max`).
>>> > That
>>> > is
>>> > to say, the option will affect both streaming and batch shuffle
>>> > behavior
>>> > at
>>> > the
>>> > same time.
>>> >
>>> > 1. Because the default option can meet most requirements no matter in
>>> > Streaming
>>> > or Batch scenarios. We do not want users to adjust this default
>>> > config
>>> > option by
>>> > design. This configuration option is added only to preserve the
>>> > possibility
>>> > of
>>> > modification options for users.
>>> > 2. In a few cases, if you really want to adjust this option, users
>>> > may
>>> > not
>>> > expect to
>>> > adjust the option according to Streaming or Batch, for example,
>>> > according
>>> > to the
>>> > parallelism of the job.
>>> > 3. Regarding the performance of streaming shuffle, the same problem
>>> > of
>>> > insufficient memory also exists for Streaming jobs. We introduced
>>> > this
>>> > configuration
>>> > to enable users to decouple memory and parallelism, but it will
>>> > affect
>>> > some
>>> > performance. By default, the feature is disabled and does not affect
>>> > performance.
>>> > However, the added configuration enables users to choose to decouple
>>> > memory
>>> > usage and parallelism for Streaming jobs.
>>> >
>>> > It's better not to expose more implementation-related concepts to
>>> > users.
>>> >
>>> > Thanks for you suggestion. I will modify the option name to avoid
>>> > exposing
>>> > implementation-related concepts. I have changed it to
>>> > `taskmanager.memory.network.read-required-buffer.max` in the FLIP.
>>> >
>>> >
>>> >
>>> > @Dong Lin, Thanks for your reply.
>>> > it might be helpful to add a dedicated public interface section to
>>> > describe
>>> > the config key and config semantics.
>>> >
>>> > Thanks for your suggestion. I have added public interface section to
>>> > describe
>>> > the config key and config semantics clearly.
>>> >
>>> > This FLIP seems to add more configs without removing any config
>>> > from
>>> > Flink.
>>> >
>>> > This Flip is to reduce the number of options to be adjusted when
>>> > using
>>> > Flink.
>>> > After the Flip, the default option can meet the requirements in most
>>> > sceneries
>>> > rather than modifying any config
>>> > options(`taskmanager.network.memory.buffers-per-channel`
>>> > and `taskmanager.network.memory.floating-buffers-per-gate`), which is
>>> > helpful
>>> > to improve the out-of-box usability. In the long run, these two
>>> > parameters
>>> > `taskmanager.network.memory.buffers-per-channel` and
>>> > `taskmanager.network.memory.floating-buffers-per-gate` may indeed be
>>> > deprecated
>>> > to reduce user parameters, but from the perspective of compatibility,
>>> > we
>>> > need to
>>> > pay attention to users' feedback before deciding to deprecate the
>>> > options.
>>> >
>>> >
>>> >
>>> > @Yanfei Lei,Thanks for your feedback.
>>> > 1. Through the option is cluster level, the default value is
>>> > different
>>> > according to the
>>> > job type. In other words, by default, for Batch jobs, the config
>>> > value
>>> > is
>>> > enabled, 1000.
>>> > And for Streaming jobs, the config value is not enabled by default.
>>> >
>>> > 2. I think this is a good point. The total floating buffers will not
>>> > change
>>> > with
>>> >
>>> >
>>> >
>>> >
>>> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel)
>>> > because this is the maximum memory threshold. But if the user
>>> > explicitly
>>> > specified
>>> > the ExclusiveBuffersPerChannel, the calculated result of
>>> > ExclusiveBuffersPerChannel * numChannels will change with it.
>>> >
>>> >
>>> > Thanks again for all feedback!
>>> >
>>> >
>>> > Best,
>>> > Yuxin
>>> >
>>> >
>>> > Zhu Zhu <re...@gmail.com> 于2022年12月26日周一 17:18写道:
>>> >
>>> > Hi Yuxin,
>>> >
>>> > Thanks for creating this FLIP.
>>> >
>>> > It's good if Flink does not require users to set a very large
>>> > network
>>> > memory, or tune the advanced(hard-to-understand)
>>> > per-channel/per-gate
>>> > buffer configs, to avoid "Insufficient number of network buffers"
>>> > exceptions
>>> > which can easily happen for large scale jobs.
>>> >
>>> > Regarding the new config
>>> > "taskmanager.memory.network.read-required-buffer.max",
>>> > I think it's still an advanced config which users may feel hard to
>>> > tune.
>>> > However, given that in most cases users will not need to set it, I
>>> > think it's acceptable.
>>> >
>>> > So +1 for this FLIP.
>>> >
>>> > In the future, I think Flink should adaptively select to use
>>> > exclusive
>>> > buffers
>>> > or not according to whether there are sufficient network buffers at
>>> > runtime.
>>> > Users then no longer need to understand the above configuration.
>>> > This
>>> > may
>>> > require supporting transitions between exclusive buffers and
>>> > floating
>>> > buffers.
>>> > A problem of all buffer floating is that too few network buffers
>>> > can
>>> > result
>>> > in task slowness which is hard to identify by users. So it's also
>>> > needed
>>> > to
>>> > do improvements on metrics and web UI to expose such issues.
>>> >
>>> > Thanks,
>>> > Zhu
>>> >
>>> > Yanfei Lei <fr...@gmail.com> 于2022年12月26日周一 11:13写道:
>>> >
>>> > Hi Yuxin,
>>> >
>>> > Thanks for the proposal!
>>> >
>>> > After reading the FLIP, I have some questions about the default
>>> > value.
>>> > This FLIP seems to introduce a *new* config
>>> > option(taskmanager.memory.network.required-buffer-per-gate.max)
>>> > to
>>> > control
>>> > the network memory usage.
>>> > 1. Is this configuration at the job level or cluster level? As
>>> > the
>>> > FLIP
>>> > described, the default values of the Batch job and Stream job are
>>> > different, If an explicit value is set for cluster level, will it
>>> > affect
>>> > all Batch jobs and Stream jobs on the cluster?
>>> >
>>> > 2. The default value of Batch Job depends on the value of
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel),
>>> > if the value of ExclusiveBuffersPerChannel changed, does
>>> > "taskmanager.memory.network.required-buffer-per-gate.max" need to
>>> > change
>>> > with it?
>>> >
>>> >
>>> > Best,
>>> > Yanfei
>>> >
>>> > Dong Lin <li...@gmail.com> 于2022年12月25日周日 08:58写道:
>>> >
>>> > Hi Yuxin,
>>> >
>>> > Thanks for proposing the FLIP!
>>> >
>>> > The motivation section makes sense. But it seems that the
>>> > proposed
>>> > change
>>> > section mixes the proposed config with the evaluation results.
>>> > It
>>> > is
>>> > a
>>> > bit
>>> > hard to understand what configs are proposed and how to
>>> > describe
>>> > these
>>> > configs to users. Given that the configuration setting is part
>>> > of
>>> > public
>>> > interfaces, it might be helpful to add a dedicated public
>>> > interface
>>> > section
>>> > to describe the config key and config semantics, as suggested
>>> > in
>>> > the
>>> > FLIP
>>> > template here
>>> > <
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>> >
>>> > .
>>> >
>>> > This FLIP seems to add more configs without removing any config
>>> > from
>>> > Flink.
>>> > Intuitively this can make the Flink configuration harder rather
>>> > than
>>> > simpler. Maybe we can get a better idea after we add a public
>>> > interface
>>> > section to clarify those configs.
>>> >
>>> > Thanks,
>>> > Dong
>>> >
>>> >
>>> > On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <
>>> > tanyuxinwork@gmail.com>
>>> > wrote:
>>> >
>>> > Hi, devs,
>>> >
>>> > I'd like to start a discussion about FLIP-266: Simplify
>>> > network
>>> > memory
>>> > configurations for TaskManager[1].
>>> >
>>> > When using Flink, users may encounter the following issues
>>> > that
>>> > affect
>>> > usability.
>>> > 1. The job may fail with an "Insufficient number of network
>>> > buffers"
>>> > exception.
>>> > 2. Flink network memory size adjustment is complex.
>>> > When encountering these issues, users can solve some problems
>>> > by
>>> > adding
>>> > or
>>> > adjusting parameters. However, multiple memory config options
>>> > should
>>> > be
>>> > changed. The config option adjustment requires understanding
>>> > the
>>> > detailed
>>> > internal implementation, which is impractical for most users.
>>> >
>>> > To simplify network memory configurations for TaskManager and
>>> > improve
>>> > Flink
>>> > usability, this FLIP proposed some optimization solutions for
>>> > the
>>> > issues.
>>> >
>>> > Looking forward to your feedback.
>>> >
>>> > [1]
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager
>>> >
>>> > Best regards,
>>> > Yuxin
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>>
>>

Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Yuxin Tan <ta...@gmail.com>.
Hi, everyone

Thanks for the reply and the discussion.

We discussed this with @Guowei Ma, @Dong Lin, and @Yanfei Lei
offline, and reached a consensus on this FLIP. Based on the offline
discussions and the suggestions from @Weihua Hu, the following changes
have been made in the FLIP.

1. Changes in public interfaces.
- Updated the descriptions of the newly added config to describe the
option more clearly.
- The new config will be marked as experimental in the first release,
and we will revisit this in the next release based on user feedback.
- In the long run, with the new config, we think the original two
configs can be deprecated. At this stage, since the new config is still
experimental, we will not deprecate them immediately.
- Renamed the config key to
taskmanager.memory.network.read-buffer.required-per-gate.max for more
clarity.
2. Changed the floating buffer calculation method.
- When the memory used reaches the threshold, the number of exclusive
buffers is reduced gradually, in a fine-grained manner, rather than
being reduced directly to 0 (see the sketch below).
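
To make item 2 concrete, here is a minimal sketch of the fine-grained
fallback (an illustration only, not the exact implementation; the
method and parameter names are made up):

// Sketch: choose how many exclusive buffers each channel keeps, given
// the configured value and the per-gate cap on required read buffers.
static int chooseExclusiveBuffersPerChannel(
        int configuredBuffersPerChannel, // taskmanager.network.memory.buffers-per-channel
        int numChannels,
        int requiredBuffersPerGateMax) { // the new per-gate cap
    for (int exclusive = configuredBuffersPerChannel; exclusive >= 1; exclusive--) {
        if ((long) exclusive * numChannels <= requiredBuffersPerGateMax) {
            return exclusive; // keep some exclusive buffers per channel
        }
    }
    return 0; // fall back to all-floating read buffers
}

For example, with 2 buffers per channel configured, a gate with 800
channels and a cap of 1000 would keep 1 exclusive buffer per channel
(2 * 800 > 1000, but 1 * 800 <= 1000), while a gate with 1500 channels
would fall back to floating buffers only.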

Best,
Yuxin


On Thu, Dec 29, 2022 at 2:48 PM Yuxin Tan <ta...@gmail.com> wrote:

> Hi, Roman
>
> Sorry, I missed one question just now.
>
> >  if the two configuration options are still in use, why does the FLIP
> propose to deprecate them?
> These two configs are usually adjusted to avoid the memory issue, but
> after introducing the improvement, I think it will generally no longer
> be necessary to tune these two configurations to avoid the issue. So
> I propose deprecating them in the future, when the @Experimental
> annotation of the newly added config is removed.
>
> Best,
> Yuxin
>
>
> On Wed, Dec 28, 2022 at 8:10 PM Roman Khachatryan <ro...@apache.org> wrote:
>
>> Thanks for your reply Yuxin,
>>
>> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
>> > configurations, which are not calculated. I have described them in the
>> > FLIP motivation section.
>>
>> The motivation section says about floating buffers:
>> > FloatingBuffersPerGate is within the range of
>> > [numFloatingBufferThreshold, ExclusiveBuffersPerChannel * numChannels +
>> > DefaultFloatingBuffersPerGate] ...
>> So my question is: what value exactly in this range will it have, and
>> how and where will it be computed?
>>
>> As for the ExclusiveBuffersPerChannel, there was a proposal in the thread
>> to calculate it dynamically (by linear search
>> from taskmanager.network.memory.buffers-per-channel down to 0).
>>
>> Also, if the two configuration options are still in use, why does the FLIP
>> propose to deprecate them?
>>
>> Besides that, wouldn't it be more clear to separate motivation from the
>> proposed changes?
>>
>> Regards,
>> Roman
>>
>>
>> On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17...@163.com> wrote:
>>
>> > Hi Yuxin
>> >
>> >
>> > Thanks for the proposal, big + 1 for this FLIP.
>> >
>> >
>> >
>> > It is difficult for users to calculate the size of network memory. If
>> > the setting is too small, the task cannot be started. If the setting
>> > is too large, there may be a waste of resources. As far as possible,
>> > the Flink framework can automatically set a reasonable value, but I
>> > have a small problem: network memory is not only related to the
>> > parallelism of the task, but also to the complexity of the task DAG.
>> > The more complex a DAG is, the larger the buffers that shuffle write
>> > and shuffle read require. How can we determine how many RS and IG a
>> > DAG has?
>> >
>> >
>> >
>> > Best
>> > JasonLee
>> >
>> >
>> > ---- Replied Message ----
>> > | From | Yuxin Tan<ta...@gmail.com> |
>> > | Date | 12/28/2022 18:29 |
>> > | To | <de...@flink.apache.org> |
>> > | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory
>> > configurations for TaskManager |
>> > Hi, Roman
>> >
>> > Thanks for the reply.
>> >
>> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
>> > configurations, which are not calculated. I have described them in the
>> > FLIP motivation section.
>> >
>> > 3. Each gate requires at least one buffer...
>> > The timeout exception occurs when the ExclusiveBuffersPerChannel
>> > cannot be requested from the NetworkBufferPool, which is not caused by
>> > the change of this FLIP. In addition, we set the
>> > ExclusiveBuffersPerChannel to 0 when using floating buffers, which can
>> > also decrease the probability of this exception.
>> >
>> > 4. It would be great to have experimental results for jobs with
>> > different exchange types.
>> > Thanks for the suggestion. I ran a test with different exchange types,
>> > forward and rescale, and the results show no differences from the
>> > all-to-all type, which is also understandable, because the network
>> > memory usage is calculated with numChannels, independent of the edge
>> > type.
>> >
>> > Best,
>> > Yuxin
>> >
>> >
>> > On Wed, Dec 28, 2022 at 5:27 AM Roman Khachatryan <ro...@apache.org> wrote:
>> >
>> > Hi everyone,
>> >
>> > Thanks for the proposal and the discussion.
>> >
>> > I couldn't find many details on how exactly the values of
>> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
>> > I guess that
>> > - the threshold evaluation is done on JM
>> > - floating buffers calculation is done on TM based on the current memory
>> > available; so it is not taking into account any future tasks submitted
>> > for that (or other) job
>> > Is that correct?
>> >
>> > If so, I see the following potential issues:
>> >
>> > 1. Each (sub)task might have different values because the actual
>> > available memory might be different. E.g. some tasks might use exclusive
>> > buffers and others only floating. That could lead to significant skew
>> > in processing speed, and in turn to issues with checkpoints and
>> > watermarks.
>> >
>> > 2. Re-deployment of a task (e.g. on job failure) might lead to a
>> > completely different memory configuration. That, coupled with different
>> > values per subtask and operator, makes the performance analysis more
>> > difficult.
>> >
>> > (Regardless of whether it's done on TM or JM):
>> > 3. Each gate requires at least one buffer [1]. So, in case when no
>> > memory is available, TM will throw an Allocation timeout exception
>> > instead of an Insufficient buffers exception immediately. A delay here
>> > (allocation timeout) seems like a regression.
>> > Besides that, the regression depends on how much memory is actually
>> > available and how much it is contended, doesn't it?
>> > Should there still be a lower threshold of available memory, below which
>> > the job (task) isn't accepted?
>> > 4. The same threshold for all types of shuffles will likely result in
>> > using exclusive buffers for point-wise connections and floating buffers
>> > for all-to-all ones. I'm not sure if that's always optimal. It would be
>> > great to have experimental results for jobs with different exchange
>> > types, WDYT?
>> >
>> > [1]
>> > https://issues.apache.org/jira/browse/FLINK-24035
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <ta...@gmail.com> wrote:
>> >
>> > Hi, Weihua
>> >
>> > Thanks for your suggestions.
>> >
>> > 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
>> > total buffer is not enough?
>> >
>> > I think it's a good idea. Will try it and check the results in the
>> > PoC. Before all read buffers use floating buffers, I will try to use
>> > (ExclusiveBuffersPerChannel - i) buffers per channel first. For
>> > example, if the user has configured ExclusiveBuffersPerChannel to 4, it
>> > will check whether all read buffers are sufficient from 4 down to 1.
>> > Only when ExclusiveBuffersPerChannel of all channels is 1 and all read
>> > buffers are still insufficient will all read buffers use floating
>> > buffers.
>> > If the test results prove better, the FLIP will use this method.
>> >
>> > 2. Do we really need to change the default value of
>> > 'taskmanager.memory.network.max'?
>> >
>> > Changing taskmanager.memory.network.max will indeed affect some
>> > users, but a user is only affected when all 3 conditions are fulfilled.
>> > 1) Flink total TM memory is larger than 10g (because the network memory
>> > ratio is 0.1).
>> > 2) taskmanager.memory.network.max was not initially configured.
>> > 3) Other memory, such as managed memory or heap memory, is
>> > insufficient.
>> > I think the number of jobs fulfilling the conditions is small, because
>> > when a TM uses such a large amount of memory, the network memory
>> > requirement may also be large. And when encountering the issue, the
>> > rollback method is very simple: configuring
>> > taskmanager.memory.network.max as 1g or another value.
>> > In addition, the reason for modifying the default value is to simplify
>> > the network configurations in most scenarios. This change does affect a
>> > few usage scenarios, but we should admit that setting the default to
>> > any value may not meet the requirements of all scenarios.
>> > Best,
>> > Yuxin
>> >
>> >
>> > On Mon, Dec 26, 2022 at 8:35 PM Weihua Hu <hu...@gmail.com> wrote:
>> >
>> > Hi Yuxin,
>> > Thanks for the proposal.
>> >
>> > "Insufficient number of network buffers" exceptions also bother us.
>> > It's
>> > too hard for users to figure out
>> > how much network buffer they really need. It relates to partitioner
>> > type,
>> > parallelism, slots per taskmanager.
>> >
>> > Since streaming jobs are our primary scenario, I have some questions
>> > about
>> > streaming jobs.
>> >
>> > 1. In this FLIP, all read buffers will use floating buffers when the
>> > total buffer is more than
>> > 'taskmanager.memory.network.read-required-buffer.max'. Competition in
>> > buffer allocation leads to performance regression.
>> > How about reducing ExclusiveBuffersPerChannel to 1 first when the total
>> > buffer is not enough?
>> > Will this reduce performance regression in streaming?
>> >
>> > 2. Changing taskmanager.memory.network.max will affect user migration
>> > from the lower version.
>> > IMO, network buffer size should not increase with total memory,
>> > especially for streaming jobs with application mode.
>> > For example, some ETL jobs with rescale partitioner only require a few
>> > network buffers.
>> > And we already have
>> > 'taskmanager.memory.network.read-required-buffer.max'
>> > to control maximum read network buffer usage.
>> > Do we really need to change the default value of
>> > 'taskmanager.memory.network.max'?
>> >
>> > Best,
>> > Weihua
>> >
>> >
>> > On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <ta...@gmail.com>
>> > wrote:
>> >
>> > Hi, all
>> > Thanks for the replies and feedback from everyone!
>> >
>> >
>> > After combining everyone's comments, the main concerns and
>> > corresponding adjustments are as follows.
>> >
>> >
>> > @Guowei Ma, Thanks for your feedback.
>> > should we introduce a _new_ non-orthogonal
>> > option(`taskmanager.memory.network.required-buffer-per-gate.max`).
>> > That is to say, the option will affect both streaming and batch
>> > shuffle behavior at the same time.
>> >
>> > 1. The default option can meet most requirements, no matter whether in
>> > Streaming or Batch scenarios. We do not want users to adjust this
>> > default config option, by design. This configuration option is added
>> > only to preserve the possibility of modification for users.
>> > 2. In the few cases where users really want to adjust this option,
>> > they may not expect to adjust it according to Streaming or Batch, but,
>> > for example, according to the parallelism of the job.
>> > 3. Regarding the performance of streaming shuffle, the same problem of
>> > insufficient memory also exists for Streaming jobs. We introduced this
>> > configuration to enable users to decouple memory and parallelism, but
>> > it will affect some performance. By default, the feature is disabled
>> > and does not affect performance.
>> > However, the added configuration enables users to choose to decouple
>> > memory usage and parallelism for Streaming jobs.
>> >
>> > It's better not to expose more implementation-related concepts to
>> > users.
>> >
>> > Thanks for your suggestion. I will modify the option name to avoid
>> > exposing implementation-related concepts. I have changed it to
>> > `taskmanager.memory.network.read-required-buffer.max` in the FLIP.
>> >
>> >
>> >
>> > @Dong Lin, Thanks for your reply.
>> > it might be helpful to add a dedicated public interface section to
>> > describe
>> > the config key and config semantics.
>> >
>> > Thanks for your suggestion. I have added a public interface section to
>> > describe the config key and config semantics clearly.
>> >
>> > This FLIP seems to add more configs without removing any config
>> > from Flink.
>> >
>> > This FLIP is to reduce the number of options to be adjusted when using
>> > Flink.
>> > After the FLIP, the default option can meet the requirements in most
>> > scenarios rather than requiring modification of any config
>> > options(`taskmanager.network.memory.buffers-per-channel`
>> > and `taskmanager.network.memory.floating-buffers-per-gate`), which is
>> > helpful to improve the out-of-the-box usability. In the long run, these
>> > two parameters `taskmanager.network.memory.buffers-per-channel` and
>> > `taskmanager.network.memory.floating-buffers-per-gate` may indeed be
>> > deprecated to reduce user parameters, but from the perspective of
>> > compatibility, we need to pay attention to users' feedback before
>> > deciding to deprecate the options.
>> >
>> >
>> >
>> > @Yanfei Lei, Thanks for your feedback.
>> > 1. Though the option is cluster level, the default value is different
>> > according to the job type. In other words, by default, for Batch jobs,
>> > the config is enabled, with a value of 1000.
>> > And for Streaming jobs, the config is not enabled by default.
>> >
>> > 2. I think this is a good point. The total floating buffers will not
>> > change with
>> > ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel)
>> > because this is the maximum memory threshold. But if the user
>> > explicitly specified the ExclusiveBuffersPerChannel, the calculated
>> > result of ExclusiveBuffersPerChannel * numChannels will change with it.
>> >
>> >
>> > Thanks again for all feedback!
>> >
>> >
>> > Best,
>> > Yuxin
>> >
>> >
>> > On Mon, Dec 26, 2022 at 5:18 PM Zhu Zhu <re...@gmail.com> wrote:
>> >
>> > Hi Yuxin,
>> >
>> > Thanks for creating this FLIP.
>> >
>> > It's good if Flink does not require users to set a very large network
>> > memory, or tune the advanced (hard-to-understand) per-channel/per-gate
>> > buffer configs, to avoid "Insufficient number of network buffers"
>> > exceptions, which can easily happen for large-scale jobs.
>> >
>> > Regarding the new config
>> > "taskmanager.memory.network.read-required-buffer.max",
>> > I think it's still an advanced config which users may find hard to
>> > tune.
>> > However, given that in most cases users will not need to set it, I
>> > think it's acceptable.
>> >
>> > So +1 for this FLIP.
>> >
>> > In the future, I think Flink should adaptively select whether to use
>> > exclusive buffers according to whether there are sufficient network
>> > buffers at runtime.
>> > Users then no longer need to understand the above configuration. This
>> > may require supporting transitions between exclusive buffers and
>> > floating buffers.
>> > A problem of making all buffers floating is that too few network
>> > buffers can result in task slowness which is hard for users to
>> > identify. So improvements on metrics and the web UI are also needed to
>> > expose such issues.
>> >
>> > Thanks,
>> > Zhu
>> >
>> > On Mon, Dec 26, 2022 at 11:13 AM Yanfei Lei <fr...@gmail.com> wrote:
>> >
>> > Hi Yuxin,
>> >
>> > Thanks for the proposal!
>> >
>> > After reading the FLIP, I have some questions about the default value.
>> > This FLIP seems to introduce a *new* config
>> > option(taskmanager.memory.network.required-buffer-per-gate.max)
>> > to control the network memory usage.
>> > 1. Is this configuration at the job level or cluster level? As the
>> > FLIP described, the default values of the Batch job and Stream job are
>> > different. If an explicit value is set at the cluster level, will it
>> > affect all Batch jobs and Stream jobs on the cluster?
>> >
>> > 2. The default value of the Batch job depends on the value of
>> > ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel),
>> > and if the value of ExclusiveBuffersPerChannel changes, does
>> > "taskmanager.memory.network.required-buffer-per-gate.max" need to
>> > change with it?
>> >
>> >
>> > Best,
>> > Yanfei
>> >
>> > On Sun, Dec 25, 2022 at 8:58 AM Dong Lin <li...@gmail.com> wrote:
>> >
>> > Hi Yuxin,
>> >
>> > Thanks for proposing the FLIP!
>> >
>> > The motivation section makes sense. But it seems that the proposed
>> > change section mixes the proposed config with the evaluation results.
>> > It is a bit hard to understand what configs are proposed and how to
>> > describe these configs to users. Given that the configuration setting
>> > is part of public interfaces, it might be helpful to add a dedicated
>> > public interface section to describe the config key and config
>> > semantics, as suggested in the FLIP template here
>> > <
>> > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>> > >.
>> >
>> > This FLIP seems to add more configs without removing any config from
>> > Flink.
>> > Intuitively this can make the Flink configuration harder rather than
>> > simpler. Maybe we can get a better idea after we add a public interface
>> > section to clarify those configs.
>> >
>> > Thanks,
>> > Dong
>> >
>> >
>> > On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <tanyuxinwork@gmail.com> wrote:
>> >
>> > Hi, devs,
>> >
>> > I'd like to start a discussion about FLIP-266: Simplify network memory
>> > configurations for TaskManager[1].
>> >
>> > When using Flink, users may encounter the following issues that affect
>> > usability.
>> > 1. The job may fail with an "Insufficient number of network buffers"
>> > exception.
>> > 2. Flink network memory size adjustment is complex.
>> > When encountering these issues, users can solve some problems by adding
>> > or adjusting parameters. However, multiple memory config options should
>> > be changed. The config option adjustment requires understanding the
>> > detailed internal implementation, which is impractical for most users.
>> >
>> > To simplify network memory configurations for TaskManager and improve
>> > Flink usability, this FLIP proposed some optimization solutions for the
>> > issues.
>> >
>> > Looking forward to your feedback.
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager
>> >
>> > Best regards,
>> > Yuxin
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>>
>

Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Yuxin Tan <ta...@gmail.com>.
Hi, Roman

Sorry, I missed one question just now.

> if the two configuration options are still in use, why does the FLIP
> propose to deprecate them?
These two configs are usually adjusted to avoid the memory issue, but
after introducing the improvement, I think it will generally no longer
be necessary to tune these two configurations to avoid the issue. So
I propose deprecating them in the future, when the @Experimental
annotation of the newly added config is removed.
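
As a side note, a hypothetical flink-conf.yaml excerpt may illustrate
the simplification (the values are for illustration only, and the key
name follows the current FLIP draft):

# Before: two advanced options that must be understood and tuned together
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8

# After: a single cap on the required read network buffers
taskmanager.memory.network.read-required-buffer.max: 1000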

Best,
Yuxin



Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Roman Khachatryan <ro...@apache.org>.
Thanks for your reply Yuxin,

> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> configurations, which are not calculated. I have described them in the
> FLIP motivation section.

The motivation section says about floating buffers:
> FloatingBuffersPerGate is within the range of
> [numFloatingBufferThreshold, ExclusiveBuffersPerChannel * numChannels +
> DefaultFloatingBuffersPerGate] ...
So my question is: what value exactly in this range will it have, and how
and where will it be computed?
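
(To make the question concrete, this is my current reading of that
sentence as a sketch, with assumed names; the unknown is what determines
the requested value inside the range:)

// A sketch of my reading only, not taken from the FLIP:
static int floatingBuffersPerGate(
        int numFloatingBufferThreshold,    // lower bound from the FLIP text
        int exclusiveBuffersPerChannel,
        int numChannels,
        int defaultFloatingBuffersPerGate,
        int requestedFloatingBuffers) {    // how and where is this computed?
    int upperBound =
            exclusiveBuffersPerChannel * numChannels + defaultFloatingBuffersPerGate;
    return Math.max(numFloatingBufferThreshold,
            Math.min(requestedFloatingBuffers, upperBound));
}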

As for the ExclusiveBuffersPerChannel, there was a proposal in the thread
to calculate it dynamically (by linear search
from taskmanager.network.memory.buffers-per-channel down to 0).

Also, if the two configuration options are still in use, why does the FLIP
propose to deprecate them?

Besides that, wouldn't it be more clear to separate motivation from the
proposed changes?

Regards,
Roman


On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17...@163.com> wrote:

> Hi Yuxin
>
>
> Thanks for the proposal, big + 1 for this FLIP.
>
>
>
> It is difficult for users to calculate the size of network memory. If the
> setting is too small, the task cannot be started. If the setting is too
> large, there may be a waste of resources. As far as possible, Flink
> framework can automatically set a reasonable value, but I have a small
> problem. network memory is not only related to the parallelism of the task,
> but also to the complexity of the task DAG. The more complex a DAG is,
> shuffle write and shuffle read require larger buffers. How can we determine
> how many RS and IG a DAG has?
>
>
>
> Best
> JasonLee
>
>
> ---- Replied Message ----
> | From | Yuxin Tan<ta...@gmail.com> |
> | Date | 12/28/2022 18:29 |
> | To | <de...@flink.apache.org> |
> | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations
> for TaskManager |
> Hi, Roman
>
> Thanks for the replay.
>
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> configurations, which are not calculated. I have described them in the FLIP
> motivation section.
>
> 3. Each gate requires at least one buffer...
> The timeout exception occurs when the ExclusiveBuffersPerChannel
> can not be requested from NetworkBufferPool, which is not caused by the
> change of this Flip. In addition, if  we have set the
> ExclusiveBuffersPerChannel
> to 0 when using floating buffers, which can also decrease the probability
> of
> this exception.
>
> 4. It would be great to have experimental results for jobs with different
> exchange types.
> Thanks for the suggestion. I have a test about different exchange types,
> forward
> and rescale, and the results show no differences from the all-to-all type,
> which
> is also understandable, because the network memory usage is calculated
> with numChannels, independent of the edge type.
>
> Best,
> Yuxin
>
>
> Roman Khachatryan <ro...@apache.org> 于2022年12月28日周三 05:27写道:
>
> Hi everyone,
>
> Thanks for the proposal and the discussion.
>
> I couldn't find much details on how exactly the values of
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
> I guess that
> - the threshold evaluation is done on JM
> - floating buffers calculation is done on TM based on the current memory
> available; so it is not taking into account any future tasks submitted for
> that (or other) job
> Is that correct?
>
> If so, I see the following potential issues:
>
> 1. Each (sub)task might have different values because the actual
> available memory might be different. E.g. some tasks might use exclusive
> buffers and others only floating. That could lead to significant skew
> in processing speed, and in turn to issues with checkpoints and watermarks.
>
> 2. Re-deployment of a task (e.g. on job failure) might lead to a completely
> different memory configuration. That, coupled with different values per
> subtask and operator, makes the performance analysis more difficult.
>
> (Regardless of whether it's done on TM or JM):
> 3. Each gate requires at least one buffer [1]. So, in case when no memory
> is available, TM will throw an Allocation timeout exception instead of
> Insufficient buffers exception immediately. A delay here (allocation
> timeout) seems like a regression.
> Besides that, the regression depends on how much memory is actually
> available and how much it is contended, doesn't it?
> Should there still be a lower threshold of available memory, below which
> the job (task) isn't accepted?
> 4. The same threshold for all types of shuffles will likely result in using
> exclusive buffers
> for point-wise connections and floating buffers for all-to-all ones. I'm
> not sure if that's always optimal. It would be great to have experimental
> results for jobs with different exchange types, WDYT?
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-24035
>
> Regards,
> Roman
>
>
> On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <ta...@gmail.com> wrote:
>
> Hi, Weihua
>
> Thanks for your suggestions.
>
> 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
> total buffer is not enough?
>
> I think it's a good idea. Will try and check the results in PoC. Before
> all
> read buffers use floating buffers, I will try to use
> (ExclusiveBuffersPerChannel - i)
> buffers per channel first. For example, if the user has configured
> ExclusiveBuffersPerChannel to 4, it will check whether all read buffers
> are sufficient from 4 to 1. Only when ExclusiveBuffersPerChannel of
> all channels is 1 and all read buffers are insufficient, all read buffers
> will use floating buffers.
> If the test results prove better, the FLIP will use this method.
>
> 2. Do we really need to change the default value of
> 'taskmanager.memory.network.max'?
>
> Changing taskmanager.memory.network.max will indeed affect some
> users, but a user is only affected when the following 3 conditions are fulfilled.
> 1) Flink total TM memory is larger than 10g (because the network memory
> ratio is 0.1).
> 2) taskmanager.memory.network.max was not initially configured.
> 3) Other memory, such as managed memory or heap memory, is insufficient.
> I think the number of jobs fulfilling the conditions is small because
> when
> TM
> uses such a large amount of memory, the network memory requirement may
> also be large. And when encountering the issue, the rollback method is
> very
> simple,
> configuring taskmanager.memory.network.max as 1g or other values.
> In addition, the reason for modifying the default value is to simplify
> the
> network
> configurations in most scenarios. This change does affect a few usage
> scenarios,
> but we should admit that setting the default to any value may not meet
> the requirements of all scenarios.
>
> Best,
> Yuxin
>
>
> Weihua Hu <hu...@gmail.com> 于2022年12月26日周一 20:35写道:
>
> Hi Yuxin,
> Thanks for the proposal.
>
> "Insufficient number of network buffers" exceptions also bother us.
> It's
> too hard for users to figure out
> how many network buffers they really need. It relates to partitioner
> type,
> parallelism, slots per taskmanager.
>
> Since streaming jobs are our primary scenario, I have some questions
> about
> streaming jobs.
>
> 1. In this FLIP, all read buffers will use floating buffers when the
> total
> buffer is more than
> 'taskmanager.memory.network.read-required-buffer.max'. Competition in
> buffer allocation leads to performance regression.
> How about reducing ExclusiveBuffersPerChannel to 1 first when the total
> buffer is not enough?
> Will this reduce performance regression in streaming?
>
> 2. Changing taskmanager.memory.network.max will affect user migration
> from
> the lower version.
> IMO, network buffer size should not increase with total memory,
> especially
> for streaming jobs with application mode.
> For example, some ETL jobs with rescale partitioner only require a few
> network buffers.
> And we already have
> 'taskmanager.memory.network.read-required-buffer.max'
> to control maximum read network buffer usage.
> Do we really need to change the default value of
> 'taskmanager.memory.network.max'?
>
> Best,
> Weihua
>
>
> On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <ta...@gmail.com>
> wrote:
>
> Hi, all
> Thanks everyone for the replies and feedback!
>
>
> After combining everyone's comments, the main concerns, and
> corresponding
> adjustments are as follows.
>
>
> @Guowei Ma, Thanks for your feedback.
> should we introduce a _new_ non-orthogonal
> option(`taskmanager.memory.network.required-buffer-per-gate.max`).
> That
> is
> to say, the option will affect both streaming and batch shuffle
> behavior
> at
> the
> same time.
>
> 1. Because the default option can meet most requirements, whether in
> Streaming
> or Batch scenarios. We do not want users to adjust this default
> config
> option by
> design. This configuration option is added only to preserve the
> possibility
> of
> modification options for users.
> 2. In a few cases, if you really want to adjust this option, users
> may
> not
> expect to
> adjust the option according to Streaming or Batch, for example,
> according
> to the
> parallelism of the job.
> 3. Regarding the performance of streaming shuffle, the same problem
> of
> insufficient memory also exists for Streaming jobs. We introduced
> this
> configuration
> to enable users to decouple memory and parallelism, but it will
> affect
> some
> performance. By default, the feature is disabled and does not affect
> performance.
> However, the added configuration enables users to choose to decouple
> memory
> usage and parallelism for Streaming jobs.
>
> It's better not to expose more implementation-related concepts to
> users.
>
> Thanks for your suggestion. I will modify the option name to avoid
> exposing
> implementation-related concepts. I have changed it to
> `taskmanager.memory.network.read-required-buffer.max` in the FLIP.
>
>
>
> @Dong Lin, Thanks for your reply.
> it might be helpful to add a dedicated public interface section to
> describe
> the config key and config semantics.
>
> Thanks for your suggestion. I have added public interface section to
> describe
> the config key and config semantics clearly.
>
> This FLIP seems to add more configs without removing any config
> from
> Flink.
>
> This FLIP is to reduce the number of options to be adjusted when
> using
> Flink.
> After the FLIP, the default option can meet the requirements in most
> scenarios
> rather than modifying any config
> options (`taskmanager.network.memory.buffers-per-channel`
> and `taskmanager.network.memory.floating-buffers-per-gate`), which is
> helpful
> to improve the out-of-the-box usability. In the long run, these two
> parameters
> `taskmanager.network.memory.buffers-per-channel` and
> `taskmanager.network.memory.floating-buffers-per-gate` may indeed be
> deprecated
> to reduce user parameters, but from the perspective of compatibility,
> we
> need to
> pay attention to users' feedback before deciding to deprecate the
> options.
>
>
>
> @Yanfei Lei,Thanks for your feedback.
> 1. Though the option is cluster level, the default value is
> different
> according to the
> job type. In other words, by default, for Batch jobs, the config
> value
> is
> enabled, 1000.
> And for Streaming jobs, the config value is not enabled by default.
>
> 2. I think this is a good point. The total floating buffers will not
> change
> with
>
>
>
> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel)
> because this is the maximum memory threshold. But if the user
> explicitly
> specified
> the ExclusiveBuffersPerChannel, the calculated result of
> ExclusiveBuffersPerChannel * numChannels will change with it.
>
>
> Thanks again for all feedback!
>
>
> Best,
> Yuxin
>
>
> Zhu Zhu <re...@gmail.com> 于2022年12月26日周一 17:18写道:
>
> Hi Yuxin,
>
> Thanks for creating this FLIP.
>
> It's good if Flink does not require users to set a very large
> network
> memory, or tune the advanced (hard-to-understand)
> per-channel/per-gate
> buffer configs, to avoid "Insufficient number of network buffers"
> exceptions
> which can easily happen for large scale jobs.
>
> Regarding the new config
> "taskmanager.memory.network.read-required-buffer.max",
> I think it's still an advanced config which users may feel hard to
> tune.
> However, given that in most cases users will not need to set it, I
> think it's acceptable.
>
> So +1 for this FLIP.
>
> In the future, I think Flink should adaptively select to use
> exclusive
> buffers
> or not according to whether there are sufficient network buffers at
> runtime.
> Users then no longer need to understand the above configuration.
> This
> may
> require supporting transitions between exclusive buffers and
> floating
> buffers.
> A problem of making all buffers floating is that too few network buffers
> can
> result
> in task slowness which is hard for users to identify. So it's also
> needed
> to
> do improvements on metrics and web UI to expose such issues.
>
> Thanks,
> Zhu
>
> Yanfei Lei <fr...@gmail.com> 于2022年12月26日周一 11:13写道:
>
> Hi Yuxin,
>
> Thanks for the proposal!
>
> After reading the FLIP, I have some questions about the default
> value.
> This FLIP seems to introduce a *new* config
> option(taskmanager.memory.network.required-buffer-per-gate.max)
> to
> control
> the network memory usage.
> 1. Is this configuration at the job level or cluster level? As
> the
> FLIP
> described, the default values of the Batch job and Stream job are
> different. If an explicit value is set for cluster level, will it
> affect
> all Batch jobs and Stream jobs on the cluster?
>
> 2. The default value of Batch Job depends on the value of
>
>
>
>
>
> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel),
> if the value of ExclusiveBuffersPerChannel changed, does
> "taskmanager.memory.network.required-buffer-per-gate.max" need to
> change
> with it?
>
>
> Best,
> Yanfei
>
> Dong Lin <li...@gmail.com> 于2022年12月25日周日 08:58写道:
>
> Hi Yuxin,
>
> Thanks for proposing the FLIP!
>
> The motivation section makes sense. But it seems that the
> proposed
> change
> section mixes the proposed config with the evaluation results.
> It
> is
> a
> bit
> hard to understand what configs are proposed and how to
> describe
> these
> configs to users. Given that the configuration setting is part
> of
> public
> interfaces, it might be helpful to add a dedicated public
> interface
> section
> to describe the config key and config semantics, as suggested
> in
> the
> FLIP
> template here
> <
>
>
>
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> .
>
> This FLIP seems to add more configs without removing any config
> from
> Flink.
> Intuitively this can make the Flink configuration harder rather
> than
> simpler. Maybe we can get a better idea after we add a public
> interface
> section to clarify those configs.
>
> Thanks,
> Dong
>
>
> On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <
> tanyuxinwork@gmail.com>
> wrote:
>
> Hi, devs,
>
> I'd like to start a discussion about FLIP-266: Simplify
> network
> memory
> configurations for TaskManager[1].
>
> When using Flink, users may encounter the following issues
> that
> affect
> usability.
> 1. The job may fail with an "Insufficient number of network
> buffers"
> exception.
> 2. Flink network memory size adjustment is complex.
> When encountering these issues, users can solve some problems
> by
> adding
> or
> adjusting parameters. However, multiple memory config options
> should
> be
> changed. The config option adjustment requires understanding
> the
> detailed
> internal implementation, which is impractical for most users.
>
> To simplify network memory configurations for TaskManager and
> improve
> Flink
> usability, this FLIP proposed some optimization solutions for
> the
> issues.
>
> Looking forward to your feedback.
>
> [1]
>
>
>
>
>
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager
>
> Best regards,
> Yuxin
>
>
>
>
>
>
>
>

Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by JasonLee <17...@163.com>.
Hi Yuxin


Thanks for the proposal, a big +1 for this FLIP.



It is difficult for users to calculate the size of network memory. If the
setting is too small, the task cannot be started. If the setting is too
large, there may be a waste of resources. Ideally, the Flink framework
can automatically set a reasonable value, but I have a small question:
network memory is related not only to the parallelism of a task but also
to the complexity of the task DAG. The more complex a DAG is, the larger
the buffers that shuffle writes and shuffle reads require. How can we
determine how many RS and IG a DAG has?
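
As a rough illustration of the arithmetic (a sketch only, not Flink
internals; the per-gate formula and all names here are assumptions based
on the options discussed in this thread):

    // Hypothetical estimate: sum the usual per-gate requirement,
    // numChannels * buffers-per-channel + floating-buffers-per-gate,
    // over all input gates (IG) of one task.
    static long estimateReadBuffers(
            int[] channelsPerInputGate,   // one entry per IG
            int buffersPerChannel,        // default 2
            int floatingBuffersPerGate) { // default 8
        long total = 0;
        for (int channels : channelsPerInputGate) {
            total += (long) channels * buffersPerChannel + floatingBuffersPerGate;
        }
        return total;
    }

    // e.g. one all-to-all gate with 1000 channels and the defaults (2, 8):
    // estimateReadBuffers(new int[] {1000}, 2, 8) -> 2008 buffers

The hard part is knowing the gate and channel counts of a complex DAG in
advance.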



Best
JasonLee


---- Replied Message ----
| From | Yuxin Tan<ta...@gmail.com> |
| Date | 12/28/2022 18:29 |
| To | <de...@flink.apache.org> |
| Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager |
Hi, Roman

Thanks for the reply.

ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
configuration options; they are not calculated. I have described them in
the FLIP motivation section.

3. Each gate requires at least one buffer...
The timeout exception occurs when the ExclusiveBuffersPerChannel
cannot be requested from the NetworkBufferPool, which is not caused by
the change of this FLIP. In addition, we have set the
ExclusiveBuffersPerChannel
to 0 when using floating buffers, which can also decrease the
probability of this exception.

4. It would be great to have experimental results for jobs with different
exchange types.
Thanks for the suggestion. I ran a test with different exchange types,
forward
and rescale, and the results show no differences from the all-to-all type,
which
is also understandable, because the network memory usage is calculated
with numChannels, independent of the edge type.

Best,
Yuxin


Roman Khachatryan <ro...@apache.org> 于2022年12月28日周三 05:27写道:

Hi everyone,

Thanks for the proposal and the discussion.

I couldn't find many details on how exactly the values of
ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
I guess that
- the threshold evaluation is done on JM
- floating buffers calculation is done on TM based on the current memory
available; so it is not taking into account any future tasks submitted for
that (or other) job
Is that correct?

If so, I see the following potential issues:

1. Each (sub)task might have different values because the actual
available memory might be different. E.g. some tasks might use exclusive
buffers and others only floating. That could lead to significant skew
in processing speed, and in turn to issues with checkpoints and watermarks.

2. Re-deployment of a task (e.g. on job failure) might lead to a completely
different memory configuration. That, coupled with different values per
subtask and operator, makes the performance analysis more difficult.

(Regardless of whether it's done on TM or JM):
3. Each gate requires at least one buffer [1]. So, when no memory
is available, TM will throw an Allocation timeout exception instead of
Insufficient buffers exception immediately. A delay here (allocation
timeout) seems like a regression.
Besides that, the regression depends on how much memory is actually
available and how much it is contended, doesn't it?
Should there still be a lower threshold of available memory, below which
the job (task) isn't accepted?
4. The same threshold for all types of shuffles will likely result in using
exclusive buffers
for point-wise connections and floating buffers for all-to-all ones. I'm
not sure if that's always optimal. It would be great to have experimental
results for jobs with different exchange types, WDYT?

[1]
https://issues.apache.org/jira/browse/FLINK-24035

Regards,
Roman


On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <ta...@gmail.com> wrote:

Hi, Weihua

Thanks for your suggestions.

1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
total buffer is not enough?

I think it's a good idea. Will try and check the results in PoC. Before
all
read buffers use floating buffers, I will try to use
(ExclusiveBuffersPerChannel - i)
buffers per channel first. For example, if the user has configured
ExclusiveBuffersPerChannel to 4, it will check whether all read buffers
are sufficient from 4 to 1. Only when ExclusiveBuffersPerChannel of
all channels is 1 and all read buffers are insufficient, all read buffers
will use floating buffers.
If the test results prove better, the FLIP will use this method.

2. Do we really need to change the default value of
'taskmanager.memory.network.max'?

Changing taskmanager.memory.network.max will indeed affect some
users, but a user is only affected when the following 3 conditions are fulfilled.
1) Flink total TM memory is larger than 10g (because the network memory
ratio is 0.1).
2) taskmanager.memory.network.max was not initially configured.
3) Other memory, such as managed memory or heap memory, is insufficient.
I think the number of jobs fulfilling the conditions is small because
when
TM
uses such a large amount of memory, the network memory requirement may
also be large. And when encountering the issue, the rollback method is
very
simple,
configuring taskmanager.memory.network.max as 1g or other values.
In addition, the reason for modifying the default value is to simplify
the
network
configurations in most scenarios. This change does affect a few usage
scenarios,
but we should admit that setting the default to any value may not meet
the requirements of all scenarios.

Best,
Yuxin


Weihua Hu <hu...@gmail.com> 于2022年12月26日周一 20:35写道:

Hi Yuxin,
Thanks for the proposal.

"Insufficient number of network buffers" exceptions also bother us.
It's
too hard for users to figure out
how many network buffers they really need. It relates to partitioner
type,
parallelism, slots per taskmanager.

Since streaming jobs are our primary scenario, I have some questions
about
streaming jobs.

1. In this FLIP, all read buffers will use floating buffers when the
total
buffer is more than
'taskmanager.memory.network.read-required-buffer.max'. Competition in
buffer allocation leads to performance regression.
How about reducing ExclusiveBuffersPerChannel to 1 first when the total
buffer is not enough?
Will this reduce performance regression in streaming?

2. Changing taskmanager.memory.network.max will affect user migration
from
the lower version.
IMO, network buffer size should not increase with total memory,
especially
for streaming jobs with application mode.
For example, some ETL jobs with rescale partitioner only require a few
network buffers.
And we already have
'taskmanager.memory.network.read-required-buffer.max'
to control maximum read network buffer usage.
Do we really need to change the default value of
'taskmanager.memory.network.max'?

Best,
Weihua


On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <ta...@gmail.com>
wrote:

Hi, all
Thanks everyone for the replies and feedback!


After combining everyone's comments, the main concerns, and
corresponding
adjustments are as follows.


@Guowei Ma, Thanks for your feedback.
should we introduce a _new_ non-orthogonal
option(`taskmanager.memory.network.required-buffer-per-gate.max`).
That
is
to say, the option will affect both streaming and batch shuffle
behavior
at
the
same time.

1. Because the default option can meet most requirements, whether in
Streaming
or Batch scenarios. We do not want users to adjust this default
config
option by
design. This configuration option is added only to preserve the
possibility
of
modification options for users.
2. In a few cases, if you really want to adjust this option, users
may
not
expect to
adjust the option according to Streaming or Batch, for example,
according
to the
parallelism of the job.
3. Regarding the performance of streaming shuffle, the same problem
of
insufficient memory also exists for Streaming jobs. We introduced
this
configuration
to enable users to decouple memory and parallelism, but it will
affect
some
performance. By default, the feature is disabled and does not affect
performance.
However, the added configuration enables users to choose to decouple
memory
usage and parallelism for Streaming jobs.

It's better not to expose more implementation-related concepts to
users.

Thanks for your suggestion. I will modify the option name to avoid
exposing
implementation-related concepts. I have changed it to
`taskmanager.memory.network.read-required-buffer.max` in the FLIP.



@Dong Lin, Thanks for your reply.
it might be helpful to add a dedicated public interface section to
describe
the config key and config semantics.

Thanks for your suggestion. I have added public interface section to
describe
the config key and config semantics clearly.

This FLIP seems to add more configs without removing any config
from
Flink.

This FLIP is to reduce the number of options to be adjusted when
using
Flink.
After the FLIP, the default option can meet the requirements in most
scenarios
rather than modifying any config
options (`taskmanager.network.memory.buffers-per-channel`
and `taskmanager.network.memory.floating-buffers-per-gate`), which is
helpful
to improve the out-of-the-box usability. In the long run, these two
parameters
`taskmanager.network.memory.buffers-per-channel` and
`taskmanager.network.memory.floating-buffers-per-gate` may indeed be
deprecated
to reduce user parameters, but from the perspective of compatibility,
we
need to
pay attention to users' feedback before deciding to deprecate the
options.



@Yanfei Lei,Thanks for your feedback.
1. Though the option is cluster level, the default value is
different
according to the
job type. In other words, by default, for Batch jobs, the config
value
is
enabled, 1000.
And for Streaming jobs, the config value is not enabled by default.

2. I think this is a good point. The total floating buffers will not
change
with



ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel)
because this is the maximum memory threshold. But if the user
explicitly
specified
the ExclusiveBuffersPerChannel, the calculated result of
ExclusiveBuffersPerChannel * numChannels will change with it.


Thanks again for all feedback!


Best,
Yuxin


Zhu Zhu <re...@gmail.com> 于2022年12月26日周一 17:18写道:

Hi Yuxin,

Thanks for creating this FLIP.

It's good if Flink does not require users to set a very large
network
memory, or tune the advanced (hard-to-understand)
per-channel/per-gate
buffer configs, to avoid "Insufficient number of network buffers"
exceptions
which can easily happen for large scale jobs.

Regarding the new config
"taskmanager.memory.network.read-required-buffer.max",
I think it's still an advanced config which users may feel hard to
tune.
However, given that in most cases users will not need to set it, I
think it's acceptable.

So +1 for this FLIP.

In the future, I think Flink should adaptively select to use
exclusive
buffers
or not according to whether there are sufficient network buffers at
runtime.
Users then no longer need to understand the above configuration.
This
may
require supporting transitions between exclusive buffers and
floating
buffers.
A problem of making all buffers floating is that too few network buffers
can
result
in task slowness which is hard for users to identify. So it's also
needed
to
do improvements on metrics and web UI to expose such issues.

Thanks,
Zhu

Yanfei Lei <fr...@gmail.com> 于2022年12月26日周一 11:13写道:

Hi Yuxin,

Thanks for the proposal!

After reading the FLIP, I have some questions about the default
value.
This FLIP seems to introduce a *new* config
option(taskmanager.memory.network.required-buffer-per-gate.max)
to
control
the network memory usage.
1. Is this configuration at the job level or cluster level? As
the
FLIP
described, the default values of the Batch job and Stream job are
different. If an explicit value is set for cluster level, will it
affect
all Batch jobs and Stream jobs on the cluster?

2. The default value of Batch Job depends on the value of





ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel),
if the value of ExclusiveBuffersPerChannel changed, does
"taskmanager.memory.network.required-buffer-per-gate.max" need to
change
with it?


Best,
Yanfei

Dong Lin <li...@gmail.com> 于2022年12月25日周日 08:58写道:

Hi Yuxin,

Thanks for proposing the FLIP!

The motivation section makes sense. But it seems that the
proposed
change
section mixes the proposed config with the evaluation results.
It
is
a
bit
hard to understand what configs are proposed and how to
describe
these
configs to users. Given that the configuration setting is part
of
public
interfaces, it might be helpful to add a dedicated public
interface
section
to describe the config key and config semantics, as suggested
in
the
FLIP
template here
<





https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

.

This FLIP seems to add more configs without removing any config
from
Flink.
Intuitively this can make the Flink configuration harder rather
than
simpler. Maybe we can get a better idea after we add a public
interface
section to clarify those configs.

Thanks,
Dong


On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <
tanyuxinwork@gmail.com>
wrote:

Hi, devs,

I'd like to start a discussion about FLIP-266: Simplify
network
memory
configurations for TaskManager[1].

When using Flink, users may encounter the following issues
that
affect
usability.
1. The job may fail with an "Insufficient number of network
buffers"
exception.
2. Flink network memory size adjustment is complex.
When encountering these issues, users can solve some problems
by
adding
or
adjusting parameters. However, multiple memory config options
should
be
changed. The config option adjustment requires understanding
the
detailed
internal implementation, which is impractical for most users.

To simplify network memory configurations for TaskManager and
improve
Flink
usability, this FLIP proposed some optimization solutions for
the
issues.

Looking forward to your feedback.

[1]







https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager

Best regards,
Yuxin








Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Yuxin Tan <ta...@gmail.com>.
Hi, Roman

Thanks for the reply.

ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
configuration options; they are not calculated. I have described them in
the FLIP motivation section.

> 3. Each gate requires at least one buffer...
The timeout exception occurs when the ExclusiveBuffersPerChannel
cannot be requested from the NetworkBufferPool, which is not caused by
the change of this FLIP. In addition, we have set the
ExclusiveBuffersPerChannel
to 0 when using floating buffers, which can also decrease the
probability of this exception.
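
As a minimal sketch of what this means (assuming Flink's public
Configuration API and the existing option keys; the default shown is the
documented one), both values are plain config reads, and setting the
per-channel value to 0 switches reads to floating buffers only:

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    // No exclusive buffers per channel: rely on floating buffers only.
    conf.setInteger("taskmanager.network.memory.buffers-per-channel", 0);
    int floatingPerGate = conf.getInteger(
            "taskmanager.network.memory.floating-buffers-per-gate", 8);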

> 4. It would be great to have experimental results for jobs with different
exchange types.
Thanks for the suggestion. I ran a test with different exchange types,
forward
and rescale, and the results show no differences from the all-to-all type,
which
is also understandable, because the network memory usage is calculated
with numChannels, independent of the edge type.
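
To illustrate (illustrative names only; this is just the commonly cited
per-gate formula, not Flink internals), the edge type never appears in
the calculation, it only determines numChannels:

    // forward:    numChannels = 1    -> 1 * 2 + 8    = 10 buffers
    // all-to-all: numChannels = 1000 -> 1000 * 2 + 8 = 2008 buffers
    static int requiredBuffersPerGate(
            int numChannels, int exclusivePerChannel, int floatingPerGate) {
        return numChannels * exclusivePerChannel + floatingPerGate;
    }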

Best,
Yuxin


Roman Khachatryan <ro...@apache.org> 于2022年12月28日周三 05:27写道:

> Hi everyone,
>
> Thanks for the proposal and the discussion.
>
> I couldn't find many details on how exactly the values of
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
> I guess that
> - the threshold evaluation is done on JM
> - floating buffers calculation is done on TM based on the current memory
> available; so it is not taking into account any future tasks submitted for
> that (or other) job
> Is that correct?
>
> If so, I see the following potential issues:
>
> 1. Each (sub)task might have different values because the actual
> available memory might be different. E.g. some tasks might use exclusive
> buffers and others only floating. That could lead to significant skew
> in processing speed, and in turn to issues with checkpoints and watermarks.
>
> 2. Re-deployment of a task (e.g. on job failure) might lead to a completely
> different memory configuration. That, coupled with different values per
> subtask and operator, makes the performance analysis more difficult.
>
> (Regardless of whether it's done on TM or JM):
> 3. Each gate requires at least one buffer [1]. So, when no memory
> is available, TM will throw an Allocation timeout exception instead of
> Insufficient buffers exception immediately. A delay here (allocation
> timeout) seems like a regression.
> Besides that, the regression depends on how much memory is actually
> available and how much it is contended, doesn't it?
> Should there still be a lower threshold of available memory, below which
> the job (task) isn't accepted?
> 4. The same threshold for all types of shuffles will likely result in using
> exclusive buffers
> for point-wise connections and floating buffers for all-to-all ones. I'm
> not sure if that's always optimal. It would be great to have experimental
> results for jobs with different exchange types, WDYT?
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-24035
>
> Regards,
> Roman
>
>
> On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <ta...@gmail.com> wrote:
>
> > Hi, Weihua
> >
> > Thanks for your suggestions.
> >
> > > 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
> > total buffer is not enough?
> >
> > I think it's a good idea. Will try and check the results in PoC. Before
> all
> > read buffers use floating buffers, I will try to use
> > (ExclusiveBuffersPerChannel - i)
> > buffers per channel first. For example, if the user has configured
> > ExclusiveBuffersPerChannel to 4, it will check whether all read buffers
> > are sufficient from 4 to 1. Only when ExclusiveBuffersPerChannel of
> > all channels is 1 and all read buffers are insufficient, all read buffers
> > will use floating buffers.
> > If the test results prove better, the FLIP will use this method.
> >
> > > 2. Do we really need to change the default value of
> > 'taskmanager.memory.network.max'?
> >
> > Changing taskmanager.memory.network.max will indeed affect some
> > users, but a user is only affected when the following 3 conditions are fulfilled.
> > 1) Flink total TM memory is larger than 10g (because the network memory
> > ratio is 0.1).
> > 2) taskmanager.memory.network.max was not initially configured.
> > 3) Other memory, such as managed memory or heap memory, is insufficient.
> > I think the number of jobs fulfilling the conditions is small because
> when
> > TM
> > uses such a large amount of memory, the network memory requirement may
> > also be large. And when encountering the issue, the rollback method is
> very
> > simple,
> > configuring taskmanager.memory.network.max as 1g or other values.
> > In addition, the reason for modifying the default value is to simplify
> the
> > network
> > configurations in most scenarios. This change does affect a few usage
> > scenarios,
> > but we should admit that setting the default to any value may not meet
> > the requirements of all scenarios.
> >
> > Best,
> > Yuxin
> >
> >
> > Weihua Hu <hu...@gmail.com> 于2022年12月26日周一 20:35写道:
> >
> > > Hi Yuxin,
> > > Thanks for the proposal.
> > >
> > > "Insufficient number of network buffers" exceptions also bother us.
> It's
> > > too hard for users to figure out
> > > how many network buffers they really need. It relates to partitioner
> type,
> > > parallelism, slots per taskmanager.
> > >
> > > Since streaming jobs are our primary scenario, I have some questions
> > about
> > > streaming jobs.
> > >
> > > 1. In this FLIP, all read buffers will use floating buffers when the
> > total
> > > buffer is more than
> > > 'taskmanager.memory.network.read-required-buffer.max'. Competition in
> > > buffer allocation leads to performance regression.
> > > How about reducing ExclusiveBuffersPerChannel to 1 first when the total
> > > buffer is not enough?
> > > Will this reduce performance regression in streaming?
> > >
> > > 2. Changing taskmanager.memory.network.max will affect user migration
> > from
> > > the lower version.
> > > IMO, network buffer size should not increase with total memory,
> > especially
> > > for streaming jobs with application mode.
> > > For example, some ETL jobs with rescale partitioner only require a few
> > > network buffers.
> > > And we already have
> 'taskmanager.memory.network.read-required-buffer.max'
> > > to control maximum read network buffer usage.
> > > Do we really need to change the default value of
> > > 'taskmanager.memory.network.max'?
> > >
> > > Best,
> > > Weihua
> > >
> > >
> > > On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <ta...@gmail.com>
> > wrote:
> > >
> > > > Hi, all
> > > > Thanks everyone for the replies and feedback!
> > > >
> > > >
> > > > After combining everyone's comments, the main concerns, and
> > corresponding
> > > > adjustments are as follows.
> > > >
> > > >
> > > > @Guowei Ma, Thanks for your feedback.
> > > > > should we introduce a _new_ non-orthogonal
> > > > option(`taskmanager.memory.network.required-buffer-per-gate.max`).
> That
> > > is
> > > > to say, the option will affect both streaming and batch shuffle
> > behavior
> > > at
> > > > the
> > > > same time.
> > > >
> > > > 1. Because the default option can meet most requirements, whether in
> > > > Streaming
> > > > or Batch scenarios. We do not want users to adjust this default
> config
> > > > option by
> > > > design. This configuration option is added only to preserve the
> > > possibility
> > > > of
> > > > modification options for users.
> > > > 2. In a few cases, if you really want to adjust this option, users
> may
> > > not
> > > > expect to
> > > > adjust the option according to Streaming or Batch, for example,
> > according
> > > > to the
> > > > parallelism of the job.
> > > > 3. Regarding the performance of streaming shuffle, the same problem
> of
> > > > insufficient memory also exists for Streaming jobs. We introduced
> this
> > > > configuration
> > > > to enable users to decouple memory and parallelism, but it will
> affect
> > > some
> > > > performance. By default, the feature is disabled and does not affect
> > > > performance.
> > > > However, the added configuration enables users to choose to decouple
> > > memory
> > > > usage and parallelism for Streaming jobs.
> > > >
> > > > > It's better not to expose more implementation-related concepts to
> > > users.
> > > >
> > > > Thanks for your suggestion. I will modify the option name to avoid
> > > exposing
> > > > implementation-related concepts. I have changed it to
> > > > `taskmanager.memory.network.read-required-buffer.max` in the FLIP.
> > > >
> > > >
> > > >
> > > > @Dong Lin, Thanks for your reply.
> > > > >  it might be helpful to add a dedicated public interface section to
> > > > describe
> > > > the config key and config semantics.
> > > >
> > > > Thanks for your suggestion. I have added public interface section to
> > > > describe
> > > > the config key and config semantics clearly.
> > > >
> > > > > This FLIP seems to add more configs without removing any config
> from
> > > > Flink.
> > > >
> > > > This FLIP is to reduce the number of options to be adjusted when
> using
> > > > Flink.
> > > > After the FLIP, the default option can meet the requirements in most
> > > > scenarios
> > > > rather than modifying any config
> > > > options (`taskmanager.network.memory.buffers-per-channel`
> > > > and `taskmanager.network.memory.floating-buffers-per-gate`), which is
> > > > helpful
> > > > to improve the out-of-the-box usability. In the long run, these two
> > > parameters
> > > > `taskmanager.network.memory.buffers-per-channel` and
> > > > `taskmanager.network.memory.floating-buffers-per-gate` may indeed be
> > > > deprecated
> > > > to reduce user parameters, but from the perspective of compatibility,
> > we
> > > > need to
> > > > pay attention to users' feedback before deciding to deprecate the
> > > options.
> > > >
> > > >
> > > >
> > > > @Yanfei Lei,Thanks for your feedback.
> > > > 1. Though the option is cluster level, the default value is
> different
> > > > according to the
> > > > job type. In other words, by default, for Batch jobs, the config
> value
> > is
> > > > enabled, 1000.
> > > > And for Streaming jobs, the config value is not enabled by default.
> > > >
> > > > 2. I think this is a good point. The total floating buffers will not
> > > change
> > > > with
> > > >
> > >
> >
> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel)
> > > > because this is the maximum memory threshold. But if the user
> > explicitly
> > > > specified
> > > > the ExclusiveBuffersPerChannel, the calculated result of
> > > > ExclusiveBuffersPerChannel * numChannels will change with it.
> > > >
> > > >
> > > > Thanks again for all feedback!
> > > >
> > > >
> > > > Best,
> > > > Yuxin
> > > >
> > > >
> > > > Zhu Zhu <re...@gmail.com> 于2022年12月26日周一 17:18写道:
> > > >
> > > > > Hi Yuxin,
> > > > >
> > > > > Thanks for creating this FLIP.
> > > > >
> > > > > It's good if Flink does not require users to set a very large
> network
> > > > > memory, or tune the advanced (hard-to-understand)
> per-channel/per-gate
> > > > > buffer configs, to avoid "Insufficient number of network buffers"
> > > > > exceptions
> > > > > which can easily happen for large scale jobs.
> > > > >
> > > > > Regarding the new config
> > > > > "taskmanager.memory.network.read-required-buffer.max",
> > > > > I think it's still an advanced config which users may feel hard to
> > > tune.
> > > > > However, given that in most cases users will not need to set it, I
> > > > > think it's acceptable.
> > > > >
> > > > > So +1 for this FLIP.
> > > > >
> > > > > In the future, I think Flink should adaptively select to use
> > exclusive
> > > > > buffers
> > > > > or not according to whether there are sufficient network buffers at
> > > > > runtime.
> > > > > Users then no longer need to understand the above configuration.
> This
> > > may
> > > > > require supporting transitions between exclusive buffers and
> floating
> > > > > buffers.
> > > > > A problem of making all buffers floating is that too few network buffers
> can
> > > > result
> > > > > in task slowness which is hard for users to identify. So it's also
> > > needed
> > > > to
> > > > > do improvements on metrics and web UI to expose such issues.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > Yanfei Lei <fr...@gmail.com> 于2022年12月26日周一 11:13写道:
> > > > > >
> > > > > > Hi Yuxin,
> > > > > >
> > > > > > Thanks for the proposal!
> > > > > >
> > > > > > After reading the FLIP, I have some questions about the default
> > > value.
> > > > > > This FLIP seems to introduce a *new* config
> > > > > > option(taskmanager.memory.network.required-buffer-per-gate.max)
> to
> > > > > control
> > > > > > the network memory usage.
> > > > > > 1. Is this configuration at the job level or cluster level? As
> the
> > > FLIP
> > > > > > described, the default values of the Batch job and Stream job are
> > > > > > different. If an explicit value is set for cluster level, will it
> > > > affect
> > > > > > all Batch jobs and Stream jobs on the cluster?
> > > > > >
> > > > > > 2. The default value of Batch Job depends on the value of
> > > > > >
> > > > >
> > > >
> > >
> >
> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel),
> > > > > > if the value of ExclusiveBuffersPerChannel changed, does
> > > > > > "taskmanager.memory.network.required-buffer-per-gate.max" need to
> > > > change
> > > > > > with it?
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Yanfei
> > > > > >
> > > > > > Dong Lin <li...@gmail.com> 于2022年12月25日周日 08:58写道:
> > > > > >
> > > > > > > Hi Yuxin,
> > > > > > >
> > > > > > > Thanks for proposing the FLIP!
> > > > > > >
> > > > > > > The motivation section makes sense. But it seems that the
> > proposed
> > > > > change
> > > > > > > section mixes the proposed config with the evaluation results.
> It
> > > is
> > > > a
> > > > > bit
> > > > > > > hard to understand what configs are proposed and how to
> describe
> > > > these
> > > > > > > configs to users. Given that the configuration setting is part
> of
> > > > > public
> > > > > > > interfaces, it might be helpful to add a dedicated public
> > interface
> > > > > section
> > > > > > > to describe the config key and config semantics, as suggested
> in
> > > the
> > > > > FLIP
> > > > > > > template here
> > > > > > > <
> > > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > > > > > >
> > > > > > > .
> > > > > > >
> > > > > > > This FLIP seems to add more configs without removing any config
> > > from
> > > > > Flink.
> > > > > > > Intuitively this can make the Flink configuration harder rather
> > > than
> > > > > > > simpler. Maybe we can get a better idea after we add a public
> > > > interface
> > > > > > > section to clarify those configs.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <
> > tanyuxinwork@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi, devs,
> > > > > > > >
> > > > > > > > I'd like to start a discussion about FLIP-266: Simplify
> network
> > > > > memory
> > > > > > > > configurations for TaskManager[1].
> > > > > > > >
> > > > > > > > When using Flink, users may encounter the following issues
> that
> > > > > affect
> > > > > > > > usability.
> > > > > > > > 1. The job may fail with an "Insufficient number of network
> > > > buffers"
> > > > > > > > exception.
> > > > > > > > 2. Flink network memory size adjustment is complex.
> > > > > > > > When encountering these issues, users can solve some problems
> > by
> > > > > adding
> > > > > > > or
> > > > > > > > adjusting parameters. However, multiple memory config options
> > > > should
> > > > > be
> > > > > > > > changed. The config option adjustment requires understanding
> > the
> > > > > detailed
> > > > > > > > internal implementation, which is impractical for most users.
> > > > > > > >
> > > > > > > > To simplify network memory configurations for TaskManager and
> > > > improve
> > > > > > > Flink
> > > > > > > > usability, this FLIP proposed some optimization solutions for
> > the
> > > > > issues.
> > > > > > > >
> > > > > > > > Looking forward to your feedback.
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Yuxin
> > > > > > > >
> > > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Roman Khachatryan <ro...@apache.org>.
Hi everyone,

Thanks for the proposal and the discussion.

I couldn't find many details on how exactly the values of
ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
I guess that
- the threshold evaluation is done on JM
- floating buffers calculation is done on TM based on the current memory
available; so it is not taking into account any future tasks submitted for
that (or other) job
Is that correct?

If so, I see the following potential issues:

1. Each (sub)task might have different values because the actual
available memory might be different. E.g. some tasks might use exclusive
buffers and others only floating. That could lead to significant skew
in processing speed, and in turn to issues with checkpoints and watermarks.

2. Re-deployment of a task (e.g. on job failure) might lead to a completely
different memory configuration. That, coupled with different values per
subtask and operator, makes the performance analysis more difficult.

(Regardless of whether it's done on TM or JM):
3. Each gate requires at least one buffer [1]. So, when no memory
is available, TM will throw an Allocation timeout exception instead of
Insufficient buffers exception immediately. A delay here (allocation
timeout) seems like a regression.
Besides that, the regression depends on how much memory is actually
available and how much it is contended, doesn't it?
Should there still be a lower threshold of available memory, below which
the job (task) isn't accepted? (A sketch of such a check follows below.)
4. The same threshold for all types of shuffles will likely result in using
exclusive buffers
for point-wise connections and floating buffers for all-to-all ones. I'm
not sure if that's always optimal. It would be great to have experimental
results for jobs with different exchange types, WDYT?

[1]
https://issues.apache.org/jira/browse/FLINK-24035
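
(A sketch of the fail-fast lower-threshold check meant in point 3; every
name below is hypothetical, none of this exists in Flink:)

    // Reject the task immediately instead of waiting for an allocation
    // timeout when the pool cannot serve the minimum requirement.
    static void checkMinimumBuffers(int availableBuffers, int minRequiredBuffers) {
        if (availableBuffers < minRequiredBuffers) {
            throw new IllegalStateException(
                    "Insufficient number of network buffers: required "
                            + minRequiredBuffers + ", available " + availableBuffers);
        }
    }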

Regards,
Roman


On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <ta...@gmail.com> wrote:

> Hi, Weihua
>
> Thanks for your suggestions.
>
> > 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
> total buffer is not enough?
>
> I think it's a good idea. Will try and check the results in PoC. Before all
> read buffers use floating buffers, I will try to use
> (ExclusiveBuffersPerChannel - i)
> buffers per channel first. For example, if the user has configured
> ExclusiveBuffersPerChannel to 4, it will check whether all read buffers
> are sufficient from 4 to 1. Only when ExclusiveBuffersPerChannel of
> all channels is 1 and all read buffers are insufficient, all read buffers
> will use floating buffers.
> If the test results prove better, the FLIP will use this method.
>
> > 2. Do we really need to change the default value of
> 'taskmanager.memory.network.max'?
>
> Changing taskmanager.memory.network.max will indeed affect some
> users, but a user is only affected when the following 3 conditions are fulfilled.
> 1) Flink total TM memory is larger than 10g (because the network memory
> ratio is 0.1).
> 2) taskmanager.memory.network.max was not initially configured.
> 3) Other memory, such as managed memory or heap memory, is insufficient.
> I think the number of jobs fulfilling the conditions is small because when
> TM
> uses such a large amount of memory, the network memory requirement may
> also be large. And when encountering the issue, the rollback method is very
> simple,
> configuring taskmanager.memory.network.max as 1g or other values.
> In addition, the reason for modifying the default value is to simplify the
> network
> configurations in most scenarios. This change does affect a few usage
> scenarios,
> but we should admit that setting the default to any value may not meet
> the requirements of all scenarios.
>
> Best,
> Yuxin
>
>
> Weihua Hu <hu...@gmail.com> 于2022年12月26日周一 20:35写道:
>
> > Hi Yuxin,
> > Thanks for the proposal.
> >
> > "Insufficient number of network buffers" exceptions also bother us. It's
> > too hard for users to figure out
> > how many network buffers they really need. It relates to partitioner type,
> > parallelism, slots per taskmanager.
> >
> > Since streaming jobs are our primary scenario, I have some questions
> about
> > streaming jobs.
> >
> > 1. In this FLIP, all read buffers will use floating buffers when the
> total
> > buffer is more than
> > 'taskmanager.memory.network.read-required-buffer.max'. Competition in
> > buffer allocation leads to performance regression.
> > How about reducing ExclusiveBuffersPerChannel to 1 first when the total
> > buffer is not enough?
> > Will this reduce performance regression in streaming?
> >
> > 2. Changing taskmanager.memory.network.max will affect user migration
> from
> > the lower version.
> > IMO, network buffer size should not increase with total memory,
> especially
> > for streaming jobs with application mode.
> > For example, some ETL jobs with rescale partitioner only require a few
> > network buffers.
> > And we already have 'taskmanager.memory.network.read-required-buffer.max'
> > to control maximum read network buffer usage.
> > Do we really need to change the default value of
> > 'taskmanager.memory.network.max'?
> >
> > Best,
> > Weihua
> >
> >
> > On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <ta...@gmail.com>
> wrote:
> >
> > > Hi, all
> > > Thanks everyone for the replies and feedback!
> > >
> > >
> > > After combining everyone's comments, the main concerns, and
> corresponding
> > > adjustments are as follows.
> > >
> > >
> > > @Guowei Ma, Thanks for your feedback.
> > > > should we introduce a _new_ non-orthogonal
> > > option(`taskmanager.memory.network.required-buffer-per-gate.max`). That
> > is
> > > to say, the option will affect both streaming and batch shuffle
> behavior
> > at
> > > the
> > > same time.
> > >
> > > 1. Because the default option can meet most requirements, whether in
> > > Streaming
> > > or Batch scenarios. We do not want users to adjust this default config
> > > option by
> > > design. This configuration option is added only to preserve the
> > possibility
> > > of
> > > modification options for users.
> > > 2. In a few cases, if you really want to adjust this option, users may
> > not
> > > expect to
> > > adjust the option according to Streaming or Batch, for example,
> according
> > > to the
> > > parallelism of the job.
> > > 3. Regarding the performance of streaming shuffle, the same problem of
> > > insufficient memory also exists for Streaming jobs. We introduced this
> > > configuration
> > > to enable users to decouple memory and parallelism, but it will affect
> > some
> > > performance. By default, the feature is disabled and does not affect
> > > performance.
> > > However, the added configuration enables users to choose to decouple
> > memory
> > > usage and parallelism for Streaming jobs.
> > >
> > > > It's better not to expose more implementation-related concepts to
> > users.
> > >
> > > Thanks for your suggestion. I will modify the option name to avoid
> > exposing
> > > implementation-related concepts. I have changed it to
> > > `taskmanager.memory.network.read-required-buffer.max` in the FLIP.
> > >
> > >
> > >
> > > @Dong Lin, Thanks for your reply.
> > > >  it might be helpful to add a dedicated public interface section to
> > > describe
> > > the config key and config semantics.
> > >
> > > Thanks for your suggestion. I have added public interface section to
> > > describe
> > > the config key and config semantics clearly.
> > >
> > > > This FLIP seems to add more configs without removing any config from
> > > Flink.
> > >
> > > This FLIP is to reduce the number of options to be adjusted when using
> > > Flink.
> > > After the FLIP, the default option can meet the requirements in most
> > > scenarios
> > > rather than modifying any config
> > > options (`taskmanager.network.memory.buffers-per-channel`
> > > and `taskmanager.network.memory.floating-buffers-per-gate`), which is
> > > helpful
> > > to improve the out-of-the-box usability. In the long run, these two
> > parameters
> > > `taskmanager.network.memory.buffers-per-channel` and
> > > `taskmanager.network.memory.floating-buffers-per-gate` may indeed be
> > > deprecated
> > > to reduce user parameters, but from the perspective of compatibility,
> we
> > > need to
> > > pay attention to users' feedback before deciding to deprecate the
> > options.
> > >
> > >
> > >
> > > @Yanfei Lei,Thanks for your feedback.
> > > 1. Though the option is cluster level, the default value is different
> > > according to the
> > > job type. In other words, by default, for Batch jobs, the config value
> is
> > > enabled, 1000.
> > > And for Streaming jobs, the config value is not enabled by default.
> > >
> > > 2. I think this is a good point. The total floating buffers will not
> > change
> > > with
> > >
> >
> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel)
> > > because this is the maximum memory threshold. But if the user
> explicitly
> > > specified
> > > the ExclusiveBuffersPerChannel, the calculated result of
> > > ExclusiveBuffersPerChannel * numChannels will change with it.
> > >
> > >
> > > Thanks again for all feedback!
> > >
> > >
> > > Best,
> > > Yuxin
> > >
> > >
> > > Zhu Zhu <re...@gmail.com> 于2022年12月26日周一 17:18写道:
> > >
> > > > Hi Yuxin,
> > > >
> > > > Thanks for creating this FLIP.
> > > >
> > > > It's good if Flink does not require users to set a very large network
> > > > memory, or tune the advanced (hard-to-understand) per-channel/per-gate
> > > > buffer configs, to avoid "Insufficient number of network buffers"
> > > > exceptions
> > > > which can easily happen for large scale jobs.
> > > >
> > > > Regarding the new config
> > > > "taskmanager.memory.network.read-required-buffer.max",
> > > > I think it's still an advanced config which users may feel hard to
> > tune.
> > > > However, given that in most cases users will not need to set it, I
> > > > think it's acceptable.
> > > >
> > > > So +1 for this FLIP.
> > > >
> > > > In the future, I think Flink should adaptively select to use
> exclusive
> > > > buffers
> > > > or not according to whether there are sufficient network buffers at
> > > > runtime.
> > > > Users then no longer need to understand the above configuration. This
> > may
> > > > require supporting transitions between exclusive buffers and floating
> > > > buffers.
> > > > A problem of making all buffers floating is that too few network buffers can
> > > result
> > > > in task slowness which is hard for users to identify. So it's also
> > needed
> > > to
> > > > do improvements on metrics and web UI to expose such issues.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > Yanfei Lei <fr...@gmail.com> 于2022年12月26日周一 11:13写道:
> > > > >
> > > > > Hi Yuxin,
> > > > >
> > > > > Thanks for the proposal!
> > > > >
> > > > > After reading the FLIP, I have some questions about the default
> > value.
> > > > > This FLIP seems to introduce a *new* config
> > > > > option(taskmanager.memory.network.required-buffer-per-gate.max) to
> > > > control
> > > > > the network memory usage.
> > > > > 1. Is this configuration at the job level or cluster level? As the
> > FLIP
> > > > > described, the default values of the Batch job and Stream job are
> > > > > different. If an explicit value is set for cluster level, will it
> > > affect
> > > > > all Batch jobs and Stream jobs on the cluster?
> > > > >
> > > > > 2. The default value of Batch Job depends on the value of
> > > > >
> > > >
> > >
> >
> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel),
> > > > > if the value of ExclusiveBuffersPerChannel changed, does
> > > > > "taskmanager.memory.network.required-buffer-per-gate.max" need to
> > > change
> > > > > with it?
> > > > >
> > > > >
> > > > > Best,
> > > > > Yanfei
> > > > >
> > > > > Dong Lin <li...@gmail.com> 于2022年12月25日周日 08:58写道:
> > > > >
> > > > > > Hi Yuxin,
> > > > > >
> > > > > > Thanks for proposing the FLIP!
> > > > > >
> > > > > > The motivation section makes sense. But it seems that the
> proposed
> > > > change
> > > > > > section mixes the proposed config with the evaluation results. It
> > is
> > > a
> > > > bit
> > > > > > hard to understand what configs are proposed and how to describe
> > > these
> > > > > > configs to users. Given that the configuration setting is part of
> > > > public
> > > > > > interfaces, it might be helpful to add a dedicated public
> interface
> > > > section
> > > > > > to describe the config key and config semantics, as suggested in
> > the
> > > > FLIP
> > > > > > template here
> > > > > > <
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > > > > >
> > > > > > .
> > > > > >
> > > > > > This FLIP seems to add more configs without removing any config
> > from
> > > > Flink.
> > > > > > Intuitively this can make the Flink configuration harder rather
> > than
> > > > > > simpler. Maybe we can get a better idea after we add a public
> > > interface
> > > > > > section to clarify those configs.
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <
> tanyuxinwork@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hi, devs,
> > > > > > >
> > > > > > > I'd like to start a discussion about FLIP-266: Simplify network
> > > > memory
> > > > > > > configurations for TaskManager[1].
> > > > > > >
> > > > > > > When using Flink, users may encounter the following issues that
> > > > affect
> > > > > > > usability.
> > > > > > > 1. The job may fail with an "Insufficient number of network
> > > buffers"
> > > > > > > exception.
> > > > > > > 2. Flink network memory size adjustment is complex.
> > > > > > > When encountering these issues, users can solve some problems
> by
> > > > adding
> > > > > > or
> > > > > > > adjusting parameters. However, multiple memory config options
> > > should
> > > > be
> > > > > > > changed. The config option adjustment requires understanding
> the
> > > > detailed
> > > > > > > internal implementation, which is impractical for most users.
> > > > > > >
> > > > > > > To simplify network memory configurations for TaskManager and
> > > improve
> > > > > > Flink
> > > > > > > usability, this FLIP proposed some optimization solutions for
> the
> > > > issues.
> > > > > > >
> > > > > > > Looking forward to your feedback.
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Yuxin
> > > > > > >
> > > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Yuxin Tan <ta...@gmail.com>.
Hi, Weihua

Thanks for your suggestions.

> 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
total buffer is not enough?

I think it's a good idea. Will try it and check the results in a PoC. Before
all
read buffers use floating buffers, I will try to use
(ExclusiveBuffersPerChannel - i)
buffers per channel first. For example, if the user has configured
ExclusiveBuffersPerChannel to 4, it will check whether all read buffers
are sufficient, going from 4 down to 1. Only when the
ExclusiveBuffersPerChannel of all channels is 1 and the read buffers are
still insufficient will all read buffers use floating buffers.
If the test results prove better, the FLIP will use this method.
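
A sketch of this fallback (illustrative names only; for simplicity the
check ignores floating buffers and other users of the pool):

    // Try exclusive buffers per channel from the configured value down
    // to 1; only if even 1 per channel cannot be satisfied do all read
    // buffers fall back to floating buffers.
    static int chooseExclusivePerChannel(
            int configured, int numChannels, int availableBuffers) {
        for (int perChannel = configured; perChannel >= 1; perChannel--) {
            if ((long) numChannels * perChannel <= availableBuffers) {
                return perChannel;
            }
        }
        return 0; // 0 exclusive buffers: all read buffers float
    }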

> 2. Do we really need to change the default value of
'taskmanager.memory.network.max'?

Changing taskmanager.memory.network.max will indeed affect some users, but a
user is only affected when all three of the following conditions hold.
1) The total TM memory is larger than 10g (because the network memory ratio
is 0.1).
2) taskmanager.memory.network.max was not explicitly configured.
3) Other memory pools, such as managed memory or heap memory, are
insufficient.
I think the number of jobs fulfilling all three conditions is small, because
when a TM uses such a large amount of memory, the network memory requirement
is usually also large. And when a user does encounter the issue, the rollback
is simple: configure taskmanager.memory.network.max to 1g or another value.
In addition, the reason for modifying the default value is to simplify the
network configuration in most scenarios. This change does affect a few usage
scenarios, but we should admit that no default value can meet the
requirements of all scenarios.
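
For reference, the rollback is a single line in flink-conf.yaml (the value 1g
is only an example):

    taskmanager.memory.network.max: 1g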

Best,
Yuxin



Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Weihua Hu <hu...@gmail.com>.
Hi Yuxin,
Thanks for the proposal.

"Insufficient number of network buffers" exceptions also bother us. It's
too hard for users to figure out
how much network buffer they really need. It relates to partitioner type,
parallelism, slots per taskmanager.
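
(As a rough illustration with the current default values, i.e. 2 exclusive
buffers per channel, 8 floating buffers per gate, and 32 KB buffers: an input
gate reading an all-to-all shuffle from 1000 upstream tasks has 1000 channels
and needs about 2 * 1000 + 8 = 2008 buffers, roughly 63 MB, and every
additional gate or slot on the TaskManager multiplies that.)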

Since streaming jobs are our primary scenario, I have some questions about
streaming jobs.

1. In this FLIP, all read buffers will use floating buffers when the total
required buffers exceed
'taskmanager.memory.network.read-required-buffer.max'. Competition in
buffer allocation then leads to performance regression.
How about reducing ExclusiveBuffersPerChannel to 1 first when the total
buffer is not enough?
Would this reduce the performance regression for streaming?

2. Changing taskmanager.memory.network.max will affect users migrating from
lower versions.
IMO, the network buffer size should not increase with the total memory,
especially for streaming jobs in application mode.
For example, some ETL jobs with a rescale partitioner only require a few
network buffers (see the rough arithmetic below).
And we already have 'taskmanager.memory.network.read-required-buffer.max'
to control the maximum read network buffer usage.
Do we really need to change the default value of
'taskmanager.memory.network.max'?
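
(Rough arithmetic for the rescale case, under the current defaults: with a
rescale partitioner each input gate typically has only a few channels,
roughly the ratio of the two stages' parallelisms, so a gate may need on the
order of 2 * 1 + 8 = 10 buffers, far below any default that grows with the
total memory.)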

Best,
Weihua



Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Yuxin Tan <ta...@gmail.com>.
Hi, all
Thanks, everyone, for the replies and feedback!


After combining everyone's comments, the main concerns and the corresponding
adjustments are as follows.


@Guowei Ma, Thanks for your feedback.
> should we introduce a _new_ non-orthogonal
> option(`taskmanager.memory.network.required-buffer-per-gate.max`). That is
> to say, the option will affect both streaming and batch shuffle behavior at
> the same time.

1. The default option can meet most requirements in both Streaming and Batch
scenarios. By design, we do not want users to adjust this default config
option. The configuration option is added only to preserve the possibility
of modification for users.
2. In the few cases where users really want to adjust this option, they may
not expect to adjust it according to Streaming or Batch, but rather, for
example, according to the parallelism of the job.
3. Regarding the performance of streaming shuffle, the same problem of
insufficient memory also exists for Streaming jobs. We introduced this
configuration to enable users to decouple memory usage and parallelism, at
the cost of some performance. By default, the feature is disabled and does
not affect performance. However, the added configuration enables users to
choose to decouple memory usage from parallelism for Streaming jobs (a
config sketch follows below).
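
As a sketch, a user who wants that decoupling for a streaming job would set
the option proposed in this FLIP in flink-conf.yaml (the key is the proposed
name; the value of 1000 buffers is only an illustrative assumption):

    taskmanager.memory.network.read-required-buffer.max: 1000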

> It's better not to expose more implementation-related concepts to users.

Thanks for your suggestion. I will modify the option name to avoid exposing
implementation-related concepts. I have changed it to
`taskmanager.memory.network.read-required-buffer.max` in the FLIP.



@Dong Lin, Thanks for your reply.
> it might be helpful to add a dedicated public interface section to
> describe the config key and config semantics.

Thanks for your suggestion. I have added a public interface section to
describe the config key and config semantics clearly.

> This FLIP seems to add more configs without removing any config from
> Flink.

This FLIP aims to reduce the number of options that have to be adjusted when
using Flink.
After the FLIP, the default options can meet the requirements in most
scenarios without modifying any config options
(`taskmanager.network.memory.buffers-per-channel`
and `taskmanager.network.memory.floating-buffers-per-gate`), which helps
improve the out-of-the-box usability. In the long run, these two parameters,
`taskmanager.network.memory.buffers-per-channel` and
`taskmanager.network.memory.floating-buffers-per-gate`, may indeed be
deprecated to reduce user-facing parameters, but from the perspective of
compatibility, we need to pay attention to users' feedback before deciding
to deprecate the options.



@Yanfei Lei, thanks for your feedback.
1. Though the option is cluster level, the default value differs according
to the job type. In other words, by default, the config is enabled with a
value of 1000 for Batch jobs, while it is disabled by default for Streaming
jobs.

2. I think this is a good point. The total floating buffers will not change
with
ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel),
because this option is a maximum memory threshold. But if the user
explicitly specifies ExclusiveBuffersPerChannel, the calculated result of
ExclusiveBuffersPerChannel * numChannels will change with it (see the
sketch below).
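
A small illustrative sketch of that interaction in Java (my own code, not
the FLIP's actual implementation; all names are assumptions):

    public class GateBufferEstimate {
        // The per-gate exclusive demand grows with ExclusiveBuffersPerChannel,
        // while the proposed read-required-buffer.max stays a fixed cap.
        static long requiredBuffers(int exclusivePerChannel, int numChannels, int floatingPerGate) {
            return (long) exclusivePerChannel * numChannels + floatingPerGate;
        }

        public static void main(String[] args) {
            long demand = requiredBuffers(4, 500, 8); // 2008 with an explicit 4
            long cap = 1000;                          // e.g. the batch default threshold
            System.out.println(demand + " buffers demanded, cap stays at " + cap);
        }
    }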


Thanks again for all feedback!


Best,
Yuxin



Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Zhu Zhu <re...@gmail.com>.
Hi Yuxin,

Thanks for creating this FLIP.

It's good if Flink does not require users to set a very large network
memory, or to tune the advanced (hard-to-understand) per-channel/per-gate
buffer configs, to avoid "Insufficient number of network buffers" exceptions,
which can easily happen for large-scale jobs.
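
(For context, the exception users hit looks roughly like the following; the
exact wording and numbers vary across Flink versions and setups:

    java.io.IOException: Insufficient number of network buffers:
    required 2, but only 0 available. The total number of network
    buffers is currently set to 2048 of 32768 bytes each.
)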

Regarding the new config
"taskmanager.memory.network.read-required-buffer.max",
I think it's still an advanced config which users may find hard to tune.
However, given that in most cases users will not need to set it, I
think it's acceptable.

So +1 for this FLIP.

In the future, I think Flink should adaptively select whether to use
exclusive buffers according to whether there are sufficient network buffers
at runtime. Users would then no longer need to understand the above
configuration. This may require supporting transitions between exclusive and
floating buffers (a rough sketch follows below).
A problem with making all buffers floating is that too few network buffers
can result in task slowness that is hard for users to identify. So
improvements to metrics and the web UI are also needed to expose such issues.
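
A purely hypothetical sketch of such runtime adaptivity in Java (this is not
an existing Flink API; all names are assumptions):

    public class AdaptiveBufferChoice {
        // Decide per gate, at buffer-request time, whether channels may keep
        // exclusive buffers or must borrow from the shared floating pool.
        static boolean useExclusiveBuffers(long availableBuffers, long exclusiveDemand) {
            return exclusiveDemand <= availableBuffers;
        }

        public static void main(String[] args) {
            System.out.println(useExclusiveBuffers(2048, 2008)); // true: keep exclusive
            System.out.println(useExclusiveBuffers(1024, 2008)); // false: go floating
        }
    }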

Thanks,
Zhu


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Yanfei Lei <fr...@gmail.com>.
Hi Yuxin,

Thanks for the proposal!

After reading the FLIP, I have some questions about the default value.
This FLIP seems to introduce a *new* config option
(taskmanager.memory.network.required-buffer-per-gate.max) to control
the network memory usage.
1. Is this configuration at the job level or the cluster level? As the FLIP
describes, the default values for Batch jobs and Stream jobs are
different. If an explicit value is set at the cluster level, will it affect
all Batch jobs and Stream jobs on the cluster?

2. The default value for Batch jobs depends on the value of
ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel);
if the value of ExclusiveBuffersPerChannel changes, does
"taskmanager.memory.network.required-buffer-per-gate.max" need to change
with it?


Best,
Yanfei


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

Posted by Dong Lin <li...@gmail.com>.
Hi Yuxin,

Thanks for proposing the FLIP!

The motivation section makes sense. But it seems that the proposed change
section mixes the proposed configs with the evaluation results. It is a bit
hard to understand which configs are proposed and how to describe these
configs to users. Given that the configuration settings are part of the
public interfaces, it might be helpful to add a dedicated public interface
section to describe the config keys and config semantics, as suggested in
the FLIP template here:
<https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals>

This FLIP seems to add more configs without removing any config from Flink.
Intuitively this can make the Flink configuration harder rather than
simpler. Maybe we can get a better idea after we add a public interface
section to clarify those configs.

Thanks,
Dong

