Posted to user@flink.apache.org by Pawel Bartoszek <pa...@gmail.com> on 2018/10/24 15:11:07 UTC

Flink weird checkpointing behaviour

Hi,

We have just upgraded to Flink 1.5.2 on EMR from Flink 1.3.2. We have
noticed that some checkpoints are taking a very long time to complete, and
some of them even fail with an exception:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/jobmanager_0#-665361795]] after [60000 ms].


We have noticed that *Checkpoint Duration (Async)* is taking most of the
checkpoint time compared to *Checkpoint Duration (Sync)*. I thought that
async checkpoints were only offered by the RocksDB state backend. We use
the filesystem state backend.
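
For reference, this is only a minimal sketch (placeholder S3 path, standard DataStream job assumed) of how the filesystem backend is set up, including its asynchronous snapshot mode:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000);

        // FsStateBackend keeps the working state on the TaskManager heap (the
        // synchronous part of a checkpoint) and, with the second argument set
        // to true, writes the snapshot to the checkpoint directory in a
        // background thread (the asynchronous part).
        env.setStateBackend(new FsStateBackend("s3a://my-bucket/checkpoints", true));

        // ... build and execute the job ...
    }
}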

We didn't have such problems on Flink 1.3.2.

Thanks,
Pawel

*Flink configuration*

akka.ask.timeout 60 s
classloader.resolve-order parent-first
containerized.heap-cutoff-ratio 0.15
env.hadoop.conf.dir /etc/hadoop/conf
env.yarn.conf.dir /etc/hadoop/conf
high-availability zookeeper
high-availability.cluster-id application_1540292869184_0001
high-availability.zookeeper.path.root /flink
high-availability.zookeeper.quorum
ip-10-4-X-X.eu-west-1.compute.internal:2181
high-availability.zookeeper.storageDir hdfs:///flink/recovery
internal.cluster.execution-mode NORMAL
internal.io.tmpdirs.use-local-default true
io.tmp.dirs
/mnt/yarn/usercache/hadoop/appcache/application_1540292869184_0001
jobmanager.heap.mb 3072
jobmanager.rpc.address ip-10-4-X-X.eu-west-1.compute.internal
jobmanager.rpc.port 41219
jobmanager.web.checkpoints.history 1000
parallelism.default 32
rest.address ip-10-4-X-X.eu-west-1.compute.internal
rest.port 0
state.backend filesystem
state.backend.fs.checkpointdir s3a://....
state.checkpoints.dir s3a://...
state.savepoints.dir s3a://...
taskmanager.heap.mb 6600
taskmanager.numberOfTaskSlots 1
web.port 0
web.tmpdir /tmp/flink-web-c3d16e22-1a33-46a2-9825-a6e268892199
yarn.application-attempts 10
yarn.maximum-failed-containers -1
zookeeper.sasl.disable true

Re: Flink weird checkpointing behaviour

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

I think it is definitely worth checking the alignment time, as Yun Tang
suggested. There were some changes in the network stack between those
versions that could influence this behavior.
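
If it helps, here is a rough sketch (the REST address and job id are passed in as arguments; adjust to your cluster) for dumping the per-job checkpoint statistics that back the web UI:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CheckpointStats {
    public static void main(String[] args) throws Exception {
        // args[0]: REST address of the JobManager, e.g. "http://jobmanager-host:8081"
        // args[1]: the job id as shown in the web UI
        String restAddress = args[0];
        String jobId = args[1];

        // Monitoring REST API: per-job checkpoint statistics as JSON.
        URL url = new URL(restAddress + "/jobs/" + jobId + "/checkpoints");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}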


I've also added Stefan in cc, who might have more ideas about what would
be worth checking.

Best,

Dawid



Re: Flink weird checkpointing behaviour

Posted by Yun Tang <my...@live.com>.
Hi Pawel

First of all, I don't think the akka timeout exception is related to the checkpoints taking a long time. Both RocksDBStateBackend and FsStateBackend have an asynchronous part of the checkpoint, which in general uploads the data to the DFS. That's why the async part takes more time than the sync part of the checkpoint in most cases.

You could check whether the checkpoint alignment time is much longer than before. Back pressure in a job causes downstream tasks to receive the checkpoint barriers later, and a task must receive all barriers from its inputs before it triggers its checkpoint [1]. If a long checkpoint alignment time is the main contributor to the overall checkpoint duration, you should check the tasks that cause the back pressure.

Also, long checkpoints might be caused by low write performance of the DFS.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html#barriers
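
As a rough sketch (the values are placeholders, not recommendations; a standard DataStream job is assumed), these are the CheckpointConfig knobs that usually matter when checkpoints run long:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        CheckpointConfig cfg = env.getCheckpointConfig();
        // Give up on a checkpoint that does not complete within 10 minutes
        // instead of letting it linger.
        cfg.setCheckpointTimeout(10 * 60 * 1000);
        // Leave some breathing room after a slow checkpoint before the next
        // alignment phase starts.
        cfg.setMinPauseBetweenCheckpoints(30_000);
        // Keep only one checkpoint in flight at a time.
        cfg.setMaxConcurrentCheckpoints(1);

        // ... build and execute the job ...
    }
}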

Best
Yun Tang
