Posted to user@flink.apache.org by Daniel Vol <vo...@gmail.com> on 2021/09/01 11:07:31 UTC

Flink restarts on Checkpoint failure

Hello,

I see the following error in my jobmanager log (Flink on EMR):
2021-08-21 17:17:30,489 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1 (type=CHECKPOINT) @ 1629566250303 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:17:33,572 [jobmanager-future-thread-5] INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream  - close closed:false s3://***/_metadata
2021-08-21 17:17:33,800 [jobmanager-future-thread-5] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1 for job c513e9ebbea4ab72d80b1338896ca5c2 (737859873 bytes in 3496 ms).
2021-08-21 17:27:30,474 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 2 (type=CHECKPOINT) @ 1629566850302 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:27:46,012 [jobmanager-future-thread-3] INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream  - close closed:false s3://***/_metadata
2021-08-21 17:27:46,158 [jobmanager-future-thread-3] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 2 for job c513e9ebbea4ab72d80b1338896ca5c2 (1210889410 bytes in 15856 ms).
2021-08-21 17:37:30,468 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 3 (type=CHECKPOINT) @ 1629567450302 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:47:30,469 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 3 of job c513e9ebbea4ab72d80b1338896ca5c2 expired before completing.
2021-08-21 17:47:30,476 [flink-akka.actor.default-dispatcher-34] INFO org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2021-08-21 17:47:30,478 [flink-akka.actor.default-dispatcher-34] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job session-aggregation (c513e9ebbea4ab72d80b1338896ca5c2) switched from state RUNNING to RESTARTING.
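
One way to check from the log above whether checkpoint 3 simply ran into the configured timeout is to compare its trigger and expiry timestamps. The sketch below does this for the two timestamps quoted in the log; the helper name `seconds_between` is made up for illustration and is not part of any Flink tooling.

```python
from datetime import datetime

LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"

def seconds_between(start: str, end: str) -> float:
    """Return the elapsed seconds between two jobmanager log timestamps."""
    t0 = datetime.strptime(start, LOG_TIME_FORMAT)
    t1 = datetime.strptime(end, LOG_TIME_FORMAT)
    return (t1 - t0).total_seconds()

# Timestamps copied from the log lines for checkpoint 3.
triggered = "2021-08-21 17:37:30,468"
expired = "2021-08-21 17:47:30,469"

elapsed = seconds_between(triggered, expired)
timeout_seconds = 10 * 60  # execution.checkpointing.timeout=10 min

print(f"checkpoint 3 was pending for {elapsed:.3f} s")
print("ran into the timeout:", elapsed >= timeout_seconds)
```

The checkpoint was pending for almost exactly ten minutes, which suggests it was cancelled by the timeout rather than failing for some other reason.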

Configuration is:

-yD "execution.checkpointing.timeout=10 min"\
-yD "restart-strategy=failure-rate"\
-yD "restart-strategy.failure-rate.max-failures-per-interval=70"\
-yD "restart-strategy.failure-rate.delay=1 min"\
-yD "restart-strategy.failure-rate.failure-rate-interval=60 min"\

I am not sure whether https://issues.apache.org/jira/browse/FLINK-21215 is
related, but it looks like it has been resolved.

I know I can increase the checkpoint timeout, but the checkpoint size is
relatively small and most of the time a checkpoint takes several seconds to
complete, so 10 minutes should be more than enough. So the main
question is: why was "Exceeded checkpoint tolerable failure threshold"
triggered?

Thanks!

Re: Flink restarts on Checkpoint failure

Posted by Daniel Vol <vo...@gmail.com>.
You are right.
After letting it run for a few hours, I see from the Prometheus and Kinesis
metrics (Flink reads from Kinesis) that "millisBehindLatest" on the iterator
is growing constantly.
So this creates the backlog and backpressure.
Now I am trying to understand why Flink is not able to read at the correct
rate, as the cluster doesn't show any load (CPU under 20% and memory
somewhere near 50%), but this is already out of your scope :)
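
A growing "millisBehindLatest" can be quantified by fitting a line through a few metric samples: a clearly positive slope means the consumer keeps falling further behind. The sketch below is only an illustration; the sample values are invented and `lag_growth_per_minute` is a hypothetical helper, not a Flink or Kinesis API.

```python
def lag_growth_per_minute(samples):
    """Least-squares slope of (minute, millis_behind_latest) samples."""
    n = len(samples)
    if n < 2:
        raise ValueError("need at least two samples")
    mean_t = sum(t for t, _ in samples) / n
    mean_lag = sum(lag for _, lag in samples) / n
    covariance = sum((t - mean_t) * (lag - mean_lag) for t, lag in samples)
    variance = sum((t - mean_t) ** 2 for t, _ in samples)
    if variance == 0:
        raise ValueError("samples must cover more than one point in time")
    return covariance / variance

# Invented samples: (minutes since start, millisBehindLatest).
samples = [(0, 60_000), (10, 180_000), (20, 300_000), (30, 420_000)]
slope = lag_growth_per_minute(samples)
falling_behind = slope > 0

print(f"lag grows by about {slope:.0f} ms per minute")
```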

Thanks for all your help.
Daniel.

On Tue, Sep 14, 2021 at 10:19 AM Schwalbe Matthias <
Matthias.Schwalbe@viseca.ch> wrote:

> [...]

RE: Flink restarts on Checkpoint failure

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Daniel,

I was not able to follow up earlier … back from vacation now 😊

… just as a wrap-up, you found a workable solution:

  *   The checkpoint page shows that the checkpoints that failed actually ran into the checkpoint timeout (10 min) because backpressure prevented the checkpoint markers from making progress
  *   As suggested, enabling unaligned checkpoints prevents this
  *   However, be aware that unaligned checkpoints imply slight changes in semantics (if I remember correctly, this is mentioned in the documentation for unaligned checkpoints)

… as to the data skew or further improvements:

  *   Your data skew does not look too bad; it actually depends on the distribution of your keys and how much entropy they have
  *   You are probably in a better position to judge the distribution …
  *   If you actually know the distribution and have a way to predict the likelihood of single keys, you could distribute them evenly by means of a custom partitioner (depending on the statistical properties of the key space)
  *   Before going into this, I would try to improve the backlog situation (making the bottleneck operator faster, if possible)
  *   A backlog is to be expected during the backfill phase, but after the backfill it costs you latency
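
If the expected load per key is known, the idea of distributing keys evenly can be sketched as a greedy assignment: take the heaviest key first and give it to the currently least-loaded subtask. This is only an illustration under that assumption; the key names and weights are invented, and `assign_keys` is a hypothetical helper, not a Flink API.

```python
import heapq

def assign_keys(key_weights, num_subtasks):
    """Greedy assignment: heaviest key first, always to the least-loaded subtask."""
    # Heap entries are (current load, subtask index).
    heap = [(0, i) for i in range(num_subtasks)]
    heapq.heapify(heap)
    assignment = {}
    for key, weight in sorted(key_weights.items(), key=lambda kv: -kv[1]):
        load, subtask = heapq.heappop(heap)
        assignment[key] = subtask
        heapq.heappush(heap, (load + weight, subtask))
    return assignment

def subtask_loads(key_weights, assignment, num_subtasks):
    """Sum the expected records per subtask for a given assignment."""
    totals = [0] * num_subtasks
    for key, subtask in assignment.items():
        totals[subtask] += key_weights[key]
    return totals

# Invented expected record counts per key.
key_weights = {"a": 900, "b": 500, "c": 400, "d": 300, "e": 200, "f": 100}
assignment = assign_keys(key_weights, num_subtasks=3)
loads = subtask_loads(key_weights, assignment, 3)
print(loads)
```

Note that a single very heavy key still ends up on one subtask, so this kind of mapping only helps when the skew comes from many medium-sized keys colliding rather than from one dominant key.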

Thias

From: Daniel Vol <vo...@gmail.com>
Sent: Monday, 13 September 2021 14:53
To: Caizhi Weng <ts...@gmail.com>
Cc: Schwalbe Matthias <Ma...@viseca.ch>; user <us...@flink.apache.org>
Subject: Re: Flink restarts on Checkpoint failure

You are right.
I see a very big difference: one of the subtasks created almost 2M records, whereas some created 0:
[two images not included in this plain-text version]
As they are sorted, the last 120 (!) subtasks do nothing...

On Mon, Sep 13, 2021 at 3:43 PM Caizhi Weng <ts...@gmail.com> wrote:
Hi!

I see execution.checkpointing.unaligned is working for you. This indicates that checkpoints are very likely to be backpressured.

To see if there is really a data skew, you can go into Flink's web UI, click on a node and see the number of records going into and out of a subtask. If there is a subtask which handles much more data than others then there is a data skew.
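
The comparison described above can be made concrete by copying the per-subtask record counts from the web UI and comparing the busiest subtask with the average. The numbers below are invented and `skew_report` is a hypothetical helper for illustration only.

```python
def skew_report(records_per_subtask):
    """Summarise how unevenly records are spread across subtasks."""
    if not records_per_subtask:
        raise ValueError("need at least one subtask")
    total = sum(records_per_subtask)
    mean = total / len(records_per_subtask)
    busiest = max(records_per_subtask)
    idle = sum(1 for count in records_per_subtask if count == 0)
    return {
        "max_over_mean": busiest / mean if mean else 0.0,
        "idle_subtasks": idle,
    }

# Invented counts, as one might copy them from the subtask table in the web UI.
counts = [2_000_000, 500_000, 300_000, 200_000, 0, 0, 0, 0]
report = skew_report(counts)
print(report)
```

A ratio close to 1 means the load is balanced; a ratio of several times the mean, together with idle subtasks, points to a skew on the keys.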

On Mon, Sep 13, 2021 at 8:39 PM Daniel Vol <vo...@gmail.com> wrote:
Thanks!

Yes, I already found this parameter (execution.checkpointing.tolerable-failed-checkpoints) and also added:

-yD "execution.checkpointing.unaligned=true"\
-yD "execution.checkpointing.max-concurrent-checkpoints=1"\

Since then the process has been running much better: so far it has run for 2.5 h and completed 13 checkpoints with no failures.

I will give it a few more hours to run to see the behaviour.

Regarding data skew - it is possible that we have some keys larger than others and get a pretty big skew.

But I didn't see backpressure, though I might be missing something.

Daniel.

On Mon, Sep 13, 2021 at 3:23 PM Caizhi Weng <ts...@gmail.com> wrote:
Hi!

You can set execution.checkpointing.tolerable-failed-checkpoints to tolerate a certain number of checkpoint failures. See [1].
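
The effect of this setting can be pictured with a toy counter. This is a simplified model and not Flink's actual code: it assumes that a successful checkpoint resets the count and that the tolerated number is effectively 0 when the option is not set, which is consistent with the log in this thread, where a single expired checkpoint led to a restart.

```python
class CheckpointFailureCounter:
    """Toy model: fail the job once consecutive failures exceed the tolerated number."""

    def __init__(self, tolerable_failed_checkpoints: int = 0):
        self.tolerable = tolerable_failed_checkpoints
        self.consecutive_failures = 0

    def on_success(self) -> None:
        # Assumption: a completed checkpoint resets the failure count.
        self.consecutive_failures = 0

    def on_failure(self) -> bool:
        """Return True if this failure should fail (and thus restart) the job."""
        self.consecutive_failures += 1
        return self.consecutive_failures > self.tolerable

strict = CheckpointFailureCounter(tolerable_failed_checkpoints=0)
strict.on_success()                   # checkpoint 1 completes
strict.on_success()                   # checkpoint 2 completes
first_failure = strict.on_failure()   # checkpoint 3 expires
print(first_failure)                  # True: the job fails on the first failure

lenient = CheckpointFailureCounter(tolerable_failed_checkpoints=2)
lenient_decisions = [lenient.on_failure() for _ in range(3)]
print(lenient_decisions)              # [False, False, True]
```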

The GC metrics are increasing because they are the total number of GCs that have happened so far. So it doesn't seem to be relevant.
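
To judge GC activity from a cumulative counter, it helps to look at the increase per sampling interval instead of the raw value. A minimal sketch with invented numbers (`per_interval_increase` is a hypothetical helper):

```python
def per_interval_increase(cumulative):
    """Turn a cumulative counter series into per-interval increases."""
    return [later - earlier for earlier, later in zip(cumulative, cumulative[1:])]

# Invented cumulative GC counts sampled at a fixed interval.
gc_count_total = [10, 14, 18, 22, 26]
increases = per_interval_increase(gc_count_total)
print(increases)  # [4, 4, 4, 4]
```

A steadily rising total with a flat per-interval increase, as here, indicates constant GC activity rather than a growing problem.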

From your image all but one subtask have finished their checkpoint. So it is likely that there is a data skew and it backpressures the checkpoint. It seems that there is a window aggregation in your job. Is there any data skew on the aggregation keys?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints

On Mon, Sep 13, 2021 at 6:00 PM Daniel Vol <vo...@gmail.com> wrote:
Hey,

Sorry it took some time to answer.
First - Thanks for your input.

  *   FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold: this depends on what recovery strategy you have configured …
Flink Configuration:

-yD "env.java.opts.taskmanager=-XX:+UseG1GC"\
-yD "env.java.opts.jobmanager=-XX:+UseG1GC"\
-yD "state.backend=rocksdb"\
-yD "state.checkpoints.dir=s3://..."\
-yD "state.checkpoints.num-retained=2"\
-yD "state.savepoints.dir=s3://..."\
-yD "state.backend.incremental=true"\
-yD "state.backend.local-recovery=true"\
-yD "state.backend.async=true"\
-yD "metrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter"\
-yD "taskmanager.memory.jvm-metaspace.size=1 gb"\
-yD "flink.partition-discovery.interval-millis=60000"\
-yD "pipeline.time-characteristic=EventTime"\
-yD "pipeline.generic-types=true"\
-yD "execution.checkpointing.interval=10 min"\
-yD "execution.checkpointing.min-pause=5 min"\
-yD "execution.checkpointing.mode=AT_LEAST_ONCE"\
-yD "execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION"\
-yD "execution.checkpointing.timeout=10 min"\
-yD "restart-strategy=failure-rate"\
-yD "restart-strategy.failure-rate.max-failures-per-interval=70"\
-yD "restart-strategy.failure-rate.delay=1 min"\
-yD "restart-strategy.failure-rate.failure-rate-interval=60 min"\
-yD "akka.ask.timeout=1 min"\
-yD "yarn.application-attempts=1"\
-yD "heartbeat.timeout=120000"\

Yarn configuration:

[image not included in this plain-text version]

According to the Flink config (70 failures within 1 hour, if I understand it correctly), a single checkpoint failure should not cause a restart of the application...

but it does:

[image not included in this plain-text version]


EMR - 17 R5.4XL machines with flink yjm=120g ytm=120g

A checkpoint failure by itself does not really bother me, but restarting Flink on each checkpoint failure bothers me very much!
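
For context, a failure-rate restart policy like the one configured above can be modelled as a sliding window over job failures. The sketch below is a toy model and not Flink's implementation; it only illustrates that such a policy limits how often a failed job may restart, and does not decide whether a failed checkpoint counts as a job failure in the first place.

```python
from collections import deque

class FailureRateRestartStrategy:
    """Toy model of a failure-rate restart policy (not Flink's implementation)."""

    def __init__(self, max_failures_per_interval: int, interval_seconds: float):
        self.max_failures = max_failures_per_interval
        self.interval = interval_seconds
        self.failure_times = deque()

    def can_restart(self, now: float) -> bool:
        """Record a failure at time `now`; return whether a restart is still allowed."""
        self.failure_times.append(now)
        # Forget failures that have dropped out of the sliding window.
        while self.failure_times and now - self.failure_times[0] > self.interval:
            self.failure_times.popleft()
        return len(self.failure_times) <= self.max_failures

# Values from the configuration above: 70 failures per 60 min.
strategy = FailureRateRestartStrategy(max_failures_per_interval=70, interval_seconds=3600)

# One job failure every 10 minutes for a day (one expired checkpoint per interval).
decisions = [strategy.can_restart(t) for t in range(0, 24 * 3600, 600)]
print(all(decisions))
```

With these values every failure is followed by a restart, which matches the behaviour described in this message.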

I have tried to move to Flink 1.12.1 (next EMR version - 5.33) from Flink 1.11.2 (EMR - 5.32) but have the same issue.

Regarding Cluster usage (EMR 5.33 with Flink 1.12.1):

I see a CPU peak on the TM at the time of failure and restart (the JM is at ~0% usage all the time):

[image not included in this plain-text version]

while GC looks like it is increasing all the time:

[image not included in this plain-text version]

All those metrics come from Flink itself -> Prometheus -> Grafana.

Flink dashboard 1.12.1:

[three images not included in this plain-text version]

I would be more than happy for any help to move on.

Thank you in advance,

Daniel.

On Thu, Sep 2, 2021 at 9:49 AM Schwalbe Matthias <Ma...@viseca.ch> wrote:
Good morning Daniel,

Another reason could be backpressure with aligned checkpoints:

  *   Flink processes checkpoints by sending checkpoint markers through the job graph, beginning with the source operators and moving towards the sink operators
  *   These checkpoint markers are a sort of meta event that is sent along with your custom events (much like watermarks and latency markers)
  *   These checkpoint markers cannot pass by (i.e. go faster than) your custom events
  *   In your situation, because it happens right after you start the job,

     *   it might be a source that forwards many events (e.g. for backfilling) while a later operator cannot process these events at the same speed
     *   therefore the events queue up in front of that operator, as do the checkpoint markers, which consequently have a hard time aligning, even for longer than the checkpoint timeout

  *   how to fix this situation:

     *   diagnostics: the Flink dashboard has a tab for checkpoints that shows how long checkpoint progress and alignment take for each task/subtask
     *   which version of Flink are you using?
     *   Depending on the version of Flink, you can enable unaligned checkpoints (which has some other implications)
     *   You could also increase the scale-out factor for the backfill phase and then lower it again …


  *   FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold: this depends on what recovery strategy you have configured …
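
The queueing argument in the list above can be put into numbers with a very rough model: a marker that sits behind a backlog of events reaches the slow operator only after the backlog has been processed. The figures below are invented, `barrier_delay_seconds` is a hypothetical helper, and the model ignores that in-flight buffers are bounded in a real job.

```python
def barrier_delay_seconds(queued_events: int, events_per_second: float) -> float:
    """Time until a marker at the back of the queue reaches the operator."""
    if events_per_second <= 0:
        raise ValueError("processing rate must be positive")
    return queued_events / events_per_second

timeout_seconds = 10 * 60  # execution.checkpointing.timeout=10 min

# Invented numbers: 1.5M events queued ahead of the marker, 2,000 events/s processed.
delay = barrier_delay_seconds(1_500_000, 2_000)
expires_first = delay > timeout_seconds

print(f"marker arrives after {delay:.0f} s; checkpoint expires first: {expires_first}")
```

In this example the marker would need 750 s to get through, so the checkpoint would expire at 600 s even though writing the state itself might take only a few seconds.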

I might be mistaken, however this is what I look into when I run into similar situations


Feel free to get back to the mailing list for further clarifications …

Thias


From: Caizhi Weng <ts...@gmail.com>
Sent: Thursday, 2 September 2021 04:24
To: Daniel Vol <vo...@gmail.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Flink restarts on Checkpoint failure

Hi!

There are a ton of possible reasons for a checkpoint failure. The most likely reasons might be:
* The JVM is busy with garbage collection when performing the checkpoints. This can be checked by looking into the GC logs of a task manager.
* The state suddenly becomes quite large due to some specific data pattern. This can be checked by looking at the state size for the completed portion of that checkpoint.

You might also want to profile the CPU usage when the checkpoint is happening.

Daniel Vol <vo...@gmail.com>> 于2021年9月1日周三 下午7:08写道:
Hello,

I see the following error in my jobmanager log (Flink on EMR):
Checking cluster logs I see :
2021-08-21 17:17:30,489 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1 (type=CHECKPOINT) @ 1629566250303 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:17:33,572 [jobmanager-future-thread-5] INFO  com.amazon.ws<http://com.amazon.ws/>.emr.hadoop.fs.s3n.MultipartUploadOutputStream  - close closed:false s3://***/_metadata
2021-08-21 17:17:33,800 [jobmanager-future-thread-5] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1 for job c513e9ebbea4ab72d80b1338896ca5c2 (737859873 bytes in 3496 ms).
2021-08-21 17:27:30,474 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 2 (type=CHECKPOINT) @ 1629566850302 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:27:46,012 [jobmanager-future-thread-3] INFO  com.amazon.ws<http://com.amazon.ws/>.emr.hadoop.fs.s3n.MultipartUploadOutputStream  - close closed:false s3://***/_metadata
2021-08-21 17:27:46,158 [jobmanager-future-thread-3] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 2 for job c513e9ebbea4ab72d80b1338896ca5c2 (1210889410 bytes in 15856 ms).
2021-08-21 17:37:30,468 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 3 (type=CHECKPOINT) @ 1629567450302 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:47:30,469 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 3 of job c513e9ebbea4ab72d80b1338896ca5c2 expired before completing.
2021-08-21 17:47:30,476 [flink-akka.actor.default-dispatcher-34] INFO org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2021-08-21 17:47:30,478 [flink-akka.actor.default-dispatcher-34] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job session-aggregation (c513e9ebbea4ab72d80b1338896ca5c2) switched from state RUNNING to RESTARTING.

Configuration is:

-yD "execution.checkpointing.timeout=10 min"\
-yD "restart-strategy=failure-rate"\
-yD "restart-strategy.failure-rate.max-failures-per-interval=70"\
-yD "restart-strategy.failure-rate.delay=1 min"\
-yD "restart-strategy.failure-rate.failure-rate-interval=60 min"\

I am not sure whether https://issues.apache.org/jira/browse/FLINK-21215 is related, but it looks like that issue has already been resolved.

I know I can increase the checkpoint timeout, but the checkpoint size is relatively small and most of the time a checkpoint completes within several seconds, so 10 minutes should be more than enough. So the main question is: why was "Exceeded checkpoint tolerable failure threshold" triggered?
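
To make the timing concrete, here is a minimal sketch (plain Python, not Flink code) that takes the log timestamps quoted above and compares each checkpoint's elapsed time with the configured 10 minute timeout. The names LOG_EVENTS, CHECKPOINT_TIMEOUT and parse_log_time are made up for this illustration, and the durations are measured between log lines, so they can differ slightly from the durations Flink itself reports.

```python
from datetime import datetime, timedelta

# Timestamp layout used by the jobmanager log lines above.
LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"

# Mirrors execution.checkpointing.timeout=10 min from the configuration.
CHECKPOINT_TIMEOUT = timedelta(minutes=10)

# (checkpoint id, time of "Triggering" line, time of "Completed"/"expired" line)
LOG_EVENTS = [
    (1, "2021-08-21 17:17:30,489", "2021-08-21 17:17:33,800"),
    (2, "2021-08-21 17:27:30,474", "2021-08-21 17:27:46,158"),
    (3, "2021-08-21 17:37:30,468", "2021-08-21 17:47:30,469"),
]


def parse_log_time(stamp):
    """Parse one log timestamp; the part after the comma is milliseconds."""
    return datetime.strptime(stamp, LOG_TIME_FORMAT)


# Elapsed time between the trigger line and the final line of each checkpoint.
elapsed = {
    cid: parse_log_time(end) - parse_log_time(start)
    for cid, start, end in LOG_EVENTS
}

for cid, duration in elapsed.items():
    status = "hit the timeout" if duration >= CHECKPOINT_TIMEOUT else "finished in time"
    print(f"checkpoint {cid}: {duration.total_seconds():.3f} s, {status}")
```

Checkpoints 1 and 2 finish within seconds, while checkpoint 3 ends almost exactly at the timeout, which suggests it was stuck rather than merely slow.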

Thanks!
This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.

Re: Flink restarts on Checkpoint failure

Posted by Daniel Vol <vo...@gmail.com>.
You are right.
I see a very big difference: one of the subtasks created almost 2M records, while
some created 0:
[image: image.png]
[image: image.png]
As they are sorted, the last 120 (!) subtasks show nothing...

On Mon, Sep 13, 2021 at 3:43 PM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> I see execution.checkpointing.unaligned is working for you. This indicates
> that checkpoints are very likely to be backpressured.
>
> To see if there is really a data skew, you can go into Flink's web UI,
> click on a node and see the number of records going into and out of a
> subtask. If there is a subtask which handles much more data than others
> then there is a data skew.
>
> On Mon, Sep 13, 2021 at 8:39 PM Daniel Vol <vo...@gmail.com> wrote:
>
>> Thanks!
>>
>> Yes, I already found this parameter
>> (execution.checkpointing.tolerable-failed-checkpoints), and I also added:
>>
>> -yD "execution.checkpointing.unaligned=true"\
>> -yD "execution.checkpointing.max-concurrent-checkpoints=1"\
>>
>> Since then the job has been running much better: it has been up for 2.5 hours so far and has completed 13 checkpoints with no failures.
>>
>> I will give it a few more hours to run to see the behaviour.
>>
>> Regarding data skew: it is possible that some of our keys are much larger than others, which would give a pretty big skew.
>>
>> But I did not see backpressure, though I might be missing something.
>>
>> Daniel.
>>
>>
>> On Mon, Sep 13, 2021 at 3:23 PM Caizhi Weng <ts...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> You can set execution.checkpointing.tolerable-failed-checkpoints to
>>> tolerate a certain number of checkpoint failures. See [1].
>>>
>>> The GC metrics are increasing because they report the total number of GCs
>>> that have happened so far, so they do not seem to be relevant.
>>>
>>> From your image all but one subtask have finished their checkpoint. So
>>> it is likely that there is a data skew and it backpressures the checkpoint.
>>> It seems that there is a window aggregation in your job. Is there any data
>>> skew on the aggregation keys?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints
>>>
>>> On Mon, Sep 13, 2021 at 6:00 PM Daniel Vol <vo...@gmail.com> wrote:
>>>
>>>> Hey,
>>>>
>>>> Sorry it took some time to answer.
>>>> First - Thanks for your input.
>>>>
>>>>    - FlinkRuntimeException: Exceeded checkpoint tolerable failure
>>>>    threshold: this depends on what recovery strategy you have configured …
>>>>
>>>> Flink Configuration:
>>>>
>>>> -yD "env.java.opts.taskmanager=-XX:+UseG1GC"\
>>>> -yD "env.java.opts.jobmanager=-XX:+UseG1GC"\
>>>> -yD "state.backend=rocksdb"\
>>>> -yD "state.checkpoints.dir=s3://..."\
>>>> -yD "state.checkpoints.num-retained=2"\
>>>> -yD "state.savepoints.dir=s3://..."\
>>>> -yD "state.backend.incremental=true"\
>>>> -yD "state.backend.local-recovery=true"\
>>>> -yD "state.backend.async=true"\
>>>> -yD "metrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter"\
>>>> -yD "taskmanager.memory.jvm-metaspace.size=1 gb"\
>>>> -yD "flink.partition-discovery.interval-millis=60000"\
>>>> -yD "pipeline.time-characteristic=EventTime"\
>>>> -yD "pipeline.generic-types=true"\
>>>> -yD "execution.checkpointing.interval=10 min"\
>>>> -yD "execution.checkpointing.min-pause=5 min"\
>>>> -yD "execution.checkpointing.mode=AT_LEAST_ONCE"\
>>>> -yD "execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION"\
>>>> -yD "execution.checkpointing.timeout=10 min"\
>>>> -yD "restart-strategy=failure-rate"\
>>>> -yD "restart-strategy.failure-rate.max-failures-per-interval=70"\
>>>> -yD "restart-strategy.failure-rate.delay=1 min"\
>>>> -yD "restart-strategy.failure-rate.failure-rate-interval=60 min"\
>>>> -yD "akka.ask.timeout=1 min"\
>>>> -yD "yarn.application-attempts=1"\
>>>> -yD "heartbeat.timeout=120000"\
>>>>
>>>> Yarn configuration:
>>>>
>>>> [image: image.png]
>>>>
>>>> According to the Flink config (70 failures per hour, if I understand it correctly), a single checkpoint failure should not cause the application to restart...
>>>>
>>>> but it does: [image: image.png]
>>>>
>>>> EMR: 17 R5.4XL machines, with Flink yjm=120g ytm=120g
>>>>
>>>> A checkpoint failure by itself does not really bother me, but restarting Flink on *each* checkpoint failure bothers me a lot!
>>>>
>>>> I have tried moving from Flink 1.11.2 (EMR 5.32) to Flink 1.12.1 (the next EMR version, 5.33), but I have the same issue.
>>>>
>>>> Regarding cluster usage (EMR 5.33 with Flink 1.12.1):
>>>>
>>>> I see a CPU peak on the TM at the time of failure and restart (JM usage is ~0% all the time): [image: image.png]
>>>> while GC appears to be increasing all the time
>>>>
>>>> [image: image.png]
>>>>
>>>> All those metrics come from Flink itself -> Prometheus -> Grafana.
>>>>
>>>> Flink dashboard 1.12.1:
>>>> [image: image.png]
>>>>
>>>> [image: image.png]
>>>> [image: image.png]
>>>> I would be more than happy for any help to move on.
>>>>
>>>> Thank you in advance,
>>>>
>>>> Daniel.
>>>>
>>>> On Thu, Sep 2, 2021 at 9:49 AM Schwalbe Matthias <
>>>> Matthias.Schwalbe@viseca.ch> wrote:
>>>>
>>>>> Good morning Daniel,
>>>>>
>>>>> Another reason could be backpressure with aligned checkpoints:
>>>>>
>>>>>    - Flink processes checkpoints by sending checkpoint markers
>>>>>    through the job graph, from the source operators towards the sink
>>>>>    operators
>>>>>    - These checkpoint markers are a sort of meta event that is sent
>>>>>    along with your custom events (much like watermarks and latency markers)
>>>>>    - These checkpoint markers cannot pass by (i.e. go faster than)
>>>>>    your custom events
>>>>>    - In your situation, because it happens right after you start the
>>>>>    job,
>>>>>       - it might be a source that forwards many events (e.g. for
>>>>>       backfilling) while a later operator cannot process these events at the same
>>>>>       speed
>>>>>       - therefore the events queue up in front of that operator, as do
>>>>>       the checkpoint markers, which consequently have a hard time aligning,
>>>>>       even for longer than the checkpoint timeout
>>>>>    - How to fix this situation:
>>>>>       - diagnostics: the Flink dashboard has a tab for checkpoints that
>>>>>       shows how long checkpoint progress and alignment take for each task/subtask
>>>>>       - which version of Flink are you using?
>>>>>       - depending on the version of Flink, you can enable unaligned
>>>>>       checkpoints (which has some other implications)
>>>>>       - you could also increase the scale-out factor for the backfill
>>>>>       phase and then lower it again …
>>>>>
>>>>>    - FlinkRuntimeException: Exceeded checkpoint tolerable failure
>>>>>    threshold: this depends on what recovery strategy you have configured …
>>>>>
>>>>> I might be mistaken, however this is what I look into when I run into
>>>>> similar situations
>>>>>
>>>>> Feel free to get back to the mailing list for further clarifications …
>>>>>
>>>>> Thias
>>>>>
>>>>> *From:* Caizhi Weng <ts...@gmail.com>
>>>>> *Sent:* Thursday, 2 September 2021 04:24
>>>>> *To:* Daniel Vol <vo...@gmail.com>
>>>>> *Cc:* user <us...@flink.apache.org>
>>>>> *Subject:* Re: Flink restarts on Checkpoint failure
>>>>>
>>>>> Hi!
>>>>>
>>>>> There are a ton of possible reasons for a checkpoint failure. The most
>>>>> likely reasons might be:
>>>>>
>>>>> * The JVM is busy with garbage collection while performing the
>>>>> checkpoints. This can be checked by looking into the GC logs of a task
>>>>> manager.
>>>>>
>>>>> * The state suddenly becomes quite large due to some specific data
>>>>> pattern. This can be checked by looking at the state size for the completed
>>>>> portion of that checkpoint.
>>>>>
>>>>> You might also want to profile the CPU usage when the checkpoint is
>>>>> happening.

Re: Flink restarts on Checkpoint failure

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

I see execution.checkpointing.unaligned is working for you. This indicates
that the aligned checkpoints were very likely held up by backpressure.

To see if there is really a data skew, you can go into Flink's web UI,
click on a node and look at the number of records going into and out of each
subtask. If one subtask handles much more data than the others,
then there is a data skew.
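
As a rough illustration of that comparison, here is a minimal sketch in plain Python (not a Flink API). It assumes the per-subtask record counts have been copied by hand from the web UI. The function name skew_report and the sample numbers are hypothetical and only show the kind of imbalance to look for.

```python
def skew_report(records_per_subtask):
    """Summarise how unevenly records are spread across subtasks."""
    if not records_per_subtask:
        raise ValueError("need at least one subtask count")
    total = sum(records_per_subtask)
    busiest = max(records_per_subtask)
    mean = total / len(records_per_subtask)
    return {
        # How many times more records the busiest subtask has than the average.
        "max_over_mean": busiest / mean if mean else 0.0,
        # Fraction of all records handled by the busiest subtask.
        "share_of_busiest": busiest / total if total else 0.0,
        # Subtasks that received no records at all.
        "idle_subtasks": sum(1 for n in records_per_subtask if n == 0),
    }


# Hypothetical counts: one hot subtask, a few moderate ones, several idle ones.
counts = [2_000_000, 150_000, 90_000, 60_000] + [0] * 4
report = skew_report(counts)

for name, value in report.items():
    print(f"{name}: {value}")
```

A max_over_mean value close to 1 means the load is balanced. A value that is several times larger, together with many idle subtasks, points to skewed keys.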

Re: Flink restarts on Checkpoint failure

Posted by Daniel Vol <vo...@gmail.com>.
Thanks!

Yes, I already found this parameter
(execution.checkpointing.tolerable-failed-checkpoints), and I also added:

-yD "execution.checkpointing.unaligned=true"\
-yD "execution.checkpointing.max-concurrent-checkpoints=1"\

Since then the job has been running much better: it has been up for 2.5 hours
so far and has completed 13 checkpoints with no failures.
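
For readers wondering how the tolerable-failed-checkpoints threshold relates to the restarts, here is a simplified model in plain Python. It is not Flink's implementation. It assumes that failures are counted consecutively, that a successful checkpoint resets the counter, and that an unset option behaves like 0, which matches the restart after a single expired checkpoint seen in this thread. The function name first_job_failure is hypothetical.

```python
def first_job_failure(outcomes, tolerable_failed_checkpoints=0):
    """Return the 1-based number of the checkpoint at which the job would be
    failed under this simplified model, or None if the threshold is never
    exceeded.

    outcomes: iterable of booleans, True for a completed checkpoint and
    False for a failed or expired one.
    """
    consecutive_failures = 0
    for number, succeeded in enumerate(outcomes, start=1):
        if succeeded:
            # Assumption: a completed checkpoint resets the failure counter.
            consecutive_failures = 0
        else:
            consecutive_failures += 1
            if consecutive_failures > tolerable_failed_checkpoints:
                return number
    return None


# Sequence from the original log: two completed checkpoints, then one expired.
observed = [True, True, False]

print(first_job_failure(observed))                                  # threshold 0
print(first_job_failure(observed, tolerable_failed_checkpoints=3))  # threshold 3
```

Under this model the restart-strategy settings only come into play after the threshold has been exceeded and the job has been failed. They decide whether a restart is allowed, not whether a checkpoint failure counts as a job failure.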

I will give it a few more hours to run to see the behaviour.

Regarding data skew: it is possible that some of our keys are much larger
than others, which would give a pretty big skew.

But I did not see backpressure, though I might be missing something.

Daniel.


On Mon, Sep 13, 2021 at 3:23 PM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> You can set execution.checkpointing.tolerable-failed-checkpoints to
> tolerate a certain number of checkpoint failures. See [1].
>
> The GC metrics are increasing because they report the total number of GCs
> that have happened so far, so they do not seem to be relevant.
>
> From your image all but one subtask have finished their checkpoint. So it
> is likely that there is a data skew and it backpressures the checkpoint. It
> seems that there is a window aggregation in your job. Is there any data
> skew on the aggregation keys?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints
>
> On Mon, Sep 13, 2021 at 6:00 PM Daniel Vol <vo...@gmail.com> wrote:
>
>> Hey,
>>
>> Sorry it took some time to answer.
>> First - Thanks for your input.
>>
>>    - FlinkRuntimeException: Exceeded checkpoint tolerable failure
>>    threshold: this depends on what recovery strategy you have configured …
>>
>> Flink Configuration:
>>
>> -yD "env.java.opts.taskmanager=-XX:+UseG1GC"\
>> -yD "env.java.opts.jobmanager=-XX:+UseG1GC"\
>> -yD "state.backend=rocksdb"\
>> -yD "state.checkpoints.dir=s3://..."\
>> -yD "state.checkpoints.num-retained=2"\
>> -yD "state.savepoints.dir=s3://..."\
>> -yD "state.backend.incremental=true"\
>> -yD "state.backend.local-recovery=true"\
>> -yD "state.backend.async=true"\
>> -yD "metrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter"\
>> -yD "taskmanager.memory.jvm-metaspace.size=1 gb"\
>> -yD "flink.partition-discovery.interval-millis=60000"\
>> -yD "pipeline.time-characteristic=EventTime"\
>> -yD "pipeline.generic-types=true"\
>> -yD "execution.checkpointing.interval=10 min"\
>> -yD "execution.checkpointing.min-pause=5 min"\
>> -yD "execution.checkpointing.mode=AT_LEAST_ONCE"\
>> -yD "execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION"\
>> -yD "execution.checkpointing.timeout=10 min"\
>> -yD "restart-strategy=failure-rate"\
>> -yD "restart-strategy.failure-rate.max-failures-per-interval=70"\
>> -yD "restart-strategy.failure-rate.delay=1 min"\
>> -yD "restart-strategy.failure-rate.failure-rate-interval=60 min"\
>> -yD "akka.ask.timeout=1 min"\
>> -yD "yarn.application-attempts=1"\
>> -yD "heartbeat.timeout=120000"\
>>
>> Yarn configuration:
>>
>> [image: image.png]
>>
>> According to flink config (70 failures during 1 hour if I understand it correctly) each CheckPoint failure should not seem to cause restart on application...
>>
>> but it is:[image: image.png]
>>
>> EMR - 17 R5.4XL machines with flink yjm=120g ytm=120g
>>
>> Checkpoint failure by itself does not really bother me - but restarting the flink due to *each* CheckPoint failure - very!
>>
>> I have tried to move to Flink 1.12.1 (next EMR version - 5.33) from Flink 1.11.2 (EMR - 5.32) but have the same issue.
>>
>> Regarding Cluster usage (EMR 5.33 with Flink 1.12.1):
>>
>> I see a CPU peak on TM at time of failure and restart (JM is ~0% usage all the time)-[image: image.png]
>> while GC looks like increasing all the time
>>
>> [image: image.png]
>>
>> All those metrics come from Flink itself -> Prometheus -> Grafana.
>>
>> Flink dashboard 1.12.1:
>> [image: image.png]
>>
>> [image: image.png]
>> [image: image.png]
>> Whould be more than happy for any help to move on.
>>
>> Thank you in advance,
>>
>> Daniel.
>>
>> On Thu, Sep 2, 2021 at 9:49 AM Schwalbe Matthias <
>> Matthias.Schwalbe@viseca.ch> wrote:
>>
>>> Good morning Daniel,
>>>
>>>
>>>
>>> Another reason could be backpressure with aligned checkpoints:
>>>
>>>    - Flink processes checkpoints by sending checkpoint markers through
>>>    the job graph, beginning with source operators towards the sink operators
>>>    - These checkpoint markers are sort of a meta event that is sent
>>>    along you custom events (much like watermarks and latency markers)
>>>    - These checkpoint markers cannot pass by (i.e. go faster than) your
>>>    custom events
>>>    - In your situation, because it happen right after you start the
>>>    job,
>>>       - it might be a source that forwards many events (e.g. for
>>>       backfilling) while a later operator cannot process these events in the same
>>>       speed
>>>       - therefore the events queue in front of that operator as well as
>>>       the checkpoint markers which consequently have a hard time to align event
>>>       for longer than the checkpoint timeout
>>>    - how to fix this situation:
>>>       - diagnostics: Flink dashboard has a tab for checkpoints that
>>>       show how long checkpoint progress and alignment take for each task/subtask
>>>       - which version of Flink are you using?
>>>       - Depending on the version of Flink you can enable unaligned
>>>       checkpoints (having some other implications)
>>>       - You could also increase scale out factor for the backfill phase
>>>       and then lower it again …
>>>
>>>
>>>
>>>    - FlinkRuntimeException: Exceeded checkpoint tolerable failure
>>>    threshold: this depends on what recovery strategy you have configured …
>>>
>>>
>>>
>>> I might be mistaken, however this is what I look into when I run into
>>> similar situations
>>>
>>>
>>>
>>>
>>>
>>> Feel free to get back to the mailing list for further clarifications …
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>>
>>>
>>> *From:* Caizhi Weng <ts...@gmail.com>
>>> *Sent:* Donnerstag, 2. September 2021 04:24
>>> *To:* Daniel Vol <vo...@gmail.com>
>>> *Cc:* user <us...@flink.apache.org>
>>> *Subject:* Re: Flink restarts on Checkpoint failure
>>>
>>>
>>>
>>> Hi!
>>>
>>>
>>>
>>> There are a ton of possible reasons for a checkpoint failure. The most
>>> possible reasons might be
>>>
>>> * The JVM is busy with garbage collecting when performing the
>>> checkpoints. This can be checked by looking into the GC logs of a task
>>> manager.
>>>
>>> * The state suddenly becomes quite large due to some specific data
>>> pattern. This can be checked by looking at the state size for the completed
>>> portion of that checkpoint.
>>>
>>>
>>>
>>> You might also want to profile the CPU usage when the checkpoint is
>>> happening.
>>>
>>>
>>>
>>> Daniel Vol <vo...@gmail.com> 于2021年9月1日周三 下午7:08写道:
>>>
>>> Hello,
>>>
>>> I see the following error in my jobmanager log (Flink on EMR):
>>> Checking cluster logs I see :
>>> 2021-08-21 17:17:30,489 [Checkpoint Timer] INFO
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
>>> checkpoint 1 (type=CHECKPOINT) @ 1629566250303 for job
>>> c513e9ebbea4ab72d80b1338896ca5c2.
>>> 2021-08-21 17:17:33,572 [jobmanager-future-thread-5] INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream
>>> - close closed:false s3://***/_metadata
>>> 2021-08-21 17:17:33,800 [jobmanager-future-thread-5] INFO
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
>>> checkpoint 1 for job c513e9ebbea4ab72d80b1338896ca5c2 (737859873 bytes in
>>> 3496 ms).
>>> 2021-08-21 17:27:30,474 [Checkpoint Timer] INFO
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
>>> checkpoint 2 (type=CHECKPOINT) @ 1629566850302 for job
>>> c513e9ebbea4ab72d80b1338896ca5c2.
>>> 2021-08-21 17:27:46,012 [jobmanager-future-thread-3] INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream
>>> - close closed:false s3://***/_metadata
>>> 2021-08-21 17:27:46,158 [jobmanager-future-thread-3] INFO
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
>>> checkpoint 2 for job c513e9ebbea4ab72d80b1338896ca5c2 (1210889410 bytes in
>>> 15856 ms).
>>> 2021-08-21 17:37:30,468 [Checkpoint Timer] INFO
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
>>> checkpoint 3 (type=CHECKPOINT) @ 1629567450302 for job
>>> c513e9ebbea4ab72d80b1338896ca5c2.
>>> 2021-08-21 17:47:30,469 [Checkpoint Timer] INFO
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 3
>>> of job c513e9ebbea4ab72d80b1338896ca5c2 expired before completing.
>>> 2021-08-21 17:47:30,476 [flink-akka.actor.default-dispatcher-34]
>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from
>>> a global failure.
>>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
>>> tolerable failure threshold.
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> 2021-08-21 17:47:30,478 [flink-akka.actor.default-dispatcher-34]
>>> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
>>> session-aggregation (c513e9ebbea4ab72d80b1338896ca5c2) switched from state
>>> RUNNING to RESTARTING.
>>>
>>>
>>>
>>> Configuration is:
>>>
>>> -yD "execution.checkpointing.timeout=10 min"\
>>> -yD "restart-strategy=failure-rate"\
>>> -yD "restart-strategy.failure-rate.max-failures-per-interval=70"\
>>> -yD "restart-strategy.failure-rate.delay=1 min"\
>>> -yD "restart-strategy.failure-rate.failure-rate-interval=60 min"\
>>>
>>> Not sure this - https://issues.apache.org/jira/browse/FLINK-21215 is related - but it looks like it is solved.
>>>
>>> I know I can increase checkpoint timeout - but checkpoint size is relatively small and most of the time it takes several seconds to complete so 10 minutes should be more than enough. So the main question is why "Exceeded checkpoint tolerable failure threshold" triggered?
>>>
>>> Thanks!
>>>
>>> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
>>> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
>>> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
>>> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
>>> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
>>> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
>>> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
>>> dieser Informationen ist streng verboten.
>>>
>>> This message is intended only for the named recipient and may contain
>>> confidential or privileged information. As the confidentiality of email
>>> communication cannot be guaranteed, we do not accept any responsibility for
>>> the confidentiality and the intactness of this message. If you have
>>> received it in error, please advise the sender by return e-mail and delete
>>> this message and any attachments. Any unauthorised use or dissemination of
>>> this information is strictly prohibited.
>>>
>>

Re: Flink restarts on Checkpoint failure

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

You can set execution.checkpointing.tolerable-failed-checkpoints to
tolerate a certain number of checkpoint failures. See [1].

The GC metrics keep increasing because they report the total number of GCs
that have happened so far, so they don't seem to be relevant.

From your image, all but one subtask have finished their checkpoint, so it
is likely that there is data skew that backpressures the checkpoint. It
seems that there is a window aggregation in your job. Is there any data
skew on the aggregation keys?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints
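
As a minimal sketch of the option mentioned above, written in flink-conf.yaml
form (the same key can be passed as a -yD flag like the other settings in
this thread). The value 3 is only an assumed example, not a recommendation.
As far as I understand, the counter applies to consecutive checkpoint
failures, and the restart strategy only decides what happens after the job
has already failed, which would explain why the failure-rate settings alone
do not prevent these restarts.

```yaml
# Sketch only: tolerate some failed checkpoints before failing the job.
# The value 3 is an assumption chosen for illustration.
execution.checkpointing.tolerable-failed-checkpoints: 3
```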


Re: Flink restarts on Checkpoint failure

Posted by Daniel Vol <vo...@gmail.com>.
Hey,

Sorry it took some time to answer.
First - Thanks for your input.

   - FlinkRuntimeException: Exceeded checkpoint tolerable failure
   threshold: this depends on what recovery strategy you have configured …

Flink Configuration:

-yD "env.java.opts.taskmanager=-XX:+UseG1GC"\
-yD "env.java.opts.jobmanager=-XX:+UseG1GC"\
-yD "state.backend=rocksdb"\
-yD "state.checkpoints.dir=s3://..."\
-yD "state.checkpoints.num-retained=2"\
-yD "state.savepoints.dir=s3://..."\
-yD "state.backend.incremental=true"\
-yD "state.backend.local-recovery=true"\
-yD "state.backend.async=true"\
-yD "metrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter"\
-yD "taskmanager.memory.jvm-metaspace.size=1 gb"\
-yD "flink.partition-discovery.interval-millis=60000"\
-yD "pipeline.time-characteristic=EventTime"\
-yD "pipeline.generic-types=true"\
-yD "execution.checkpointing.interval=10 min"\
-yD "execution.checkpointing.min-pause=5 min"\
-yD "execution.checkpointing.mode=AT_LEAST_ONCE"\
-yD "execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION"\
-yD "execution.checkpointing.timeout=10 min"\
-yD "restart-strategy=failure-rate"\
-yD "restart-strategy.failure-rate.max-failures-per-interval=70"\
-yD "restart-strategy.failure-rate.delay=1 min"\
-yD "restart-strategy.failure-rate.failure-rate-interval=60 min"\
-yD "akka.ask.timeout=1 min"\
-yD "yarn.application-attempts=1"\
-yD "heartbeat.timeout=120000"\

Yarn configuration:

[image: image.png]

According to the Flink config (70 failures within 1 hour, if I understand
it correctly), a single checkpoint failure should not cause the
application to restart...

but it does: [image: image.png]

EMR - 17 R5.4XL machines with flink yjm=120g ytm=120g

A checkpoint failure by itself does not really bother me, but
restarting Flink on *each* checkpoint failure bothers me a lot!

I have tried to move to Flink 1.12.1 (next EMR version - 5.33) from
Flink 1.11.2 (EMR - 5.32) but have the same issue.

Regarding cluster usage (EMR 5.33 with Flink 1.12.1):

I see a CPU peak on the TM at the time of failure and restart (JM is at ~0%
usage all the time): [image: image.png]
while GC looks like it is increasing all the time

[image: image.png]

All those metrics come from Flink itself -> Prometheus -> Grafana.

Flink dashboard 1.12.1:
[image: image.png]

[image: image.png]
[image: image.png]
I would be more than happy to get any help to move on.

Thank you in advance,

Daniel.


RE: Flink restarts on Checkpoint failure

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Good morning Daniel,

Another reason could be backpressure with aligned checkpoints:

  *   Flink processes checkpoints by sending checkpoint markers through the job graph, from the source operators towards the sink operators
  *   These checkpoint markers are a kind of meta event that is sent along with your custom events (much like watermarks and latency markers)
  *   These checkpoint markers cannot overtake (i.e. go faster than) your custom events
  *   In your situation, because it happens right after you start the job,
     *   it might be a source that forwards many events (e.g. for backfilling) while a later operator cannot process these events at the same speed
     *   therefore the events queue up in front of that operator, and so do the checkpoint markers, which consequently have a hard time aligning, even for longer than the checkpoint timeout
  *   how to fix this situation:
     *   diagnostics: the Flink dashboard has a tab for checkpoints that shows how long checkpoint progress and alignment take for each task/subtask
     *   which version of Flink are you using?
     *   Depending on the version of Flink you can enable unaligned checkpoints (which has some other implications)
     *   You could also increase the scale-out factor for the backfill phase and then lower it again …
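
To make the backpressure argument a bit more concrete, here is a rough back-of-the-envelope sketch in Python. It is only a toy model, not Flink code: the function name barrier_delay_seconds and the backlog and throughput numbers are made up for illustration. It estimates how long an aligned checkpoint marker waits behind events that are already in front of it and compares that with the 10 min timeout from the configuration in the original post.

```python
def barrier_delay_seconds(queued_events: int, events_per_second: float) -> float:
    """Time an aligned checkpoint marker waits behind already queued events.

    The marker cannot overtake events, so it is delayed by the time the
    slow operator needs to work through the backlog in front of it.
    """
    if events_per_second <= 0:
        raise ValueError("events_per_second must be positive")
    return queued_events / events_per_second


CHECKPOINT_TIMEOUT_S = 10 * 60  # execution.checkpointing.timeout=10 min

# Hypothetical numbers: 9 million events sit in front of the marker and
# the slow operator handles 10,000 events per second.
delay = barrier_delay_seconds(9_000_000, 10_000)
expires = delay > CHECKPOINT_TIMEOUT_S
print(f"marker delayed by {delay:.0f} s, checkpoint expires: {expires}")
```

With these assumed numbers the marker needs 900 s to reach the operator, which is longer than the 600 s timeout, so the checkpoint would expire even though the state itself is small.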


  *   FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold: this depends on what recovery strategy you have configured …
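
On the threshold itself: as far as I know, the check is driven by the tolerable checkpoint failure number (execution.checkpointing.tolerable-failed-checkpoints, or CheckpointConfig#setTolerableCheckpointFailureNumber in code), which is effectively 0 when unset, so a single expired checkpoint already fails the job; the restart strategy then decides what happens next. The Python sketch below is a simplified model of that counting rule, not Flink's actual implementation; the names FailureCounter and replay are invented for illustration.

```python
class FailureCounter:
    """Simplified model of a 'tolerable failed checkpoints' rule."""

    def __init__(self, tolerable_failures: int = 0):
        self.tolerable_failures = tolerable_failures
        self.consecutive_failures = 0

    def on_completed(self) -> None:
        # A successful checkpoint resets the count of consecutive failures.
        self.consecutive_failures = 0

    def on_failed(self) -> bool:
        """Record a failed or expired checkpoint; return True if the job fails."""
        self.consecutive_failures += 1
        return self.consecutive_failures > self.tolerable_failures


def replay(outcomes, tolerable_failures=0):
    """Return the 1-based number of the checkpoint that fails the job, or None."""
    counter = FailureCounter(tolerable_failures)
    for number, ok in enumerate(outcomes, start=1):
        if ok:
            counter.on_completed()
        elif counter.on_failed():
            return number
    return None


# Sequence from the log: checkpoints 1 and 2 complete, checkpoint 3 expires.
print(replay([True, True, False]))                        # 3
print(replay([True, True, False], tolerable_failures=2))  # None
```

In this model, a threshold of 0 turns the very first expired checkpoint into a job failure, which matches the sequence in the log; a higher threshold would let the job keep running and retry at the next checkpoint interval.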

I might be mistaken, however this is what I look into when I run into similar situations


Feel free to get back to the mailing list for further clarifications …

Thias


From: Caizhi Weng <ts...@gmail.com>
Sent: Thursday, 2 September 2021 04:24
To: Daniel Vol <vo...@gmail.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Flink restarts on Checkpoint failure

Hi!

There are a ton of possible reasons for a checkpoint failure. The most likely reasons might be
* The JVM is busy with garbage collection while performing the checkpoints. This can be checked by looking into the GC logs of a task manager.
* The state suddenly becomes quite large due to some specific data pattern. This can be checked by looking at the state size for the completed portion of that checkpoint.

You might also want to profile the CPU usage when the checkpoint is happening.

Daniel Vol <vo...@gmail.com> wrote on Wed, Sep 1, 2021 at 7:08 PM:
Hello,

I see the following error in my jobmanager log (Flink on EMR):
Checking cluster logs I see :
2021-08-21 17:17:30,489 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1 (type=CHECKPOINT) @ 1629566250303 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:17:33,572 [jobmanager-future-thread-5] INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream  - close closed:false s3://***/_metadata
2021-08-21 17:17:33,800 [jobmanager-future-thread-5] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1 for job c513e9ebbea4ab72d80b1338896ca5c2 (737859873 bytes in 3496 ms).
2021-08-21 17:27:30,474 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 2 (type=CHECKPOINT) @ 1629566850302 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:27:46,012 [jobmanager-future-thread-3] INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream  - close closed:false s3://***/_metadata
2021-08-21 17:27:46,158 [jobmanager-future-thread-3] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 2 for job c513e9ebbea4ab72d80b1338896ca5c2 (1210889410 bytes in 15856 ms).
2021-08-21 17:37:30,468 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 3 (type=CHECKPOINT) @ 1629567450302 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:47:30,469 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 3 of job c513e9ebbea4ab72d80b1338896ca5c2 expired before completing.
2021-08-21 17:47:30,476 [flink-akka.actor.default-dispatcher-34] INFO org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2021-08-21 17:47:30,478 [flink-akka.actor.default-dispatcher-34] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job session-aggregation (c513e9ebbea4ab72d80b1338896ca5c2) switched from state RUNNING to RESTARTING.

Configuration is:


-yD "execution.checkpointing.timeout=10 min"\
-yD "restart-strategy=failure-rate"\
-yD "restart-strategy.failure-rate.max-failures-per-interval=70"\
-yD "restart-strategy.failure-rate.delay=1 min"\
-yD "restart-strategy.failure-rate.failure-rate-interval=60 min"\

Not sure this - https://issues.apache.org/jira/browse/FLINK-21215 is related - but it looks like it is solved.

I know I can increase the checkpoint timeout, but the checkpoint size is relatively small and most of the time it takes several seconds to complete, so 10 minutes should be more than enough. So the main question is: why was "Exceeded checkpoint tolerable failure threshold" triggered?

Thanks!

Re: Flink restarts on Checkpoint failure

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

There are a ton of possible reasons for a checkpoint failure. The most
likely reasons might be
* The JVM is busy with garbage collection while performing the checkpoints.
This can be checked by looking into the GC logs of a task manager.
* The state suddenly becomes quite large due to some specific data pattern.
This can be checked by looking at the state size for the completed portion
of that checkpoint.

You might also want to profile the CPU usage when the checkpoint is
happening.
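
For the state size check, a small script over the jobmanager log may already help. The Python sketch below assumes the log message format shown in the original post ("Completed checkpoint N for job ... (X bytes in Y ms)" and "Checkpoint N of job ... expired before completing"); the function name summarize_checkpoints is just for illustration, and wrapped log lines would need to be joined first.

```python
import re

COMPLETED = re.compile(
    r"Completed checkpoint (\d+) for job \w+ \((\d+) bytes in (\d+) ms\)"
)
EXPIRED = re.compile(r"Checkpoint (\d+) of job \w+ expired before completing")


def summarize_checkpoints(log_text: str):
    """Return (completed, expired) parsed from jobmanager log text.

    completed maps checkpoint id -> (size in bytes, duration in ms);
    expired is a list of checkpoint ids that timed out.
    """
    completed = {
        int(m.group(1)): (int(m.group(2)), int(m.group(3)))
        for m in COMPLETED.finditer(log_text)
    }
    expired = [int(m.group(1)) for m in EXPIRED.finditer(log_text)]
    return completed, expired


# Messages taken from the log in the original post (prefixes trimmed).
sample = """
CheckpointCoordinator  - Completed checkpoint 1 for job c513e9ebbea4ab72d80b1338896ca5c2 (737859873 bytes in 3496 ms).
CheckpointCoordinator  - Completed checkpoint 2 for job c513e9ebbea4ab72d80b1338896ca5c2 (1210889410 bytes in 15856 ms).
CheckpointCoordinator  - Checkpoint 3 of job c513e9ebbea4ab72d80b1338896ca5c2 expired before completing.
"""
completed, expired = summarize_checkpoints(sample)
for cp_id, (size, millis) in sorted(completed.items()):
    print(f"checkpoint {cp_id}: {size / 1e6:.0f} MB in {millis} ms")
print("expired:", expired)
```

On the posted log this shows the checkpoint growing from about 738 MB to about 1211 MB, and from roughly 3.5 s to 15.9 s, between checkpoints 1 and 2, before checkpoint 3 expires.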

Daniel Vol <vo...@gmail.com> wrote on Wed, Sep 1, 2021 at 7:08 PM:

> Hello,
>
> I see the following error in my jobmanager log (Flink on EMR):
> Checking cluster logs I see :
> 2021-08-21 17:17:30,489 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
> checkpoint 1 (type=CHECKPOINT) @ 1629566250303 for job
> c513e9ebbea4ab72d80b1338896ca5c2.
> 2021-08-21 17:17:33,572 [jobmanager-future-thread-5] INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream
> - close closed:false s3://***/_metadata
> 2021-08-21 17:17:33,800 [jobmanager-future-thread-5] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 1 for job c513e9ebbea4ab72d80b1338896ca5c2 (737859873 bytes in
> 3496 ms).
> 2021-08-21 17:27:30,474 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
> checkpoint 2 (type=CHECKPOINT) @ 1629566850302 for job
> c513e9ebbea4ab72d80b1338896ca5c2.
> 2021-08-21 17:27:46,012 [jobmanager-future-thread-3] INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream
> - close closed:false s3://***/_metadata
> 2021-08-21 17:27:46,158 [jobmanager-future-thread-3] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 2 for job c513e9ebbea4ab72d80b1338896ca5c2 (1210889410 bytes in
> 15856 ms).
> 2021-08-21 17:37:30,468 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
> checkpoint 3 (type=CHECKPOINT) @ 1629567450302 for job
> c513e9ebbea4ab72d80b1338896ca5c2.
> 2021-08-21 17:47:30,469 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 3
> of job c513e9ebbea4ab72d80b1338896ca5c2 expired before completing.
> 2021-08-21 17:47:30,476 [flink-akka.actor.default-dispatcher-34]
> INFO org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from
> a global failure.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2021-08-21 17:47:30,478 [flink-akka.actor.default-dispatcher-34]
> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
> session-aggregation (c513e9ebbea4ab72d80b1338896ca5c2) switched from state
> RUNNING to RESTARTING.
>
> Configuration is:
>
> -yD "execution.checkpointing.timeout=10 min"\
> -yD "restart-strategy=failure-rate"\
> -yD "restart-strategy.failure-rate.max-failures-per-interval=70"\
> -yD "restart-strategy.failure-rate.delay=1 min"\
> -yD "restart-strategy.failure-rate.failure-rate-interval=60 min"\
>
> Not sure this - https://issues.apache.org/jira/browse/FLINK-21215 is related - but it looks like it is solved.
>
> I know I can increase the checkpoint timeout, but the checkpoint size is relatively small and most of the time it takes several seconds to complete, so 10 minutes should be more than enough. So the main question is: why was "Exceeded checkpoint tolerable failure threshold" triggered?
>
> Thanks!
>
>