You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Stephan Ewen <se...@apache.org> on 2020/05/14 12:34:36 UTC

[DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

The parameter "state.backend.fs.memory-threshold" decides when a state will
become a file and when it will be stored inline with the metadata (to avoid
excessive amounts of small files).

By default, this threshold is 1K - so every state above that size becomes a
file. For many cases, this threshold seems to be too low.
There is an interesting talk with background on this from Scott Kidder:
https://www.youtube.com/watch?v=gycq0cY3TZ0

I wanted to discuss increasing this to 100K by default.

Advantage:
  - This should help many users out of the box, which otherwise see
checkpointing problems on systems like S3, GCS, etc.

Disadvantage:
  - For very large jobs, this increases the required heap memory on the JM
side, because more state needs to be kept in-line when gathering the acks
for a pending checkpoint.
  - If tasks have a lot of states and each state is roughly at this
threshold, we increase the chance of exceeding the RPC frame size and
failing the job.

What do you think?

Best,
Stephan

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Yu Li <ca...@gmail.com>.
Have created FLINK-17865 [1] to follow this up. Thanks.

Best Regards,
Yu

[1] https://issues.apache.org/jira/browse/FLINK-17865


On Wed, 20 May 2020 at 23:03, Stephan Ewen <se...@apache.org> wrote:

> Sounds good to me.
>
> By the next release, we should have also phased out the old Kafka Source,
> which is one of the most common and most problematic users of Union State
>
> On Wed, May 20, 2020 at 11:12 AM Yu Li <ca...@gmail.com> wrote:
>
> > +1 on improving Union State implementation.
> >
> > I think the concerns raised around union state is valid, meanwhile jobs
> > with 200 parallelism on the source operator could be regarded as "large
> > job".
> >
> > To compromise, I suggest we split the improvements of the issue into 3
> > steps:
> >
> > 1. Increase `state.backend.fs.memory-threshold` from 1K to 20K (which
> will
> > at most increase the memory cost on JM side by 200*200*20K=800MB)
> > 2. Improve the union state implementation
> > 3. Further increase `state.backend.fs.memory-threshold` higher
> >
> > What do you think? Thanks.
> >
> > Best Regards,
> > Yu
> >
> >
> > On Sat, 16 May 2020 at 23:15, Yun Tang <my...@live.com> wrote:
> >
> > > If we cannot get rid of union state, I think we should introduce memory
> > > control on the serialized TDDs when deploying
> > > tasks instead of how union state is implemented when assign state in
> > > StateAssignmentOperation.
> > > The duplicated TaskStateSnapshot would not really increase much memory
> as
> > > the ByteStreamStateHandle's are
> > > actually share the same reference until they are serialized.
> > >
> > > When talking about the estimated memory footprint, I previously think
> > that
> > > depends on the pool size of future executor
> (HardWare#getNumberCPUCores).
> > > However, with the simple program below, I found the async submit task
> > logic
> > > make the number of existing RemoteRpcInvocation in JM at the same time
> > > larger than the HardWare#getNumberCPUCores.
> > > Take below program for example, we have 200 parallelism of source and
> the
> > > existing RemoteRpcInvocation in JM at the same time could be nearly 200
> > > while our pool size of future executor is only 96. I think if we could
> > > clear the serialized data in RemoteRpcInvocation as soon as possible,
> we
> > > might mitigate this problem greatly.
> > >
> > > Simple program which used union state to reproduce the memory footprint
> > > problem: one sub-task of the total union state is 100KB bytes array,
> and
> > > 200 sub-tasks in total could lead to more than 100KB * 200 * 200 =
> 3.8GB
> > > memory for all union state.
> > >
> > > public class Program {
> > >    private static final Logger LOG =
> > > LoggerFactory.getLogger(Program.class);
> > >
> > >    public static void main(String[] args) throws Exception {
> > >       final StreamExecutionEnvironment env =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > >       env.enableCheckpointing(60 * 1000L);
> > >       env.addSource(new MySource()).setParallelism(200).print();
> > >       env.execute("Mock program");
> > >    }
> > >
> > >    private static class MySource extends
> > > RichParallelSourceFunction<Integer> implements CheckpointedFunction {
> > >       private static final ListStateDescriptor<byte[]> stateDescriptor
> =
> > > new ListStateDescriptor<>("list-1", byte[].class);
> > >       private ListState<byte[]> unionListState;
> > >       private volatile boolean running = true;
> > >       @Override
> > >       public void snapshotState(FunctionSnapshotContext context) throws
> > > Exception {
> > >          unionListState.clear();
> > >          byte[] array = new byte[100 * 1024];
> > >          ThreadLocalRandom.current().nextBytes(array);
> > >          unionListState.add(array);
> > >       }
> > >
> > >       @Override
> > >       public void initializeState(FunctionInitializationContext
> context)
> > > throws Exception {
> > >          if (context.isRestored()) {
> > >             unionListState =
> > > context.getOperatorStateStore().getUnionListState(stateDescriptor);
> > >             List<byte[]> collect =
> > > StreamSupport.stream(unionListState.get().spliterator(),
> > > false).collect(Collectors.toList());
> > >             LOG.info("union state Collect size: {}.", collect.size());
> > >          } else {
> > >             unionListState =
> > > context.getOperatorStateStore().getUnionListState(stateDescriptor);
> > >          }
> > >       }
> > >
> > >       @Override
> > >       public void run(SourceContext<Integer> ctx) throws Exception {
> > >          while (running) {
> > >             synchronized (ctx.getCheckpointLock()) {
> > >                ctx.collect(ThreadLocalRandom.current().nextInt());
> > >             }
> > >             Thread.sleep(100);
> > >          }
> > >       }
> > >
> > >       @Override
> > >       public void cancel() {
> > >          running = false;
> > >       }
> > >    }
> > > }
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Stephan Ewen <se...@apache.org>
> > > Sent: Saturday, May 16, 2020 18:56
> > > To: dev <de...@flink.apache.org>
> > > Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <
> > > piotr@ververica.com>
> > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold"
> from
> > > 1K to 100K
> > >
> > > Okay, thank you for all the feedback.
> > >
> > > So we should definitely work on getting rid of the Union State, or at
> > least
> > > change the way it is implemented (avoid duplicate serializer
> snapshots).
> > >
> > > Can you estimate from which size of the cluster on the JM heap usage
> > > becomes critical (if we increased the threshold to 100k, or maybe 50k)
> ?
> > >
> > >
> > > On Sat, May 16, 2020 at 8:10 AM Congxian Qiu <qc...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Overall, I agree with increasing this value. but the default value
> set
> > to
> > > > 100K maybe something too large from my side.
> > > >
> > > > I want to share some more information from my side.
> > > >
> > > > The small files problem is indeed a problem many users may encounter
> in
> > > > production env. The states(Keyed state and Operator state) can become
> > > small
> > > > files in DFS, but increase the value of
> > > `state.backend.fs.memory-threshold`
> > > > may encounter the JM OOM problem as Yun said previously.
> > > > We've tried increase this value in our production env, but some
> > > connectors
> > > > which UnionState prevent us to do this, the memory consumed by these
> > jobs
> > > > can be very large (in our case, thousands of parallelism, set
> > > > `state.backend.fs.memory-threshold` to 64K, will consume 10G+ memory
> > for
> > > > JM), so in the end, we use the solution proposed in FLINK-11937[1]
> for
> > > both
> > > > keyed state and operator state.
> > > >
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-11937
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > Yun Tang <my...@live.com> 于2020年5月15日周五 下午9:09写道:
> > > >
> > > > > Please correct me if I am wrong, "put the increased value into the
> > > > default
> > > > > configuration" means
> > > > > we will update that in default flink-conf.yaml but still leave the
> > > > default
> > > > > value of `state.backend.fs.memory-threshold`as previously?
> > > > > It seems I did not get the point why existing setups with existing
> > > > configs
> > > > > will not be affected.
> > > > >
> > > > > The concern I raised is because one of our large-scale job with
> 1024
> > > > > parallelism source of union state meet the JM OOM problem when we
> > > > increase
> > > > > this value.
> > > > > I think if we introduce memory control when serializing TDD
> > > > asynchronously
> > > > > [1], we could be much more confident to increase this configuration
> > as
> > > > the
> > > > > memory footprint
> > > > > expands at that time by a lot of serialized TDDs.
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
> > > > >
> > > > > Best
> > > > > Yun Tang
> > > > >
> > > > > ________________________________
> > > > > From: Stephan Ewen <se...@apache.org>
> > > > > Sent: Friday, May 15, 2020 16:53
> > > > > To: dev <de...@flink.apache.org>
> > > > > Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <
> > > > > piotr@ververica.com>
> > > > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold"
> > > from
> > > > > 1K to 100K
> > > > >
> > > > > I see, thanks for all the input.
> > > > >
> > > > > I agree with Yun Tang that the use of UnionState is problematic and
> > can
> > > > > cause issues in conjunction with this.
> > > > > However, most of the large-scale users I know that also struggle
> with
> > > > > UnionState have also increased this threshold, because with this
> low
> > > > > threshold, they get an excess number of small files and overwhelm
> > their
> > > > > HDFS / S3 / etc.
> > > > >
> > > > > An intermediate solution could be to put the increased value into
> the
> > > > > default configuration. That way, existing setups with existing
> > configs
> > > > will
> > > > > not be affected, but new users / installations will have a simper
> > time.
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Thu, May 14, 2020 at 9:20 PM Yun Tang <my...@live.com> wrote:
> > > > >
> > > > > > Tend to be not in favor of this proposal as union state is
> somewhat
> > > > > abused
> > > > > > in several popular source connectors (e.g. kafka), and increasing
> > > this
> > > > > > value could lead to JM OOM when sending tdd from JM to TMs with
> > large
> > > > > > parallelism.
> > > > > >
> > > > > > After we collect union state and initialize the map list [1], we
> > > > already
> > > > > > have union state ready to assign. At this time, the memory
> > footprint
> > > > has
> > > > > > not increase too much as the union state which shared across
> tasks
> > > have
> > > > > the
> > > > > > same reference of ByteStreamStateHandle. However, when we send
> tdd
> > > with
> > > > > the
> > > > > > taskRestore to TMs, akka will serialize those
> ByteStreamStateHandle
> > > > > within
> > > > > > tdd to increases the memory footprint. If the source have 1024
> > > > > > parallelisms, and any one of the sub-task would then have
> > 1024*100KB
> > > > size
> > > > > > state handles. The sum of total memory footprint cannot be
> ignored.
> > > > > >
> > > > > > If we plan to increase the default value of
> > > > > > state.backend.fs.memory-threshold, we should first resolve the
> > above
> > > > > case.
> > > > > > In other words, this proposal could be a trade-off, which benefit
> > > > perhaps
> > > > > > 99% users, but might bring harmful effects to 1% user with
> > > large-scale
> > > > > > flink jobs.
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> > > > > >
> > > > > > Best
> > > > > > Yun Tang
> > > > > >
> > > > > >
> > > > > > ________________________________
> > > > > > From: Yu Li <ca...@gmail.com>
> > > > > > Sent: Thursday, May 14, 2020 23:51
> > > > > > To: Till Rohrmann <tr...@apache.org>
> > > > > > Cc: dev <de...@flink.apache.org>; Piotr Nowojski <
> > piotr@ververica.com>
> > > > > > Subject: Re: [DISCUSS] increase
> "state.backend.fs.memory-threshold"
> > > > from
> > > > > > 1K to 100K
> > > > > >
> > > > > > TL;DR: I have some reservations but tend to be +1 for the
> proposal,
> > > > > > meanwhile suggest we have a more thorough solution in the long
> run.
> > > > > >
> > > > > > Please correct me if I'm wrong, but it seems the root cause of
> the
> > > > issue
> > > > > is
> > > > > > too many small files generated.
> > > > > >
> > > > > > I have some concerns for the case of session cluster [1], as well
> > as
> > > > > > possible issues for users at large scale, otherwise I think
> > > increasing
> > > > > > `state.backend.fs.memory-threshold` to 100K is a good choice,
> based
> > > on
> > > > > the
> > > > > > assumption that a large portion of our users are running small
> jobs
> > > > with
> > > > > > small states.
> > > > > >
> > > > > > OTOH, maybe extending the solution [2] of resolving RocksDB small
> > > file
> > > > > > problem (as proposed by FLINK-11937 [3]) to also support operator
> > > state
> > > > > > could be an alternative? We have already applied the solution in
> > > > > production
> > > > > > for operator state and solved the HDFS NN RPC bottleneck problem
> on
> > > > last
> > > > > > year's Singles' day.
> > > > > >
> > > > > > Best Regards,
> > > > > > Yu
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > > > > > [2]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> > > > > > <
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> > > > > > >
> > > > > > [3] https://issues.apache.org/jira/browse/FLINK-11937
> > > > > >
> > > > > >
> > > > > > On Thu, 14 May 2020 at 21:45, Till Rohrmann <
> trohrmann@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > I cannot say much about the concrete value but if our users
> have
> > > > > problems
> > > > > > > with the existing default values, then it makes sense to me to
> > > change
> > > > > it.
> > > > > > >
> > > > > > > One thing to check could be whether it is possible to provide a
> > > > > > meaningful
> > > > > > > exception in case that the state size exceeds the frame size.
> At
> > > the
> > > > > > > moment, Flink should fail with a message saying that a rpc
> > message
> > > > > > exceeds
> > > > > > > the maximum frame size. Maybe it is also possible to point the
> > user
> > > > > > towards
> > > > > > > "state.backend.fs.memory-threshold" if the message exceeds the
> > > frame
> > > > > size
> > > > > > > because of too much state.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <sewen@apache.org
> >
> > > > wrote:
> > > > > > >
> > > > > > >> The parameter "state.backend.fs.memory-threshold" decides
> when a
> > > > state
> > > > > > >> will
> > > > > > >> become a file and when it will be stored inline with the
> > metadata
> > > > (to
> > > > > > >> avoid
> > > > > > >> excessive amounts of small files).
> > > > > > >>
> > > > > > >> By default, this threshold is 1K - so every state above that
> > size
> > > > > > becomes
> > > > > > >> a
> > > > > > >> file. For many cases, this threshold seems to be too low.
> > > > > > >> There is an interesting talk with background on this from
> Scott
> > > > > Kidder:
> > > > > > >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> > > > > > >>
> > > > > > >> I wanted to discuss increasing this to 100K by default.
> > > > > > >>
> > > > > > >> Advantage:
> > > > > > >>   - This should help many users out of the box, which
> otherwise
> > > see
> > > > > > >> checkpointing problems on systems like S3, GCS, etc.
> > > > > > >>
> > > > > > >> Disadvantage:
> > > > > > >>   - For very large jobs, this increases the required heap
> memory
> > > on
> > > > > the
> > > > > > JM
> > > > > > >> side, because more state needs to be kept in-line when
> gathering
> > > the
> > > > > > acks
> > > > > > >> for a pending checkpoint.
> > > > > > >>   - If tasks have a lot of states and each state is roughly at
> > > this
> > > > > > >> threshold, we increase the chance of exceeding the RPC frame
> > size
> > > > and
> > > > > > >> failing the job.
> > > > > > >>
> > > > > > >> What do you think?
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Stephan
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Stephan Ewen <se...@apache.org>.
Sounds good to me.

By the next release, we should have also phased out the old Kafka Source,
which is one of the most common and most problematic users of Union State

On Wed, May 20, 2020 at 11:12 AM Yu Li <ca...@gmail.com> wrote:

> +1 on improving Union State implementation.
>
> I think the concerns raised around union state is valid, meanwhile jobs
> with 200 parallelism on the source operator could be regarded as "large
> job".
>
> To compromise, I suggest we split the improvements of the issue into 3
> steps:
>
> 1. Increase `state.backend.fs.memory-threshold` from 1K to 20K (which will
> at most increase the memory cost on JM side by 200*200*20K=800MB)
> 2. Improve the union state implementation
> 3. Further increase `state.backend.fs.memory-threshold` higher
>
> What do you think? Thanks.
>
> Best Regards,
> Yu
>
>
> On Sat, 16 May 2020 at 23:15, Yun Tang <my...@live.com> wrote:
>
> > If we cannot get rid of union state, I think we should introduce memory
> > control on the serialized TDDs when deploying
> > tasks instead of how union state is implemented when assign state in
> > StateAssignmentOperation.
> > The duplicated TaskStateSnapshot would not really increase much memory as
> > the ByteStreamStateHandle's are
> > actually share the same reference until they are serialized.
> >
> > When talking about the estimated memory footprint, I previously think
> that
> > depends on the pool size of future executor (HardWare#getNumberCPUCores).
> > However, with the simple program below, I found the async submit task
> logic
> > make the number of existing RemoteRpcInvocation in JM at the same time
> > larger than the HardWare#getNumberCPUCores.
> > Take below program for example, we have 200 parallelism of source and the
> > existing RemoteRpcInvocation in JM at the same time could be nearly 200
> > while our pool size of future executor is only 96. I think if we could
> > clear the serialized data in RemoteRpcInvocation as soon as possible, we
> > might mitigate this problem greatly.
> >
> > Simple program which used union state to reproduce the memory footprint
> > problem: one sub-task of the total union state is 100KB bytes array, and
> > 200 sub-tasks in total could lead to more than 100KB * 200 * 200 = 3.8GB
> > memory for all union state.
> >
> > public class Program {
> >    private static final Logger LOG =
> > LoggerFactory.getLogger(Program.class);
> >
> >    public static void main(String[] args) throws Exception {
> >       final StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >       env.enableCheckpointing(60 * 1000L);
> >       env.addSource(new MySource()).setParallelism(200).print();
> >       env.execute("Mock program");
> >    }
> >
> >    private static class MySource extends
> > RichParallelSourceFunction<Integer> implements CheckpointedFunction {
> >       private static final ListStateDescriptor<byte[]> stateDescriptor =
> > new ListStateDescriptor<>("list-1", byte[].class);
> >       private ListState<byte[]> unionListState;
> >       private volatile boolean running = true;
> >       @Override
> >       public void snapshotState(FunctionSnapshotContext context) throws
> > Exception {
> >          unionListState.clear();
> >          byte[] array = new byte[100 * 1024];
> >          ThreadLocalRandom.current().nextBytes(array);
> >          unionListState.add(array);
> >       }
> >
> >       @Override
> >       public void initializeState(FunctionInitializationContext context)
> > throws Exception {
> >          if (context.isRestored()) {
> >             unionListState =
> > context.getOperatorStateStore().getUnionListState(stateDescriptor);
> >             List<byte[]> collect =
> > StreamSupport.stream(unionListState.get().spliterator(),
> > false).collect(Collectors.toList());
> >             LOG.info("union state Collect size: {}.", collect.size());
> >          } else {
> >             unionListState =
> > context.getOperatorStateStore().getUnionListState(stateDescriptor);
> >          }
> >       }
> >
> >       @Override
> >       public void run(SourceContext<Integer> ctx) throws Exception {
> >          while (running) {
> >             synchronized (ctx.getCheckpointLock()) {
> >                ctx.collect(ThreadLocalRandom.current().nextInt());
> >             }
> >             Thread.sleep(100);
> >          }
> >       }
> >
> >       @Override
> >       public void cancel() {
> >          running = false;
> >       }
> >    }
> > }
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Stephan Ewen <se...@apache.org>
> > Sent: Saturday, May 16, 2020 18:56
> > To: dev <de...@flink.apache.org>
> > Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <
> > piotr@ververica.com>
> > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> > 1K to 100K
> >
> > Okay, thank you for all the feedback.
> >
> > So we should definitely work on getting rid of the Union State, or at
> least
> > change the way it is implemented (avoid duplicate serializer snapshots).
> >
> > Can you estimate from which size of the cluster on the JM heap usage
> > becomes critical (if we increased the threshold to 100k, or maybe 50k) ?
> >
> >
> > On Sat, May 16, 2020 at 8:10 AM Congxian Qiu <qc...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Overall, I agree with increasing this value. but the default value set
> to
> > > 100K maybe something too large from my side.
> > >
> > > I want to share some more information from my side.
> > >
> > > The small files problem is indeed a problem many users may encounter in
> > > production env. The states(Keyed state and Operator state) can become
> > small
> > > files in DFS, but increase the value of
> > `state.backend.fs.memory-threshold`
> > > may encounter the JM OOM problem as Yun said previously.
> > > We've tried increase this value in our production env, but some
> > connectors
> > > which UnionState prevent us to do this, the memory consumed by these
> jobs
> > > can be very large (in our case, thousands of parallelism, set
> > > `state.backend.fs.memory-threshold` to 64K, will consume 10G+ memory
> for
> > > JM), so in the end, we use the solution proposed in FLINK-11937[1] for
> > both
> > > keyed state and operator state.
> > >
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-11937
> > > Best,
> > > Congxian
> > >
> > >
> > > Yun Tang <my...@live.com> 于2020年5月15日周五 下午9:09写道:
> > >
> > > > Please correct me if I am wrong, "put the increased value into the
> > > default
> > > > configuration" means
> > > > we will update that in default flink-conf.yaml but still leave the
> > > default
> > > > value of `state.backend.fs.memory-threshold`as previously?
> > > > It seems I did not get the point why existing setups with existing
> > > configs
> > > > will not be affected.
> > > >
> > > > The concern I raised is because one of our large-scale job with 1024
> > > > parallelism source of union state meet the JM OOM problem when we
> > > increase
> > > > this value.
> > > > I think if we introduce memory control when serializing TDD
> > > asynchronously
> > > > [1], we could be much more confident to increase this configuration
> as
> > > the
> > > > memory footprint
> > > > expands at that time by a lot of serialized TDDs.
> > > >
> > > >
> > > > [1]
> > > >
> > >
> >
> https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
> > > >
> > > > Best
> > > > Yun Tang
> > > >
> > > > ________________________________
> > > > From: Stephan Ewen <se...@apache.org>
> > > > Sent: Friday, May 15, 2020 16:53
> > > > To: dev <de...@flink.apache.org>
> > > > Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <
> > > > piotr@ververica.com>
> > > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold"
> > from
> > > > 1K to 100K
> > > >
> > > > I see, thanks for all the input.
> > > >
> > > > I agree with Yun Tang that the use of UnionState is problematic and
> can
> > > > cause issues in conjunction with this.
> > > > However, most of the large-scale users I know that also struggle with
> > > > UnionState have also increased this threshold, because with this low
> > > > threshold, they get an excess number of small files and overwhelm
> their
> > > > HDFS / S3 / etc.
> > > >
> > > > An intermediate solution could be to put the increased value into the
> > > > default configuration. That way, existing setups with existing
> configs
> > > will
> > > > not be affected, but new users / installations will have a simper
> time.
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > >
> > > > On Thu, May 14, 2020 at 9:20 PM Yun Tang <my...@live.com> wrote:
> > > >
> > > > > Tend to be not in favor of this proposal as union state is somewhat
> > > > abused
> > > > > in several popular source connectors (e.g. kafka), and increasing
> > this
> > > > > value could lead to JM OOM when sending tdd from JM to TMs with
> large
> > > > > parallelism.
> > > > >
> > > > > After we collect union state and initialize the map list [1], we
> > > already
> > > > > have union state ready to assign. At this time, the memory
> footprint
> > > has
> > > > > not increase too much as the union state which shared across tasks
> > have
> > > > the
> > > > > same reference of ByteStreamStateHandle. However, when we send tdd
> > with
> > > > the
> > > > > taskRestore to TMs, akka will serialize those ByteStreamStateHandle
> > > > within
> > > > > tdd to increases the memory footprint. If the source have 1024
> > > > > parallelisms, and any one of the sub-task would then have
> 1024*100KB
> > > size
> > > > > state handles. The sum of total memory footprint cannot be ignored.
> > > > >
> > > > > If we plan to increase the default value of
> > > > > state.backend.fs.memory-threshold, we should first resolve the
> above
> > > > case.
> > > > > In other words, this proposal could be a trade-off, which benefit
> > > perhaps
> > > > > 99% users, but might bring harmful effects to 1% user with
> > large-scale
> > > > > flink jobs.
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> > > > >
> > > > > Best
> > > > > Yun Tang
> > > > >
> > > > >
> > > > > ________________________________
> > > > > From: Yu Li <ca...@gmail.com>
> > > > > Sent: Thursday, May 14, 2020 23:51
> > > > > To: Till Rohrmann <tr...@apache.org>
> > > > > Cc: dev <de...@flink.apache.org>; Piotr Nowojski <
> piotr@ververica.com>
> > > > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold"
> > > from
> > > > > 1K to 100K
> > > > >
> > > > > TL;DR: I have some reservations but tend to be +1 for the proposal,
> > > > > meanwhile suggest we have a more thorough solution in the long run.
> > > > >
> > > > > Please correct me if I'm wrong, but it seems the root cause of the
> > > issue
> > > > is
> > > > > too many small files generated.
> > > > >
> > > > > I have some concerns for the case of session cluster [1], as well
> as
> > > > > possible issues for users at large scale, otherwise I think
> > increasing
> > > > > `state.backend.fs.memory-threshold` to 100K is a good choice, based
> > on
> > > > the
> > > > > assumption that a large portion of our users are running small jobs
> > > with
> > > > > small states.
> > > > >
> > > > > OTOH, maybe extending the solution [2] of resolving RocksDB small
> > file
> > > > > problem (as proposed by FLINK-11937 [3]) to also support operator
> > state
> > > > > could be an alternative? We have already applied the solution in
> > > > production
> > > > > for operator state and solved the HDFS NN RPC bottleneck problem on
> > > last
> > > > > year's Singles' day.
> > > > >
> > > > > Best Regards,
> > > > > Yu
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > > > > [2]
> > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> > > > > <
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> > > > > >
> > > > > [3] https://issues.apache.org/jira/browse/FLINK-11937
> > > > >
> > > > >
> > > > > On Thu, 14 May 2020 at 21:45, Till Rohrmann <tr...@apache.org>
> > > > wrote:
> > > > >
> > > > > > I cannot say much about the concrete value but if our users have
> > > > problems
> > > > > > with the existing default values, then it makes sense to me to
> > change
> > > > it.
> > > > > >
> > > > > > One thing to check could be whether it is possible to provide a
> > > > > meaningful
> > > > > > exception in case that the state size exceeds the frame size. At
> > the
> > > > > > moment, Flink should fail with a message saying that a rpc
> message
> > > > > exceeds
> > > > > > the maximum frame size. Maybe it is also possible to point the
> user
> > > > > towards
> > > > > > "state.backend.fs.memory-threshold" if the message exceeds the
> > frame
> > > > size
> > > > > > because of too much state.
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org>
> > > wrote:
> > > > > >
> > > > > >> The parameter "state.backend.fs.memory-threshold" decides when a
> > > state
> > > > > >> will
> > > > > >> become a file and when it will be stored inline with the
> metadata
> > > (to
> > > > > >> avoid
> > > > > >> excessive amounts of small files).
> > > > > >>
> > > > > >> By default, this threshold is 1K - so every state above that
> size
> > > > > becomes
> > > > > >> a
> > > > > >> file. For many cases, this threshold seems to be too low.
> > > > > >> There is an interesting talk with background on this from Scott
> > > > Kidder:
> > > > > >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> > > > > >>
> > > > > >> I wanted to discuss increasing this to 100K by default.
> > > > > >>
> > > > > >> Advantage:
> > > > > >>   - This should help many users out of the box, which otherwise
> > see
> > > > > >> checkpointing problems on systems like S3, GCS, etc.
> > > > > >>
> > > > > >> Disadvantage:
> > > > > >>   - For very large jobs, this increases the required heap memory
> > on
> > > > the
> > > > > JM
> > > > > >> side, because more state needs to be kept in-line when gathering
> > the
> > > > > acks
> > > > > >> for a pending checkpoint.
> > > > > >>   - If tasks have a lot of states and each state is roughly at
> > this
> > > > > >> threshold, we increase the chance of exceeding the RPC frame
> size
> > > and
> > > > > >> failing the job.
> > > > > >>
> > > > > >> What do you think?
> > > > > >>
> > > > > >> Best,
> > > > > >> Stephan
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Yu Li <ca...@gmail.com>.
+1 on improving Union State implementation.

I think the concerns raised around union state is valid, meanwhile jobs
with 200 parallelism on the source operator could be regarded as "large
job".

To compromise, I suggest we split the improvements of the issue into 3
steps:

1. Increase `state.backend.fs.memory-threshold` from 1K to 20K (which will
at most increase the memory cost on JM side by 200*200*20K=800MB)
2. Improve the union state implementation
3. Further increase `state.backend.fs.memory-threshold` higher

What do you think? Thanks.

Best Regards,
Yu


On Sat, 16 May 2020 at 23:15, Yun Tang <my...@live.com> wrote:

> If we cannot get rid of union state, I think we should introduce memory
> control on the serialized TDDs when deploying
> tasks instead of how union state is implemented when assign state in
> StateAssignmentOperation.
> The duplicated TaskStateSnapshot would not really increase much memory as
> the ByteStreamStateHandle's are
> actually share the same reference until they are serialized.
>
> When talking about the estimated memory footprint, I previously think that
> depends on the pool size of future executor (HardWare#getNumberCPUCores).
> However, with the simple program below, I found the async submit task logic
> make the number of existing RemoteRpcInvocation in JM at the same time
> larger than the HardWare#getNumberCPUCores.
> Take below program for example, we have 200 parallelism of source and the
> existing RemoteRpcInvocation in JM at the same time could be nearly 200
> while our pool size of future executor is only 96. I think if we could
> clear the serialized data in RemoteRpcInvocation as soon as possible, we
> might mitigate this problem greatly.
>
> Simple program which used union state to reproduce the memory footprint
> problem: one sub-task of the total union state is 100KB bytes array, and
> 200 sub-tasks in total could lead to more than 100KB * 200 * 200 = 3.8GB
> memory for all union state.
>
> public class Program {
>    private static final Logger LOG =
> LoggerFactory.getLogger(Program.class);
>
>    public static void main(String[] args) throws Exception {
>       final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>       env.enableCheckpointing(60 * 1000L);
>       env.addSource(new MySource()).setParallelism(200).print();
>       env.execute("Mock program");
>    }
>
>    private static class MySource extends
> RichParallelSourceFunction<Integer> implements CheckpointedFunction {
>       private static final ListStateDescriptor<byte[]> stateDescriptor =
> new ListStateDescriptor<>("list-1", byte[].class);
>       private ListState<byte[]> unionListState;
>       private volatile boolean running = true;
>       @Override
>       public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
>          unionListState.clear();
>          byte[] array = new byte[100 * 1024];
>          ThreadLocalRandom.current().nextBytes(array);
>          unionListState.add(array);
>       }
>
>       @Override
>       public void initializeState(FunctionInitializationContext context)
> throws Exception {
>          if (context.isRestored()) {
>             unionListState =
> context.getOperatorStateStore().getUnionListState(stateDescriptor);
>             List<byte[]> collect =
> StreamSupport.stream(unionListState.get().spliterator(),
> false).collect(Collectors.toList());
>             LOG.info("union state Collect size: {}.", collect.size());
>          } else {
>             unionListState =
> context.getOperatorStateStore().getUnionListState(stateDescriptor);
>          }
>       }
>
>       @Override
>       public void run(SourceContext<Integer> ctx) throws Exception {
>          while (running) {
>             synchronized (ctx.getCheckpointLock()) {
>                ctx.collect(ThreadLocalRandom.current().nextInt());
>             }
>             Thread.sleep(100);
>          }
>       }
>
>       @Override
>       public void cancel() {
>          running = false;
>       }
>    }
> }
>
> Best
> Yun Tang
> ________________________________
> From: Stephan Ewen <se...@apache.org>
> Sent: Saturday, May 16, 2020 18:56
> To: dev <de...@flink.apache.org>
> Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <
> piotr@ververica.com>
> Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> 1K to 100K
>
> Okay, thank you for all the feedback.
>
> So we should definitely work on getting rid of the Union State, or at least
> change the way it is implemented (avoid duplicate serializer snapshots).
>
> Can you estimate from which size of the cluster on the JM heap usage
> becomes critical (if we increased the threshold to 100k, or maybe 50k) ?
>
>
> On Sat, May 16, 2020 at 8:10 AM Congxian Qiu <qc...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Overall, I agree with increasing this value. but the default value set to
> > 100K maybe something too large from my side.
> >
> > I want to share some more information from my side.
> >
> > The small files problem is indeed a problem many users may encounter in
> > production env. The states(Keyed state and Operator state) can become
> small
> > files in DFS, but increase the value of
> `state.backend.fs.memory-threshold`
> > may encounter the JM OOM problem as Yun said previously.
> > We've tried increase this value in our production env, but some
> connectors
> > which UnionState prevent us to do this, the memory consumed by these jobs
> > can be very large (in our case, thousands of parallelism, set
> > `state.backend.fs.memory-threshold` to 64K, will consume 10G+ memory for
> > JM), so in the end, we use the solution proposed in FLINK-11937[1] for
> both
> > keyed state and operator state.
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-11937
> > Best,
> > Congxian
> >
> >
> > Yun Tang <my...@live.com> 于2020年5月15日周五 下午9:09写道:
> >
> > > Please correct me if I am wrong, "put the increased value into the
> > default
> > > configuration" means
> > > we will update that in default flink-conf.yaml but still leave the
> > default
> > > value of `state.backend.fs.memory-threshold`as previously?
> > > It seems I did not get the point why existing setups with existing
> > configs
> > > will not be affected.
> > >
> > > The concern I raised is because one of our large-scale job with 1024
> > > parallelism source of union state meet the JM OOM problem when we
> > increase
> > > this value.
> > > I think if we introduce memory control when serializing TDD
> > asynchronously
> > > [1], we could be much more confident to increase this configuration as
> > the
> > > memory footprint
> > > expands at that time by a lot of serialized TDDs.
> > >
> > >
> > > [1]
> > >
> >
> https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
> > >
> > > Best
> > > Yun Tang
> > >
> > > ________________________________
> > > From: Stephan Ewen <se...@apache.org>
> > > Sent: Friday, May 15, 2020 16:53
> > > To: dev <de...@flink.apache.org>
> > > Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <
> > > piotr@ververica.com>
> > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold"
> from
> > > 1K to 100K
> > >
> > > I see, thanks for all the input.
> > >
> > > I agree with Yun Tang that the use of UnionState is problematic and can
> > > cause issues in conjunction with this.
> > > However, most of the large-scale users I know that also struggle with
> > > UnionState have also increased this threshold, because with this low
> > > threshold, they get an excess number of small files and overwhelm their
> > > HDFS / S3 / etc.
> > >
> > > An intermediate solution could be to put the increased value into the
> > > default configuration. That way, existing setups with existing configs
> > will
> > > not be affected, but new users / installations will have a simper time.
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Thu, May 14, 2020 at 9:20 PM Yun Tang <my...@live.com> wrote:
> > >
> > > > Tend to be not in favor of this proposal as union state is somewhat
> > > abused
> > > > in several popular source connectors (e.g. kafka), and increasing
> this
> > > > value could lead to JM OOM when sending tdd from JM to TMs with large
> > > > parallelism.
> > > >
> > > > After we collect union state and initialize the map list [1], we
> > already
> > > > have union state ready to assign. At this time, the memory footprint
> > has
> > > > not increase too much as the union state which shared across tasks
> have
> > > the
> > > > same reference of ByteStreamStateHandle. However, when we send tdd
> with
> > > the
> > > > taskRestore to TMs, akka will serialize those ByteStreamStateHandle
> > > within
> > > > tdd to increases the memory footprint. If the source have 1024
> > > > parallelisms, and any one of the sub-task would then have 1024*100KB
> > size
> > > > state handles. The sum of total memory footprint cannot be ignored.
> > > >
> > > > If we plan to increase the default value of
> > > > state.backend.fs.memory-threshold, we should first resolve the above
> > > case.
> > > > In other words, this proposal could be a trade-off, which benefit
> > perhaps
> > > > 99% users, but might bring harmful effects to 1% user with
> large-scale
> > > > flink jobs.
> > > >
> > > >
> > > > [1]
> > > >
> > >
> >
> https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> > > >
> > > > Best
> > > > Yun Tang
> > > >
> > > >
> > > > ________________________________
> > > > From: Yu Li <ca...@gmail.com>
> > > > Sent: Thursday, May 14, 2020 23:51
> > > > To: Till Rohrmann <tr...@apache.org>
> > > > Cc: dev <de...@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold"
> > from
> > > > 1K to 100K
> > > >
> > > > TL;DR: I have some reservations but tend to be +1 for the proposal,
> > > > meanwhile suggest we have a more thorough solution in the long run.
> > > >
> > > > Please correct me if I'm wrong, but it seems the root cause of the
> > issue
> > > is
> > > > too many small files generated.
> > > >
> > > > I have some concerns for the case of session cluster [1], as well as
> > > > possible issues for users at large scale, otherwise I think
> increasing
> > > > `state.backend.fs.memory-threshold` to 100K is a good choice, based
> on
> > > the
> > > > assumption that a large portion of our users are running small jobs
> > with
> > > > small states.
> > > >
> > > > OTOH, maybe extending the solution [2] of resolving RocksDB small
> file
> > > > problem (as proposed by FLINK-11937 [3]) to also support operator
> state
> > > > could be an alternative? We have already applied the solution in
> > > production
> > > > for operator state and solved the HDFS NN RPC bottleneck problem on
> > last
> > > > year's Singles' day.
> > > >
> > > > Best Regards,
> > > > Yu
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > > > [2]
> > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> > > > <
> > > >
> > >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> > > > >
> > > > [3] https://issues.apache.org/jira/browse/FLINK-11937
> > > >
> > > >
> > > > On Thu, 14 May 2020 at 21:45, Till Rohrmann <tr...@apache.org>
> > > wrote:
> > > >
> > > > > I cannot say much about the concrete value but if our users have
> > > problems
> > > > > with the existing default values, then it makes sense to me to
> change
> > > it.
> > > > >
> > > > > One thing to check could be whether it is possible to provide a
> > > > meaningful
> > > > > exception in case that the state size exceeds the frame size. At
> the
> > > > > moment, Flink should fail with a message saying that a rpc message
> > > > exceeds
> > > > > the maximum frame size. Maybe it is also possible to point the user
> > > > towards
> > > > > "state.backend.fs.memory-threshold" if the message exceeds the
> frame
> > > size
> > > > > because of too much state.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org>
> > wrote:
> > > > >
> > > > >> The parameter "state.backend.fs.memory-threshold" decides when a
> > state
> > > > >> will
> > > > >> become a file and when it will be stored inline with the metadata
> > (to
> > > > >> avoid
> > > > >> excessive amounts of small files).
> > > > >>
> > > > >> By default, this threshold is 1K - so every state above that size
> > > > becomes
> > > > >> a
> > > > >> file. For many cases, this threshold seems to be too low.
> > > > >> There is an interesting talk with background on this from Scott
> > > Kidder:
> > > > >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> > > > >>
> > > > >> I wanted to discuss increasing this to 100K by default.
> > > > >>
> > > > >> Advantage:
> > > > >>   - This should help many users out of the box, which otherwise
> see
> > > > >> checkpointing problems on systems like S3, GCS, etc.
> > > > >>
> > > > >> Disadvantage:
> > > > >>   - For very large jobs, this increases the required heap memory
> on
> > > the
> > > > JM
> > > > >> side, because more state needs to be kept in-line when gathering
> the
> > > > acks
> > > > >> for a pending checkpoint.
> > > > >>   - If tasks have a lot of states and each state is roughly at
> this
> > > > >> threshold, we increase the chance of exceeding the RPC frame size
> > and
> > > > >> failing the job.
> > > > >>
> > > > >> What do you think?
> > > > >>
> > > > >> Best,
> > > > >> Stephan
> > > > >>
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Yun Tang <my...@live.com>.
If we cannot get rid of union state, I think we should introduce memory control on the serialized TDDs when deploying
tasks instead of how union state is implemented when assign state in StateAssignmentOperation.
The duplicated TaskStateSnapshot would not really increase much memory as the ByteStreamStateHandle's are
actually share the same reference until they are serialized.

When talking about the estimated memory footprint, I previously think that depends on the pool size of future executor (HardWare#getNumberCPUCores). However, with the simple program below, I found the async submit task logic make the number of existing RemoteRpcInvocation in JM at the same time larger than the HardWare#getNumberCPUCores.
Take below program for example, we have 200 parallelism of source and the existing RemoteRpcInvocation in JM at the same time could be nearly 200 while our pool size of future executor is only 96. I think if we could clear the serialized data in RemoteRpcInvocation as soon as possible, we might mitigate this problem greatly.

Simple program which used union state to reproduce the memory footprint problem: one sub-task of the total union state is 100KB bytes array, and 200 sub-tasks in total could lead to more than 100KB * 200 * 200 = 3.8GB memory for all union state.

public class Program {
   private static final Logger LOG = LoggerFactory.getLogger(Program.class);

   public static void main(String[] args) throws Exception {
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.enableCheckpointing(60 * 1000L);
      env.addSource(new MySource()).setParallelism(200).print();
      env.execute("Mock program");
   }

   private static class MySource extends RichParallelSourceFunction<Integer> implements CheckpointedFunction {
      private static final ListStateDescriptor<byte[]> stateDescriptor = new ListStateDescriptor<>("list-1", byte[].class);
      private ListState<byte[]> unionListState;
      private volatile boolean running = true;
      @Override
      public void snapshotState(FunctionSnapshotContext context) throws Exception {
         unionListState.clear();
         byte[] array = new byte[100 * 1024];
         ThreadLocalRandom.current().nextBytes(array);
         unionListState.add(array);
      }

      @Override
      public void initializeState(FunctionInitializationContext context) throws Exception {
         if (context.isRestored()) {
            unionListState = context.getOperatorStateStore().getUnionListState(stateDescriptor);
            List<byte[]> collect = StreamSupport.stream(unionListState.get().spliterator(), false).collect(Collectors.toList());
            LOG.info("union state Collect size: {}.", collect.size());
         } else {
            unionListState = context.getOperatorStateStore().getUnionListState(stateDescriptor);
         }
      }

      @Override
      public void run(SourceContext<Integer> ctx) throws Exception {
         while (running) {
            synchronized (ctx.getCheckpointLock()) {
               ctx.collect(ThreadLocalRandom.current().nextInt());
            }
            Thread.sleep(100);
         }
      }

      @Override
      public void cancel() {
         running = false;
      }
   }
}

Best
Yun Tang
________________________________
From: Stephan Ewen <se...@apache.org>
Sent: Saturday, May 16, 2020 18:56
To: dev <de...@flink.apache.org>
Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <pi...@ververica.com>
Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Okay, thank you for all the feedback.

So we should definitely work on getting rid of the Union State, or at least
change the way it is implemented (avoid duplicate serializer snapshots).

Can you estimate from which size of the cluster on the JM heap usage
becomes critical (if we increased the threshold to 100k, or maybe 50k) ?


On Sat, May 16, 2020 at 8:10 AM Congxian Qiu <qc...@gmail.com> wrote:

> Hi,
>
> Overall, I agree with increasing this value. but the default value set to
> 100K maybe something too large from my side.
>
> I want to share some more information from my side.
>
> The small files problem is indeed a problem many users may encounter in
> production env. The states(Keyed state and Operator state) can become small
> files in DFS, but increase the value of `state.backend.fs.memory-threshold`
> may encounter the JM OOM problem as Yun said previously.
> We've tried increase this value in our production env, but some connectors
> which UnionState prevent us to do this, the memory consumed by these jobs
> can be very large (in our case, thousands of parallelism, set
> `state.backend.fs.memory-threshold` to 64K, will consume 10G+ memory for
> JM), so in the end, we use the solution proposed in FLINK-11937[1] for both
> keyed state and operator state.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-11937
> Best,
> Congxian
>
>
> Yun Tang <my...@live.com> 于2020年5月15日周五 下午9:09写道:
>
> > Please correct me if I am wrong, "put the increased value into the
> default
> > configuration" means
> > we will update that in default flink-conf.yaml but still leave the
> default
> > value of `state.backend.fs.memory-threshold`as previously?
> > It seems I did not get the point why existing setups with existing
> configs
> > will not be affected.
> >
> > The concern I raised is because one of our large-scale job with 1024
> > parallelism source of union state meet the JM OOM problem when we
> increase
> > this value.
> > I think if we introduce memory control when serializing TDD
> asynchronously
> > [1], we could be much more confident to increase this configuration as
> the
> > memory footprint
> > expands at that time by a lot of serialized TDDs.
> >
> >
> > [1]
> >
> https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Stephan Ewen <se...@apache.org>
> > Sent: Friday, May 15, 2020 16:53
> > To: dev <de...@flink.apache.org>
> > Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <
> > piotr@ververica.com>
> > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> > 1K to 100K
> >
> > I see, thanks for all the input.
> >
> > I agree with Yun Tang that the use of UnionState is problematic and can
> > cause issues in conjunction with this.
> > However, most of the large-scale users I know that also struggle with
> > UnionState have also increased this threshold, because with this low
> > threshold, they get an excess number of small files and overwhelm their
> > HDFS / S3 / etc.
> >
> > An intermediate solution could be to put the increased value into the
> > default configuration. That way, existing setups with existing configs
> will
> > not be affected, but new users / installations will have a simper time.
> >
> > Best,
> > Stephan
> >
> >
> > On Thu, May 14, 2020 at 9:20 PM Yun Tang <my...@live.com> wrote:
> >
> > > Tend to be not in favor of this proposal as union state is somewhat
> > abused
> > > in several popular source connectors (e.g. kafka), and increasing this
> > > value could lead to JM OOM when sending tdd from JM to TMs with large
> > > parallelism.
> > >
> > > After we collect union state and initialize the map list [1], we
> already
> > > have union state ready to assign. At this time, the memory footprint
> has
> > > not increase too much as the union state which shared across tasks have
> > the
> > > same reference of ByteStreamStateHandle. However, when we send tdd with
> > the
> > > taskRestore to TMs, akka will serialize those ByteStreamStateHandle
> > within
> > > tdd to increases the memory footprint. If the source have 1024
> > > parallelisms, and any one of the sub-task would then have 1024*100KB
> size
> > > state handles. The sum of total memory footprint cannot be ignored.
> > >
> > > If we plan to increase the default value of
> > > state.backend.fs.memory-threshold, we should first resolve the above
> > case.
> > > In other words, this proposal could be a trade-off, which benefit
> perhaps
> > > 99% users, but might bring harmful effects to 1% user with large-scale
> > > flink jobs.
> > >
> > >
> > > [1]
> > >
> >
> https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> > >
> > > Best
> > > Yun Tang
> > >
> > >
> > > ________________________________
> > > From: Yu Li <ca...@gmail.com>
> > > Sent: Thursday, May 14, 2020 23:51
> > > To: Till Rohrmann <tr...@apache.org>
> > > Cc: dev <de...@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold"
> from
> > > 1K to 100K
> > >
> > > TL;DR: I have some reservations but tend to be +1 for the proposal,
> > > meanwhile suggest we have a more thorough solution in the long run.
> > >
> > > Please correct me if I'm wrong, but it seems the root cause of the
> issue
> > is
> > > too many small files generated.
> > >
> > > I have some concerns for the case of session cluster [1], as well as
> > > possible issues for users at large scale, otherwise I think increasing
> > > `state.backend.fs.memory-threshold` to 100K is a good choice, based on
> > the
> > > assumption that a large portion of our users are running small jobs
> with
> > > small states.
> > >
> > > OTOH, maybe extending the solution [2] of resolving RocksDB small file
> > > problem (as proposed by FLINK-11937 [3]) to also support operator state
> > > could be an alternative? We have already applied the solution in
> > production
> > > for operator state and solved the HDFS NN RPC bottleneck problem on
> last
> > > year's Singles' day.
> > >
> > > Best Regards,
> > > Yu
> > >
> > > [1]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > > [2]
> > >
> > >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> > > <
> > >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> > > >
> > > [3] https://issues.apache.org/jira/browse/FLINK-11937
> > >
> > >
> > > On Thu, 14 May 2020 at 21:45, Till Rohrmann <tr...@apache.org>
> > wrote:
> > >
> > > > I cannot say much about the concrete value but if our users have
> > problems
> > > > with the existing default values, then it makes sense to me to change
> > it.
> > > >
> > > > One thing to check could be whether it is possible to provide a
> > > meaningful
> > > > exception in case that the state size exceeds the frame size. At the
> > > > moment, Flink should fail with a message saying that a rpc message
> > > exceeds
> > > > the maximum frame size. Maybe it is also possible to point the user
> > > towards
> > > > "state.backend.fs.memory-threshold" if the message exceeds the frame
> > size
> > > > because of too much state.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org>
> wrote:
> > > >
> > > >> The parameter "state.backend.fs.memory-threshold" decides when a
> state
> > > >> will
> > > >> become a file and when it will be stored inline with the metadata
> (to
> > > >> avoid
> > > >> excessive amounts of small files).
> > > >>
> > > >> By default, this threshold is 1K - so every state above that size
> > > becomes
> > > >> a
> > > >> file. For many cases, this threshold seems to be too low.
> > > >> There is an interesting talk with background on this from Scott
> > Kidder:
> > > >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> > > >>
> > > >> I wanted to discuss increasing this to 100K by default.
> > > >>
> > > >> Advantage:
> > > >>   - This should help many users out of the box, which otherwise see
> > > >> checkpointing problems on systems like S3, GCS, etc.
> > > >>
> > > >> Disadvantage:
> > > >>   - For very large jobs, this increases the required heap memory on
> > the
> > > JM
> > > >> side, because more state needs to be kept in-line when gathering the
> > > acks
> > > >> for a pending checkpoint.
> > > >>   - If tasks have a lot of states and each state is roughly at this
> > > >> threshold, we increase the chance of exceeding the RPC frame size
> and
> > > >> failing the job.
> > > >>
> > > >> What do you think?
> > > >>
> > > >> Best,
> > > >> Stephan
> > > >>
> > > >
> > >
> >
>

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Stephan Ewen <se...@apache.org>.
Okay, thank you for all the feedback.

So we should definitely work on getting rid of the Union State, or at least
change the way it is implemented (avoid duplicate serializer snapshots).

Can you estimate from which size of the cluster on the JM heap usage
becomes critical (if we increased the threshold to 100k, or maybe 50k) ?


On Sat, May 16, 2020 at 8:10 AM Congxian Qiu <qc...@gmail.com> wrote:

> Hi,
>
> Overall, I agree with increasing this value. but the default value set to
> 100K maybe something too large from my side.
>
> I want to share some more information from my side.
>
> The small files problem is indeed a problem many users may encounter in
> production env. The states(Keyed state and Operator state) can become small
> files in DFS, but increase the value of `state.backend.fs.memory-threshold`
> may encounter the JM OOM problem as Yun said previously.
> We've tried increase this value in our production env, but some connectors
> which UnionState prevent us to do this, the memory consumed by these jobs
> can be very large (in our case, thousands of parallelism, set
> `state.backend.fs.memory-threshold` to 64K, will consume 10G+ memory for
> JM), so in the end, we use the solution proposed in FLINK-11937[1] for both
> keyed state and operator state.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-11937
> Best,
> Congxian
>
>
> Yun Tang <my...@live.com> 于2020年5月15日周五 下午9:09写道:
>
> > Please correct me if I am wrong, "put the increased value into the
> default
> > configuration" means
> > we will update that in default flink-conf.yaml but still leave the
> default
> > value of `state.backend.fs.memory-threshold`as previously?
> > It seems I did not get the point why existing setups with existing
> configs
> > will not be affected.
> >
> > The concern I raised is because one of our large-scale job with 1024
> > parallelism source of union state meet the JM OOM problem when we
> increase
> > this value.
> > I think if we introduce memory control when serializing TDD
> asynchronously
> > [1], we could be much more confident to increase this configuration as
> the
> > memory footprint
> > expands at that time by a lot of serialized TDDs.
> >
> >
> > [1]
> >
> https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Stephan Ewen <se...@apache.org>
> > Sent: Friday, May 15, 2020 16:53
> > To: dev <de...@flink.apache.org>
> > Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <
> > piotr@ververica.com>
> > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> > 1K to 100K
> >
> > I see, thanks for all the input.
> >
> > I agree with Yun Tang that the use of UnionState is problematic and can
> > cause issues in conjunction with this.
> > However, most of the large-scale users I know that also struggle with
> > UnionState have also increased this threshold, because with this low
> > threshold, they get an excess number of small files and overwhelm their
> > HDFS / S3 / etc.
> >
> > An intermediate solution could be to put the increased value into the
> > default configuration. That way, existing setups with existing configs
> will
> > not be affected, but new users / installations will have a simper time.
> >
> > Best,
> > Stephan
> >
> >
> > On Thu, May 14, 2020 at 9:20 PM Yun Tang <my...@live.com> wrote:
> >
> > > Tend to be not in favor of this proposal as union state is somewhat
> > abused
> > > in several popular source connectors (e.g. kafka), and increasing this
> > > value could lead to JM OOM when sending tdd from JM to TMs with large
> > > parallelism.
> > >
> > > After we collect union state and initialize the map list [1], we
> already
> > > have union state ready to assign. At this time, the memory footprint
> has
> > > not increase too much as the union state which shared across tasks have
> > the
> > > same reference of ByteStreamStateHandle. However, when we send tdd with
> > the
> > > taskRestore to TMs, akka will serialize those ByteStreamStateHandle
> > within
> > > tdd to increases the memory footprint. If the source have 1024
> > > parallelisms, and any one of the sub-task would then have 1024*100KB
> size
> > > state handles. The sum of total memory footprint cannot be ignored.
> > >
> > > If we plan to increase the default value of
> > > state.backend.fs.memory-threshold, we should first resolve the above
> > case.
> > > In other words, this proposal could be a trade-off, which benefit
> perhaps
> > > 99% users, but might bring harmful effects to 1% user with large-scale
> > > flink jobs.
> > >
> > >
> > > [1]
> > >
> >
> https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> > >
> > > Best
> > > Yun Tang
> > >
> > >
> > > ________________________________
> > > From: Yu Li <ca...@gmail.com>
> > > Sent: Thursday, May 14, 2020 23:51
> > > To: Till Rohrmann <tr...@apache.org>
> > > Cc: dev <de...@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold"
> from
> > > 1K to 100K
> > >
> > > TL;DR: I have some reservations but tend to be +1 for the proposal,
> > > meanwhile suggest we have a more thorough solution in the long run.
> > >
> > > Please correct me if I'm wrong, but it seems the root cause of the
> issue
> > is
> > > too many small files generated.
> > >
> > > I have some concerns for the case of session cluster [1], as well as
> > > possible issues for users at large scale, otherwise I think increasing
> > > `state.backend.fs.memory-threshold` to 100K is a good choice, based on
> > the
> > > assumption that a large portion of our users are running small jobs
> with
> > > small states.
> > >
> > > OTOH, maybe extending the solution [2] of resolving RocksDB small file
> > > problem (as proposed by FLINK-11937 [3]) to also support operator state
> > > could be an alternative? We have already applied the solution in
> > production
> > > for operator state and solved the HDFS NN RPC bottleneck problem on
> last
> > > year's Singles' day.
> > >
> > > Best Regards,
> > > Yu
> > >
> > > [1]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > > [2]
> > >
> > >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> > > <
> > >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> > > >
> > > [3] https://issues.apache.org/jira/browse/FLINK-11937
> > >
> > >
> > > On Thu, 14 May 2020 at 21:45, Till Rohrmann <tr...@apache.org>
> > wrote:
> > >
> > > > I cannot say much about the concrete value but if our users have
> > problems
> > > > with the existing default values, then it makes sense to me to change
> > it.
> > > >
> > > > One thing to check could be whether it is possible to provide a
> > > meaningful
> > > > exception in case that the state size exceeds the frame size. At the
> > > > moment, Flink should fail with a message saying that a rpc message
> > > exceeds
> > > > the maximum frame size. Maybe it is also possible to point the user
> > > towards
> > > > "state.backend.fs.memory-threshold" if the message exceeds the frame
> > size
> > > > because of too much state.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org>
> wrote:
> > > >
> > > >> The parameter "state.backend.fs.memory-threshold" decides when a
> state
> > > >> will
> > > >> become a file and when it will be stored inline with the metadata
> (to
> > > >> avoid
> > > >> excessive amounts of small files).
> > > >>
> > > >> By default, this threshold is 1K - so every state above that size
> > > becomes
> > > >> a
> > > >> file. For many cases, this threshold seems to be too low.
> > > >> There is an interesting talk with background on this from Scott
> > Kidder:
> > > >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> > > >>
> > > >> I wanted to discuss increasing this to 100K by default.
> > > >>
> > > >> Advantage:
> > > >>   - This should help many users out of the box, which otherwise see
> > > >> checkpointing problems on systems like S3, GCS, etc.
> > > >>
> > > >> Disadvantage:
> > > >>   - For very large jobs, this increases the required heap memory on
> > the
> > > JM
> > > >> side, because more state needs to be kept in-line when gathering the
> > > acks
> > > >> for a pending checkpoint.
> > > >>   - If tasks have a lot of states and each state is roughly at this
> > > >> threshold, we increase the chance of exceeding the RPC frame size
> and
> > > >> failing the job.
> > > >>
> > > >> What do you think?
> > > >>
> > > >> Best,
> > > >> Stephan
> > > >>
> > > >
> > >
> >
>

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Congxian Qiu <qc...@gmail.com>.
Hi,

Overall, I agree with increasing this value. but the default value set to
100K maybe something too large from my side.

I want to share some more information from my side.

The small files problem is indeed a problem many users may encounter in
production env. The states(Keyed state and Operator state) can become small
files in DFS, but increase the value of `state.backend.fs.memory-threshold`
may encounter the JM OOM problem as Yun said previously.
We've tried increase this value in our production env, but some connectors
which UnionState prevent us to do this, the memory consumed by these jobs
can be very large (in our case, thousands of parallelism, set
`state.backend.fs.memory-threshold` to 64K, will consume 10G+ memory for
JM), so in the end, we use the solution proposed in FLINK-11937[1] for both
keyed state and operator state.


[1] https://issues.apache.org/jira/browse/FLINK-11937
Best,
Congxian


Yun Tang <my...@live.com> 于2020年5月15日周五 下午9:09写道:

> Please correct me if I am wrong, "put the increased value into the default
> configuration" means
> we will update that in default flink-conf.yaml but still leave the default
> value of `state.backend.fs.memory-threshold`as previously?
> It seems I did not get the point why existing setups with existing configs
> will not be affected.
>
> The concern I raised is because one of our large-scale job with 1024
> parallelism source of union state meet the JM OOM problem when we increase
> this value.
> I think if we introduce memory control when serializing TDD asynchronously
> [1], we could be much more confident to increase this configuration as the
> memory footprint
> expands at that time by a lot of serialized TDDs.
>
>
> [1]
> https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
>
> Best
> Yun Tang
>
> ________________________________
> From: Stephan Ewen <se...@apache.org>
> Sent: Friday, May 15, 2020 16:53
> To: dev <de...@flink.apache.org>
> Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <
> piotr@ververica.com>
> Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> 1K to 100K
>
> I see, thanks for all the input.
>
> I agree with Yun Tang that the use of UnionState is problematic and can
> cause issues in conjunction with this.
> However, most of the large-scale users I know that also struggle with
> UnionState have also increased this threshold, because with this low
> threshold, they get an excess number of small files and overwhelm their
> HDFS / S3 / etc.
>
> An intermediate solution could be to put the increased value into the
> default configuration. That way, existing setups with existing configs will
> not be affected, but new users / installations will have a simper time.
>
> Best,
> Stephan
>
>
> On Thu, May 14, 2020 at 9:20 PM Yun Tang <my...@live.com> wrote:
>
> > Tend to be not in favor of this proposal as union state is somewhat
> abused
> > in several popular source connectors (e.g. kafka), and increasing this
> > value could lead to JM OOM when sending tdd from JM to TMs with large
> > parallelism.
> >
> > After we collect union state and initialize the map list [1], we already
> > have union state ready to assign. At this time, the memory footprint has
> > not increase too much as the union state which shared across tasks have
> the
> > same reference of ByteStreamStateHandle. However, when we send tdd with
> the
> > taskRestore to TMs, akka will serialize those ByteStreamStateHandle
> within
> > tdd to increases the memory footprint. If the source have 1024
> > parallelisms, and any one of the sub-task would then have 1024*100KB size
> > state handles. The sum of total memory footprint cannot be ignored.
> >
> > If we plan to increase the default value of
> > state.backend.fs.memory-threshold, we should first resolve the above
> case.
> > In other words, this proposal could be a trade-off, which benefit perhaps
> > 99% users, but might bring harmful effects to 1% user with large-scale
> > flink jobs.
> >
> >
> > [1]
> >
> https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> >
> > Best
> > Yun Tang
> >
> >
> > ________________________________
> > From: Yu Li <ca...@gmail.com>
> > Sent: Thursday, May 14, 2020 23:51
> > To: Till Rohrmann <tr...@apache.org>
> > Cc: dev <de...@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> > 1K to 100K
> >
> > TL;DR: I have some reservations but tend to be +1 for the proposal,
> > meanwhile suggest we have a more thorough solution in the long run.
> >
> > Please correct me if I'm wrong, but it seems the root cause of the issue
> is
> > too many small files generated.
> >
> > I have some concerns for the case of session cluster [1], as well as
> > possible issues for users at large scale, otherwise I think increasing
> > `state.backend.fs.memory-threshold` to 100K is a good choice, based on
> the
> > assumption that a large portion of our users are running small jobs with
> > small states.
> >
> > OTOH, maybe extending the solution [2] of resolving RocksDB small file
> > problem (as proposed by FLINK-11937 [3]) to also support operator state
> > could be an alternative? We have already applied the solution in
> production
> > for operator state and solved the HDFS NN RPC bottleneck problem on last
> > year's Singles' day.
> >
> > Best Regards,
> > Yu
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > [2]
> >
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> > <
> >
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> > >
> > [3] https://issues.apache.org/jira/browse/FLINK-11937
> >
> >
> > On Thu, 14 May 2020 at 21:45, Till Rohrmann <tr...@apache.org>
> wrote:
> >
> > > I cannot say much about the concrete value but if our users have
> problems
> > > with the existing default values, then it makes sense to me to change
> it.
> > >
> > > One thing to check could be whether it is possible to provide a
> > meaningful
> > > exception in case that the state size exceeds the frame size. At the
> > > moment, Flink should fail with a message saying that a rpc message
> > exceeds
> > > the maximum frame size. Maybe it is also possible to point the user
> > towards
> > > "state.backend.fs.memory-threshold" if the message exceeds the frame
> size
> > > because of too much state.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
> > >
> > >> The parameter "state.backend.fs.memory-threshold" decides when a state
> > >> will
> > >> become a file and when it will be stored inline with the metadata (to
> > >> avoid
> > >> excessive amounts of small files).
> > >>
> > >> By default, this threshold is 1K - so every state above that size
> > becomes
> > >> a
> > >> file. For many cases, this threshold seems to be too low.
> > >> There is an interesting talk with background on this from Scott
> Kidder:
> > >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> > >>
> > >> I wanted to discuss increasing this to 100K by default.
> > >>
> > >> Advantage:
> > >>   - This should help many users out of the box, which otherwise see
> > >> checkpointing problems on systems like S3, GCS, etc.
> > >>
> > >> Disadvantage:
> > >>   - For very large jobs, this increases the required heap memory on
> the
> > JM
> > >> side, because more state needs to be kept in-line when gathering the
> > acks
> > >> for a pending checkpoint.
> > >>   - If tasks have a lot of states and each state is roughly at this
> > >> threshold, we increase the chance of exceeding the RPC frame size and
> > >> failing the job.
> > >>
> > >> What do you think?
> > >>
> > >> Best,
> > >> Stephan
> > >>
> > >
> >
>

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Yun Tang <my...@live.com>.
Please correct me if I am wrong, "put the increased value into the default configuration" means
we will update that in default flink-conf.yaml but still leave the default value of `state.backend.fs.memory-threshold`as previously?
It seems I did not get the point why existing setups with existing configs will not be affected.

The concern I raised is because one of our large-scale job with 1024 parallelism source of union state meet the JM OOM problem when we increase this value.
I think if we introduce memory control when serializing TDD asynchronously [1], we could be much more confident to increase this configuration as the memory footprint
expands at that time by a lot of serialized TDDs.


[1] https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752

Best
Yun Tang

________________________________
From: Stephan Ewen <se...@apache.org>
Sent: Friday, May 15, 2020 16:53
To: dev <de...@flink.apache.org>
Cc: Till Rohrmann <tr...@apache.org>; Piotr Nowojski <pi...@ververica.com>
Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

I see, thanks for all the input.

I agree with Yun Tang that the use of UnionState is problematic and can
cause issues in conjunction with this.
However, most of the large-scale users I know that also struggle with
UnionState have also increased this threshold, because with this low
threshold, they get an excess number of small files and overwhelm their
HDFS / S3 / etc.

An intermediate solution could be to put the increased value into the
default configuration. That way, existing setups with existing configs will
not be affected, but new users / installations will have a simper time.

Best,
Stephan


On Thu, May 14, 2020 at 9:20 PM Yun Tang <my...@live.com> wrote:

> Tend to be not in favor of this proposal as union state is somewhat abused
> in several popular source connectors (e.g. kafka), and increasing this
> value could lead to JM OOM when sending tdd from JM to TMs with large
> parallelism.
>
> After we collect union state and initialize the map list [1], we already
> have union state ready to assign. At this time, the memory footprint has
> not increase too much as the union state which shared across tasks have the
> same reference of ByteStreamStateHandle. However, when we send tdd with the
> taskRestore to TMs, akka will serialize those ByteStreamStateHandle within
> tdd to increases the memory footprint. If the source have 1024
> parallelisms, and any one of the sub-task would then have 1024*100KB size
> state handles. The sum of total memory footprint cannot be ignored.
>
> If we plan to increase the default value of
> state.backend.fs.memory-threshold, we should first resolve the above case.
> In other words, this proposal could be a trade-off, which benefit perhaps
> 99% users, but might bring harmful effects to 1% user with large-scale
> flink jobs.
>
>
> [1]
> https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
>
> Best
> Yun Tang
>
>
> ________________________________
> From: Yu Li <ca...@gmail.com>
> Sent: Thursday, May 14, 2020 23:51
> To: Till Rohrmann <tr...@apache.org>
> Cc: dev <de...@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> 1K to 100K
>
> TL;DR: I have some reservations but tend to be +1 for the proposal,
> meanwhile suggest we have a more thorough solution in the long run.
>
> Please correct me if I'm wrong, but it seems the root cause of the issue is
> too many small files generated.
>
> I have some concerns for the case of session cluster [1], as well as
> possible issues for users at large scale, otherwise I think increasing
> `state.backend.fs.memory-threshold` to 100K is a good choice, based on the
> assumption that a large portion of our users are running small jobs with
> small states.
>
> OTOH, maybe extending the solution [2] of resolving RocksDB small file
> problem (as proposed by FLINK-11937 [3]) to also support operator state
> could be an alternative? We have already applied the solution in production
> for operator state and solved the HDFS NN RPC bottleneck problem on last
> year's Singles' day.
>
> Best Regards,
> Yu
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> [2]
>
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> <
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> >
> [3] https://issues.apache.org/jira/browse/FLINK-11937
>
>
> On Thu, 14 May 2020 at 21:45, Till Rohrmann <tr...@apache.org> wrote:
>
> > I cannot say much about the concrete value but if our users have problems
> > with the existing default values, then it makes sense to me to change it.
> >
> > One thing to check could be whether it is possible to provide a
> meaningful
> > exception in case that the state size exceeds the frame size. At the
> > moment, Flink should fail with a message saying that a rpc message
> exceeds
> > the maximum frame size. Maybe it is also possible to point the user
> towards
> > "state.backend.fs.memory-threshold" if the message exceeds the frame size
> > because of too much state.
> >
> > Cheers,
> > Till
> >
> > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
> >
> >> The parameter "state.backend.fs.memory-threshold" decides when a state
> >> will
> >> become a file and when it will be stored inline with the metadata (to
> >> avoid
> >> excessive amounts of small files).
> >>
> >> By default, this threshold is 1K - so every state above that size
> becomes
> >> a
> >> file. For many cases, this threshold seems to be too low.
> >> There is an interesting talk with background on this from Scott Kidder:
> >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> >>
> >> I wanted to discuss increasing this to 100K by default.
> >>
> >> Advantage:
> >>   - This should help many users out of the box, which otherwise see
> >> checkpointing problems on systems like S3, GCS, etc.
> >>
> >> Disadvantage:
> >>   - For very large jobs, this increases the required heap memory on the
> JM
> >> side, because more state needs to be kept in-line when gathering the
> acks
> >> for a pending checkpoint.
> >>   - If tasks have a lot of states and each state is roughly at this
> >> threshold, we increase the chance of exceeding the RPC frame size and
> >> failing the job.
> >>
> >> What do you think?
> >>
> >> Best,
> >> Stephan
> >>
> >
>

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Stephan Ewen <se...@apache.org>.
I see, thanks for all the input.

I agree with Yun Tang that the use of UnionState is problematic and can
cause issues in conjunction with this.
However, most of the large-scale users I know that also struggle with
UnionState have also increased this threshold, because with this low
threshold, they get an excess number of small files and overwhelm their
HDFS / S3 / etc.

An intermediate solution could be to put the increased value into the
default configuration. That way, existing setups with existing configs will
not be affected, but new users / installations will have a simper time.

Best,
Stephan


On Thu, May 14, 2020 at 9:20 PM Yun Tang <my...@live.com> wrote:

> Tend to be not in favor of this proposal as union state is somewhat abused
> in several popular source connectors (e.g. kafka), and increasing this
> value could lead to JM OOM when sending tdd from JM to TMs with large
> parallelism.
>
> After we collect union state and initialize the map list [1], we already
> have union state ready to assign. At this time, the memory footprint has
> not increase too much as the union state which shared across tasks have the
> same reference of ByteStreamStateHandle. However, when we send tdd with the
> taskRestore to TMs, akka will serialize those ByteStreamStateHandle within
> tdd to increases the memory footprint. If the source have 1024
> parallelisms, and any one of the sub-task would then have 1024*100KB size
> state handles. The sum of total memory footprint cannot be ignored.
>
> If we plan to increase the default value of
> state.backend.fs.memory-threshold, we should first resolve the above case.
> In other words, this proposal could be a trade-off, which benefit perhaps
> 99% users, but might bring harmful effects to 1% user with large-scale
> flink jobs.
>
>
> [1]
> https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
>
> Best
> Yun Tang
>
>
> ________________________________
> From: Yu Li <ca...@gmail.com>
> Sent: Thursday, May 14, 2020 23:51
> To: Till Rohrmann <tr...@apache.org>
> Cc: dev <de...@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> 1K to 100K
>
> TL;DR: I have some reservations but tend to be +1 for the proposal,
> meanwhile suggest we have a more thorough solution in the long run.
>
> Please correct me if I'm wrong, but it seems the root cause of the issue is
> too many small files generated.
>
> I have some concerns for the case of session cluster [1], as well as
> possible issues for users at large scale, otherwise I think increasing
> `state.backend.fs.memory-threshold` to 100K is a good choice, based on the
> assumption that a large portion of our users are running small jobs with
> small states.
>
> OTOH, maybe extending the solution [2] of resolving RocksDB small file
> problem (as proposed by FLINK-11937 [3]) to also support operator state
> could be an alternative? We have already applied the solution in production
> for operator state and solved the HDFS NN RPC bottleneck problem on last
> year's Singles' day.
>
> Best Regards,
> Yu
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> [2]
>
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> <
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> >
> [3] https://issues.apache.org/jira/browse/FLINK-11937
>
>
> On Thu, 14 May 2020 at 21:45, Till Rohrmann <tr...@apache.org> wrote:
>
> > I cannot say much about the concrete value but if our users have problems
> > with the existing default values, then it makes sense to me to change it.
> >
> > One thing to check could be whether it is possible to provide a
> meaningful
> > exception in case that the state size exceeds the frame size. At the
> > moment, Flink should fail with a message saying that a rpc message
> exceeds
> > the maximum frame size. Maybe it is also possible to point the user
> towards
> > "state.backend.fs.memory-threshold" if the message exceeds the frame size
> > because of too much state.
> >
> > Cheers,
> > Till
> >
> > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
> >
> >> The parameter "state.backend.fs.memory-threshold" decides when a state
> >> will
> >> become a file and when it will be stored inline with the metadata (to
> >> avoid
> >> excessive amounts of small files).
> >>
> >> By default, this threshold is 1K - so every state above that size
> becomes
> >> a
> >> file. For many cases, this threshold seems to be too low.
> >> There is an interesting talk with background on this from Scott Kidder:
> >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> >>
> >> I wanted to discuss increasing this to 100K by default.
> >>
> >> Advantage:
> >>   - This should help many users out of the box, which otherwise see
> >> checkpointing problems on systems like S3, GCS, etc.
> >>
> >> Disadvantage:
> >>   - For very large jobs, this increases the required heap memory on the
> JM
> >> side, because more state needs to be kept in-line when gathering the
> acks
> >> for a pending checkpoint.
> >>   - If tasks have a lot of states and each state is roughly at this
> >> threshold, we increase the chance of exceeding the RPC frame size and
> >> failing the job.
> >>
> >> What do you think?
> >>
> >> Best,
> >> Stephan
> >>
> >
>

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Yun Tang <my...@live.com>.
Tend to be not in favor of this proposal as union state is somewhat abused in several popular source connectors (e.g. kafka), and increasing this value could lead to JM OOM when sending tdd from JM to TMs with large parallelism.

After we collect union state and initialize the map list [1], we already have union state ready to assign. At this time, the memory footprint has not increase too much as the union state which shared across tasks have the same reference of ByteStreamStateHandle. However, when we send tdd with the taskRestore to TMs, akka will serialize those ByteStreamStateHandle within tdd to increases the memory footprint. If the source have 1024 parallelisms, and any one of the sub-task would then have 1024*100KB size state handles. The sum of total memory footprint cannot be ignored.

If we plan to increase the default value of state.backend.fs.memory-threshold, we should first resolve the above case.
In other words, this proposal could be a trade-off, which benefit perhaps 99% users, but might bring harmful effects to 1% user with large-scale flink jobs.


[1] https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87

Best
Yun Tang


________________________________
From: Yu Li <ca...@gmail.com>
Sent: Thursday, May 14, 2020 23:51
To: Till Rohrmann <tr...@apache.org>
Cc: dev <de...@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

TL;DR: I have some reservations but tend to be +1 for the proposal,
meanwhile suggest we have a more thorough solution in the long run.

Please correct me if I'm wrong, but it seems the root cause of the issue is
too many small files generated.

I have some concerns for the case of session cluster [1], as well as
possible issues for users at large scale, otherwise I think increasing
`state.backend.fs.memory-threshold` to 100K is a good choice, based on the
assumption that a large portion of our users are running small jobs with
small states.

OTOH, maybe extending the solution [2] of resolving RocksDB small file
problem (as proposed by FLINK-11937 [3]) to also support operator state
could be an alternative? We have already applied the solution in production
for operator state and solved the HDFS NN RPC bottleneck problem on last
year's Singles' day.

Best Regards,
Yu

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
[2]
https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
<https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h>
[3] https://issues.apache.org/jira/browse/FLINK-11937


On Thu, 14 May 2020 at 21:45, Till Rohrmann <tr...@apache.org> wrote:

> I cannot say much about the concrete value but if our users have problems
> with the existing default values, then it makes sense to me to change it.
>
> One thing to check could be whether it is possible to provide a meaningful
> exception in case that the state size exceeds the frame size. At the
> moment, Flink should fail with a message saying that a rpc message exceeds
> the maximum frame size. Maybe it is also possible to point the user towards
> "state.backend.fs.memory-threshold" if the message exceeds the frame size
> because of too much state.
>
> Cheers,
> Till
>
> On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
>
>> The parameter "state.backend.fs.memory-threshold" decides when a state
>> will
>> become a file and when it will be stored inline with the metadata (to
>> avoid
>> excessive amounts of small files).
>>
>> By default, this threshold is 1K - so every state above that size becomes
>> a
>> file. For many cases, this threshold seems to be too low.
>> There is an interesting talk with background on this from Scott Kidder:
>> https://www.youtube.com/watch?v=gycq0cY3TZ0
>>
>> I wanted to discuss increasing this to 100K by default.
>>
>> Advantage:
>>   - This should help many users out of the box, which otherwise see
>> checkpointing problems on systems like S3, GCS, etc.
>>
>> Disadvantage:
>>   - For very large jobs, this increases the required heap memory on the JM
>> side, because more state needs to be kept in-line when gathering the acks
>> for a pending checkpoint.
>>   - If tasks have a lot of states and each state is roughly at this
>> threshold, we increase the chance of exceeding the RPC frame size and
>> failing the job.
>>
>> What do you think?
>>
>> Best,
>> Stephan
>>
>

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Yu Li <ca...@gmail.com>.
TL;DR: I have some reservations but tend to be +1 for the proposal,
meanwhile suggest we have a more thorough solution in the long run.

Please correct me if I'm wrong, but it seems the root cause of the issue is
too many small files generated.

I have some concerns for the case of session cluster [1], as well as
possible issues for users at large scale, otherwise I think increasing
`state.backend.fs.memory-threshold` to 100K is a good choice, based on the
assumption that a large portion of our users are running small jobs with
small states.

OTOH, maybe extending the solution [2] of resolving RocksDB small file
problem (as proposed by FLINK-11937 [3]) to also support operator state
could be an alternative? We have already applied the solution in production
for operator state and solved the HDFS NN RPC bottleneck problem on last
year's Singles' day.

Best Regards,
Yu

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
[2]
https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
<https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h>
[3] https://issues.apache.org/jira/browse/FLINK-11937


On Thu, 14 May 2020 at 21:45, Till Rohrmann <tr...@apache.org> wrote:

> I cannot say much about the concrete value but if our users have problems
> with the existing default values, then it makes sense to me to change it.
>
> One thing to check could be whether it is possible to provide a meaningful
> exception in case that the state size exceeds the frame size. At the
> moment, Flink should fail with a message saying that a rpc message exceeds
> the maximum frame size. Maybe it is also possible to point the user towards
> "state.backend.fs.memory-threshold" if the message exceeds the frame size
> because of too much state.
>
> Cheers,
> Till
>
> On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
>
>> The parameter "state.backend.fs.memory-threshold" decides when a state
>> will
>> become a file and when it will be stored inline with the metadata (to
>> avoid
>> excessive amounts of small files).
>>
>> By default, this threshold is 1K - so every state above that size becomes
>> a
>> file. For many cases, this threshold seems to be too low.
>> There is an interesting talk with background on this from Scott Kidder:
>> https://www.youtube.com/watch?v=gycq0cY3TZ0
>>
>> I wanted to discuss increasing this to 100K by default.
>>
>> Advantage:
>>   - This should help many users out of the box, which otherwise see
>> checkpointing problems on systems like S3, GCS, etc.
>>
>> Disadvantage:
>>   - For very large jobs, this increases the required heap memory on the JM
>> side, because more state needs to be kept in-line when gathering the acks
>> for a pending checkpoint.
>>   - If tasks have a lot of states and each state is roughly at this
>> threshold, we increase the chance of exceeding the RPC frame size and
>> failing the job.
>>
>> What do you think?
>>
>> Best,
>> Stephan
>>
>

Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K

Posted by Till Rohrmann <tr...@apache.org>.
I cannot say much about the concrete value but if our users have problems
with the existing default values, then it makes sense to me to change it.

One thing to check could be whether it is possible to provide a meaningful
exception in case that the state size exceeds the frame size. At the
moment, Flink should fail with a message saying that a rpc message exceeds
the maximum frame size. Maybe it is also possible to point the user towards
"state.backend.fs.memory-threshold" if the message exceeds the frame size
because of too much state.

Cheers,
Till

On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:

> The parameter "state.backend.fs.memory-threshold" decides when a state will
> become a file and when it will be stored inline with the metadata (to avoid
> excessive amounts of small files).
>
> By default, this threshold is 1K - so every state above that size becomes a
> file. For many cases, this threshold seems to be too low.
> There is an interesting talk with background on this from Scott Kidder:
> https://www.youtube.com/watch?v=gycq0cY3TZ0
>
> I wanted to discuss increasing this to 100K by default.
>
> Advantage:
>   - This should help many users out of the box, which otherwise see
> checkpointing problems on systems like S3, GCS, etc.
>
> Disadvantage:
>   - For very large jobs, this increases the required heap memory on the JM
> side, because more state needs to be kept in-line when gathering the acks
> for a pending checkpoint.
>   - If tasks have a lot of states and each state is roughly at this
> threshold, we increase the chance of exceeding the RPC frame size and
> failing the job.
>
> What do you think?
>
> Best,
> Stephan
>