Posted to user@flink.apache.org by "Colletta, Edward" <Ed...@FMR.COM> on 2020/12/21 09:49:50 UTC

checkpointing seems to be throttled.

Using a session cluster with three task managers; cluster.evenly-spread-out-slots is set to true. 13 jobs are running, with an average parallelism of 4 per job.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.


We are seeing very high checkpoint times and experiencing timeouts. The checkpoint timeout is the default 10 minutes. This does not seem to be related to EFS limits/throttling. We started experiencing these timeouts after upgrading from Flink 1.9.2/Java 8. Are there any known issues that cause very high checkpoint times?

Also, I noticed we did not set state.checkpoints.dir; I assume it is using high-availability.storageDir. Is that correct?

For now we plan on setting:

execution.checkpointing.timeout: 60 min
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-timeout>

execution.checkpointing.tolerable-failed-checkpoints: 12
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-tolerable-failed-checkpoints>

execution.checkpointing.unaligned: true
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-unaligned>

and also explicitly set state.checkpoints.dir.
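
If these need to be applied per job in code rather than cluster-wide in flink-conf.yaml, a minimal Flink 1.11 sketch of the equivalent settings could look like the following; the checkpoint interval and the EFS path are placeholders, not values from this thread:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettingsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60 seconds (the interval is a placeholder).
        env.enableCheckpointing(60_000L);

        // Explicit checkpoint location instead of relying on high-availability.storageDir;
        // the EFS mount path is a placeholder.
        env.setStateBackend(new FsStateBackend("file:///mnt/efs/flink/checkpoints"));

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointTimeout(60 * 60 * 1000L);   // execution.checkpointing.timeout: 60 min
        checkpointConfig.setTolerableCheckpointFailureNumber(12); // execution.checkpointing.tolerable-failed-checkpoints: 12
        checkpointConfig.enableUnalignedCheckpoints();            // execution.checkpointing.unaligned: true

        // Stand-in pipeline so the sketch runs on its own.
        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-settings-sketch");
    }
}

Settings made in code this way apply only to that job and take precedence over the cluster defaults in flink-conf.yaml.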


RE: RE: checkpointing seems to be throttled.

Posted by "Colletta, Edward" <Ed...@FMR.COM>.
FYI, this was an EFS issue. I originally dismissed EFS as the cause because the Percent I/O Limit metric was very low, but I later noticed that the throughput utilization was very high. We increased the provisioned throughput, and checkpoint times are greatly reduced.

From: Colletta, Edward
Sent: Monday, December 21, 2020 12:32 PM
To: Yun Gao <yu...@aliyun.com>; user@flink.apache.org
Subject: RE: RE: checkpointing seems to be throttled.

Doh! Yeah, we set the state backend in code; I read the flink-conf.yaml file and use the high-availability storage dir as the path.


From: Yun Gao <yu...@aliyun.com>
Sent: Monday, December 21, 2020 11:28 AM
To: Colletta, Edward <Ed...@FMR.COM>; user@flink.apache.org
Subject: Re: RE: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and attachments.

Hi Edward,

   Are you setting the FsStateBackend via code or via flink-conf.yaml? If via code, it requires a path parameter, and that path serves as the checkpoint directory (state.checkpoints.dir). If via flink-conf.yaml: I tried this on 1.12 by setting state.backend: filesystem in the config file and enabling checkpointing, and it indeed threw an exception saying

  org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
      at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
      at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
      at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
      at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
      at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
      at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
  Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
      at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
      at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:237)
      at org.apache.flink.client.program.StreamContextEnvironment.<init>(StreamContextEnvironment.java:67)
      at org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
      at java.util.Optional.map(Optional.java:215)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
      at CheckpointTest.main(CheckpointTest.java:26)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
      ... 11 more

   For the timeout: if there is no backpressure, I think it would be helpful to look at the time decomposition for the checkpoint on the checkpoint history page in the web UI to see which phase takes too long.


Best,
 Yun


------------------Original Mail ------------------
Sender: Colletta, Edward <Ed...@FMR.COM>
Send Date: Tue Dec 22 00:04:03 2020
Recipients: Yun Gao <yu...@aliyun.com>, user@flink.apache.org
Subject: RE: checkpointing seems to be throttled.
Thanks for the quick response.

We are using FsStateBackend, and I did see checkpoint files and directories in the EFS-mounted directory.
We do monitor backpressure through the REST API periodically, and we do not see any.


From: Yun Gao <yu...@aliyun.com>
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward <Ed...@FMR.COM>; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and attachments.

Hi Edward,

    For the second issue, have you also set the state backend type? I'm asking because, except for the default heap state backend, the other state backends should throw an exception if state.checkpoints.dir is not set. Since the heap state backend stores all snapshots in the JM's memory, they could not be recovered after a JM failover, which makes it unsuitable for production use. Therefore, in a production environment it might be better to switch to a state backend like RocksDB.

   For the checkpoint timeout, AFAIK there have been no large changes since 1.9.2. There can be different causes for checkpoint timeouts; one possibility is back pressure, where some operator cannot process its records in time, which blocks the checkpoints. I would check for back pressure [1] first; if there is indeed back pressure, you could try unaligned checkpoints or resolve the back pressure by increasing the parallelism of the slow operators.

Best,
 Yun



[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html



------------------Original Mail ------------------
Sender: Colletta, Edward <Ed...@FMR.COM>
Send Date: Mon Dec 21 17:50:15 2020
Recipients: user@flink.apache.org
Subject: checkpointing seems to be throttled.
Using a session cluster with three task managers; cluster.evenly-spread-out-slots is set to true. 13 jobs are running, with an average parallelism of 4 per job.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.

We are seeing very high checkpoint times and experiencing timeouts. The checkpoint timeout is the default 10 minutes. This does not seem to be related to EFS limits/throttling. We started experiencing these timeouts after upgrading from Flink 1.9.2/Java 8. Are there any known issues that cause very high checkpoint times?

Also, I noticed we did not set state.checkpoints.dir; I assume it is using high-availability.storageDir. Is that correct?

For now we plan on setting:

execution.checkpointing.timeout: 60 min
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-timeout>

execution.checkpointing.tolerable-failed-checkpoints: 12
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-tolerable-failed-checkpoints>

execution.checkpointing.unaligned: true
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-unaligned>

and also explicitly set state.checkpoints.dir.


RE: RE: checkpointing seems to be throttled.

Posted by "Colletta, Edward" <Ed...@FMR.COM>.
Doh! Yeah, we set the state backend in code; I read the flink-conf.yaml file and use the high-availability storage dir as the path.
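
For reference, a rough sketch of that pattern (Flink 1.11), assuming flink-conf.yaml is resolvable via FLINK_CONF_DIR on the client; the "checkpoints" subdirectory and the interval are made up for illustration:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HaStorageDirBackendSketch {
    public static void main(String[] args) throws Exception {
        // Reads flink-conf.yaml from FLINK_CONF_DIR (or the default conf/ directory).
        Configuration conf = GlobalConfiguration.loadConfiguration();

        // high-availability.storageDir, e.g. a path on the EFS mount.
        String haStorageDir = conf.getString(HighAvailabilityOptions.HA_STORAGE_PATH);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // placeholder interval

        // Reuse the HA storage dir as the FsStateBackend path; the subdirectory name is an assumption.
        env.setStateBackend(new FsStateBackend(haStorageDir + "/checkpoints"));

        env.fromElements(1, 2, 3).print();
        env.execute("ha-storage-dir-backend-sketch");
    }
}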


From: Yun Gao <yu...@aliyun.com>
Sent: Monday, December 21, 2020 11:28 AM
To: Colletta, Edward <Ed...@FMR.COM>; user@flink.apache.org
Subject: Re: RE: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and attachments.

Hi Edward,

   Are you setting the FsStateBackend via code or via flink-conf.yaml? If via code, it requires a path parameter, and that path serves as the checkpoint directory (state.checkpoints.dir). If via flink-conf.yaml: I tried this on 1.12 by setting state.backend: filesystem in the config file and enabling checkpointing, and it indeed threw an exception saying

  org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
      at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
      at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
      at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
      at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
      at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
      at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
  Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
      at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
      at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:237)
      at org.apache.flink.client.program.StreamContextEnvironment.<init>(StreamContextEnvironment.java:67)
      at org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
      at java.util.Optional.map(Optional.java:215)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
      at CheckpointTest.main(CheckpointTest.java:26)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
      ... 11 more

   For the timeout: if there is no backpressure, I think it would be helpful to look at the time decomposition for the checkpoint on the checkpoint history page in the web UI to see which phase takes too long.


Best,
 Yun


------------------Original Mail ------------------
Sender: Colletta, Edward <Ed...@FMR.COM>
Send Date: Tue Dec 22 00:04:03 2020
Recipients: Yun Gao <yu...@aliyun.com>, user@flink.apache.org
Subject: RE: checkpointing seems to be throttled.
Thanks for the quick response.

We are using FsStateBackend, and I did see checkpoint files and directories in the EFS-mounted directory.
We do monitor backpressure through the REST API periodically, and we do not see any.


From: Yun Gao <yu...@aliyun.com>
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward <Ed...@FMR.COM>; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and attachments.

Hi Edward,

    For the second issue, have you also set the state backend type? I'm asking because, except for the default heap state backend, the other state backends should throw an exception if state.checkpoints.dir is not set. Since the heap state backend stores all snapshots in the JM's memory, they could not be recovered after a JM failover, which makes it unsuitable for production use. Therefore, in a production environment it might be better to switch to a state backend like RocksDB.

   For the checkpoint timeout, AFAIK there have been no large changes since 1.9.2. There can be different causes for checkpoint timeouts; one possibility is back pressure, where some operator cannot process its records in time, which blocks the checkpoints. I would check for back pressure [1] first; if there is indeed back pressure, you could try unaligned checkpoints or resolve the back pressure by increasing the parallelism of the slow operators.

Best,
 Yun



[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html



------------------Original Mail ------------------
Sender: Colletta, Edward <Ed...@FMR.COM>
Send Date: Mon Dec 21 17:50:15 2020
Recipients: user@flink.apache.org
Subject: checkpointing seems to be throttled.
Using a session cluster with three task managers; cluster.evenly-spread-out-slots is set to true. 13 jobs are running, with an average parallelism of 4 per job.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.

We are seeing very high checkpoint times and experiencing timeouts. The checkpoint timeout is the default 10 minutes. This does not seem to be related to EFS limits/throttling. We started experiencing these timeouts after upgrading from Flink 1.9.2/Java 8. Are there any known issues that cause very high checkpoint times?

Also, I noticed we did not set state.checkpoints.dir; I assume it is using high-availability.storageDir. Is that correct?

For now we plan on setting:

execution.checkpointing.timeout: 60 min
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-timeout>

execution.checkpointing.tolerable-failed-checkpoints: 12
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-tolerable-failed-checkpoints>

execution.checkpointing.unaligned: true
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-unaligned>

and also explicitly set state.checkpoints.dir.


Re: RE: checkpointing seems to be throttled.

Posted by Yun Gao <yu...@aliyun.com>.
Hi Edward,

   Are you setting the FsStateBackend via code or via flink-conf.yaml? If via code, it requires a path parameter, and that path serves as the checkpoint directory (state.checkpoints.dir). If via flink-conf.yaml: I tried this on 1.12 by setting state.backend: filesystem in the config file and enabling checkpointing, and it indeed threw an exception saying

  org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
      at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
      at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
      at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
      at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
      at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
      at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
  Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
      at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
      at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:237)
      at org.apache.flink.client.program.StreamContextEnvironment.<init>(StreamContextEnvironment.java:67)
      at org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
      at java.util.Optional.map(Optional.java:215)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
      at CheckpointTest.main(CheckpointTest.java:26)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
      ... 11 more

   For the timeout: if there is no backpressure, I think it would be helpful to look at the time decomposition for the checkpoint on the checkpoint history page in the web UI to see which phase takes too long.
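
The same timing data that the web UI shows is also served by the REST API under /jobs/<jobId>/checkpoints. A small Java 11 sketch for pulling it; the host, port, and job id below are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CheckpointStatsProbe {
    public static void main(String[] args) throws Exception {
        // Host, port, and job id are placeholders for your own cluster.
        String jobId = "00000000000000000000000000000000";
        URI uri = URI.create("http://jobmanager-host:8081/jobs/" + jobId + "/checkpoints");

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();

        // The JSON response contains the per-checkpoint history with duration and size summaries,
        // which is the data behind the web UI's checkpoint history page.
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}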


Best,
 Yun



 ------------------Original Mail ------------------
Sender: Colletta, Edward <Ed...@FMR.COM>
Send Date: Tue Dec 22 00:04:03 2020
Recipients: Yun Gao <yu...@aliyun.com>, user@flink.apache.org
Subject: RE: checkpointing seems to be throttled.

Thanks for the quick response.
We are using FsStateBackend, and I did see checkpoint files and directories in the EFS-mounted directory.
We do monitor backpressure through the REST API periodically, and we do not see any.
From: Yun Gao <yu...@aliyun.com> 
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward <Ed...@FMR.COM>; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.
This email is from an external source - exercise caution regarding links and attachments.
Hi Edward,

    For the second issue, have you also set the state backend type? I'm asking because, except for the default heap state backend, the other state backends should throw an exception if state.checkpoints.dir is not set. Since the heap state backend stores all snapshots in the JM's memory, they could not be recovered after a JM failover, which makes it unsuitable for production use. Therefore, in a production environment it might be better to switch to a state backend like RocksDB.

   For the checkpoint timeout, AFAIK there have been no large changes since 1.9.2. There can be different causes for checkpoint timeouts; one possibility is back pressure, where some operator cannot process its records in time, which blocks the checkpoints. I would check for back pressure [1] first; if there is indeed back pressure, you could try unaligned checkpoints or resolve the back pressure by increasing the parallelism of the slow operators.

Best,
 Yun



[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html


 
------------------Original Mail ------------------
Sender: Colletta, Edward <Ed...@FMR.COM>
Send Date: Mon Dec 21 17:50:15 2020
Recipients: user@flink.apache.org
Subject: checkpointing seems to be throttled.

Using a session cluster with three task managers; cluster.evenly-spread-out-slots is set to true. 13 jobs are running, with an average parallelism of 4 per job.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.

We are seeing very high checkpoint times and experiencing timeouts. The checkpoint timeout is the default 10 minutes. This does not seem to be related to EFS limits/throttling. We started experiencing these timeouts after upgrading from Flink 1.9.2/Java 8. Are there any known issues that cause very high checkpoint times?

Also, I noticed we did not set state.checkpoints.dir; I assume it is using high-availability.storageDir. Is that correct?

For now we plan on setting:
execution.checkpointing.timeout: 60 min
execution.checkpointing.tolerable-failed-checkpoints: 12
execution.checkpointing.unaligned: true
and also explicitly set state.checkpoints.dir.

RE: checkpointing seems to be throttled.

Posted by "Colletta, Edward" <Ed...@FMR.COM>.
Thanks for the quick response.

We are using FsStateBackend, and I did see checkpoint files and directories in the EFS-mounted directory.
We do monitor backpressure through the REST API periodically, and we do not see any.
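
For reference, a minimal Java 11 sketch of that kind of periodic check against the back pressure REST endpoint (/jobs/<jobId>/vertices/<vertexId>/backpressure); the host and the ids below are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BackpressureProbe {
    public static void main(String[] args) throws Exception {
        // The host, job id, and vertex id are placeholders.
        String jobId = "00000000000000000000000000000000";
        String vertexId = "0000000000000000000000000000000a";
        URI uri = URI.create("http://jobmanager-host:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/backpressure");

        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                HttpRequest.newBuilder(uri).GET().build(),
                HttpResponse.BodyHandlers.ofString());

        // The response reports a sampled back pressure level per subtask; the first request
        // may come back without fresh samples and trigger sampling in the background,
        // which is why this endpoint is usually polled.
        System.out.println(response.body());
    }
}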


From: Yun Gao <yu...@aliyun.com>
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward <Ed...@FMR.COM>; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and attachments.

Hi Edward,

    For the second issue, have you also set the state backend type? I'm asking because, except for the default heap state backend, the other state backends should throw an exception if state.checkpoints.dir is not set. Since the heap state backend stores all snapshots in the JM's memory, they could not be recovered after a JM failover, which makes it unsuitable for production use. Therefore, in a production environment it might be better to switch to a state backend like RocksDB.

   For the checkpoint timeout, AFAIK there have been no large changes since 1.9.2. There can be different causes for checkpoint timeouts; one possibility is back pressure, where some operator cannot process its records in time, which blocks the checkpoints. I would check for back pressure [1] first; if there is indeed back pressure, you could try unaligned checkpoints or resolve the back pressure by increasing the parallelism of the slow operators.

Best,
 Yun



[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html



------------------Original Mail ------------------
Sender: Colletta, Edward <Ed...@FMR.COM>
Send Date: Mon Dec 21 17:50:15 2020
Recipients: user@flink.apache.org
Subject: checkpointing seems to be throttled.
Using a session cluster with three task managers; cluster.evenly-spread-out-slots is set to true. 13 jobs are running, with an average parallelism of 4 per job.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.

We are seeing very high checkpoint times and experiencing timeouts. The checkpoint timeout is the default 10 minutes. This does not seem to be related to EFS limits/throttling. We started experiencing these timeouts after upgrading from Flink 1.9.2/Java 8. Are there any known issues that cause very high checkpoint times?

Also, I noticed we did not set state.checkpoints.dir; I assume it is using high-availability.storageDir. Is that correct?

For now we plan on setting:

execution.checkpointing.timeout: 60 min
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-timeout>

execution.checkpointing.tolerable-failed-checkpoints: 12
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-tolerable-failed-checkpoints>

execution.checkpointing.unaligned: true
  <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-unaligned>

and also explicitly set state.checkpoints.dir.


Re: checkpointing seems to be throttled.

Posted by Yun Gao <yu...@aliyun.com>.
Hi Edward,

    For the second issue, have you also set the state backend type? I'm asking because, except for the default heap state backend, the other state backends should throw an exception if state.checkpoints.dir is not set. Since the heap state backend stores all snapshots in the JM's memory, they could not be recovered after a JM failover, which makes it unsuitable for production use. Therefore, in a production environment it might be better to switch to a state backend like RocksDB.

   For the checkpoint timeout, AFAIK there have been no large changes since 1.9.2. There can be different causes for checkpoint timeouts; one possibility is back pressure, where some operator cannot process its records in time, which blocks the checkpoints. I would check for back pressure [1] first; if there is indeed back pressure, you could try unaligned checkpoints or resolve the back pressure by increasing the parallelism of the slow operators.
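
For the state backend switch mentioned above, a minimal RocksDB sketch for Flink 1.11 could look like the following; it needs the flink-statebackend-rocksdb dependency on the classpath, and the checkpoint path and interval are placeholders:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // placeholder interval

        // Checkpoints go to the given path; 'true' enables incremental checkpoints,
        // which keeps individual checkpoints small once state grows.
        env.setStateBackend(new RocksDBStateBackend("file:///mnt/efs/flink/checkpoints", true));

        env.fromElements(1, 2, 3).print();
        env.execute("rocksdb-backend-sketch");
    }
}

The flink-conf.yaml equivalent would be state.backend: rocksdb together with state.checkpoints.dir.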

Best,
 Yun



[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html 



 ------------------Original Mail ------------------
Sender: Colletta, Edward <Ed...@FMR.COM>
Send Date: Mon Dec 21 17:50:15 2020
Recipients: user@flink.apache.org
Subject: checkpointing seems to be throttled.

Using a session cluster with three task managers; cluster.evenly-spread-out-slots is set to true. 13 jobs are running, with an average parallelism of 4 per job.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.

We are seeing very high checkpoint times and experiencing timeouts. The checkpoint timeout is the default 10 minutes. This does not seem to be related to EFS limits/throttling. We started experiencing these timeouts after upgrading from Flink 1.9.2/Java 8. Are there any known issues that cause very high checkpoint times?

Also, I noticed we did not set state.checkpoints.dir; I assume it is using high-availability.storageDir. Is that correct?

For now we plan on setting:
execution.checkpointing.timeout: 60 min
execution.checkpointing.tolerable-failed-checkpoints: 12
execution.checkpointing.unaligned: true
and also explicitly set state.checkpoints.dir.