You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Gaël Renoux <ga...@datadome.co> on 2022/05/23 09:18:09 UTC

TolerableCheckpointFailureNumber not always applying

Hello everyone,

We're having an issue on our Flink job: it restarted because it failed a
checkpoint, even though it shouldn't have. We've set the
tolerableCheckpointFailureNumber to 1 million to never have the job restart
because of this. However, the job did restart following a checkpoint
failure in a Kafka sink (stack-trace below).

I'm about to open an issue on Flink's Jira, but I thought I'd check if I'm
missing something first. Is there a known limitation somewhere? Or should
the tolerableCheckpointFailureNumber apply in that case?

The stack-trace:

Sink: result-to-kafka (1/1)#0 (0dbd01ad4663f5dd642569381694f57e) switched
> from RUNNING to FAILED with failure cause: java.io.IOException: Could not
> perform checkpoint 853 for operator Sink: result-to-kafka (1/1)#0.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could
> not complete snapshot 853 for operator Sink: result-to-kafka (1/1)#0.
> Failure reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
> ... 16 more
> Caused by:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: Expiring 1 record(s) for result-31:120003 ms has passed
> since batch creation
>

Any help is appreciated.

Gaël Renoux - Lead R&D Engineer
E - gael.renoux@datadome.co
W - www.datadome.co

Re: TolerableCheckpointFailureNumber not always applying

Posted by Gaël Renoux <ga...@datadome.co>.
I get the idea, but in our case this was a transient error: it was a
network issue, which was solved later without any change in Flink (see last
line of stack-trace). Errors in the sync phase are not always non-transient
(in our case, they are pretty much never).

To be honest, I have trouble imagining a case in Production where you'd
want the job to fail if a checkpoint fails. In a test environment, sure,
you want to crash as soon as possible if something goes wrong. But in
Production? I'd rather continue working as long as my job can work. Sure,
I'm in trouble if the job crashes and I don't have a recent checkpoint -
but then, crashing exactly when I don't have a checkpoint is the worst
thing that can happen.

So I think the safest solution (when Flink is in doubt about whether it's
transient or not) would be to assume the error is transient and apply the
tolerable failures configuration. In that case, the worst case scenario is
that your job goes on, and you have to verify your checkpoints to see if
everything is alright - which is something you should always do anyway, in
case you have a transient error that's not going away.



On Tue, May 24, 2022 at 6:04 AM Hangxiang Yu <ma...@gmail.com> wrote:

> In my opinion,  some exceptions in the async phase like timeout may happen
> related to the network, state size which will change, so maybe next time
> these failures will not occur. So the config makes sense for these.
> But this failure in the sync phase usually means the program will always
> fail and it will influence the. normal procedure, it has to be stopped.
> If you don't need to recover from the checkpoint, maybe you could disable
> it. But it's not recommended for a streaming job.
>
> Best,
> Hangxiang.
>
> On Tue, May 24, 2022 at 12:51 AM Gaël Renoux <ga...@datadome.co>
> wrote:
>
>> Got it, thank you. I misread the documentation and thought the async
>> referred to the task itself, not the process of taking a checkpoint.
>>
>> I guess there is currently no way to make a job never fail on a failed
>> checkpoint?
>>
>> Gaël Renoux - Lead R&D Engineer
>> E - gael.renoux@datadome.co
>> W - www.datadome.co
>>
>>
>>
>> On Mon, May 23, 2022 at 4:35 PM Hangxiang Yu <ma...@gmail.com> wrote:
>>
>>> Hi, Gaël Renoux.
>>> As you could see in [1], There are some descriptions about the config:
>>> "This only applies to the following failure reasons: IOException on the
>>> Job Manager, failures in the async phase on the Task Managers and
>>> checkpoint expiration due to a timeout. Failures originating from the sync
>>> phase on the Task Managers are always forcing failover of an affected task.
>>> Other types of checkpoint failures (such as checkpoint being subsumed) are
>>> being ignored."
>>>
>>> From the stack-trace, I see the exception is thrown in the sync phase.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing
>>>
>>> Best,
>>> Hangxiang.
>>>
>>> On Mon, May 23, 2022 at 5:18 PM Gaël Renoux <ga...@datadome.co>
>>> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> We're having an issue on our Flink job: it restarted because it failed
>>>> a checkpoint, even though it shouldn't have. We've set the
>>>> tolerableCheckpointFailureNumber to 1 million to never have the job restart
>>>> because of this. However, the job did restart following a checkpoint
>>>> failure in a Kafka sink (stack-trace below).
>>>>
>>>> I'm about to open an issue on Flink's Jira, but I thought I'd check if
>>>> I'm missing something first. Is there a known limitation somewhere? Or
>>>> should the tolerableCheckpointFailureNumber apply in that case?
>>>>
>>>> The stack-trace:
>>>>
>>>> Sink: result-to-kafka (1/1)#0 (0dbd01ad4663f5dd642569381694f57e)
>>>>> switched from RUNNING to FAILED with failure cause: java.io.IOException:
>>>>> Could not perform checkpoint 853 for operator Sink: result-to-kafka (1/1)#0.
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>>>>> at
>>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>>>> at
>>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>> at java.lang.Thread.run(Thread.java:750)
>>>>> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException:
>>>>> Could not complete snapshot 853 for operator Sink: result-to-kafka (1/1)#0.
>>>>> Failure reason: Checkpoint was declined.
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
>>>>> ... 16 more
>>>>> Caused by:
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>>>>> send data to Kafka: Expiring 1 record(s) for result-31:120003 ms has passed
>>>>> since batch creation
>>>>>
>>>>
>>>> Any help is appreciated.
>>>>
>>>> Gaël Renoux - Lead R&D Engineer
>>>> E - gael.renoux@datadome.co
>>>> W - www.datadome.co
>>>>
>>>>

Re: TolerableCheckpointFailureNumber not always applying

Posted by Hangxiang Yu <ma...@gmail.com>.
In my opinion,  some exceptions in the async phase like timeout may happen
related to the network, state size which will change, so maybe next time
these failures will not occur. So the config makes sense for these.
But this failure in the sync phase usually means the program will always
fail and it will influence the. normal procedure, it has to be stopped.
If you don't need to recover from the checkpoint, maybe you could disable
it. But it's not recommended for a streaming job.

Best,
Hangxiang.

On Tue, May 24, 2022 at 12:51 AM Gaël Renoux <ga...@datadome.co>
wrote:

> Got it, thank you. I misread the documentation and thought the async
> referred to the task itself, not the process of taking a checkpoint.
>
> I guess there is currently no way to make a job never fail on a failed
> checkpoint?
>
> Gaël Renoux - Lead R&D Engineer
> E - gael.renoux@datadome.co
> W - www.datadome.co
>
>
>
> On Mon, May 23, 2022 at 4:35 PM Hangxiang Yu <ma...@gmail.com> wrote:
>
>> Hi, Gaël Renoux.
>> As you could see in [1], There are some descriptions about the config:
>> "This only applies to the following failure reasons: IOException on the
>> Job Manager, failures in the async phase on the Task Managers and
>> checkpoint expiration due to a timeout. Failures originating from the sync
>> phase on the Task Managers are always forcing failover of an affected task.
>> Other types of checkpoint failures (such as checkpoint being subsumed) are
>> being ignored."
>>
>> From the stack-trace, I see the exception is thrown in the sync phase.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing
>>
>> Best,
>> Hangxiang.
>>
>> On Mon, May 23, 2022 at 5:18 PM Gaël Renoux <ga...@datadome.co>
>> wrote:
>>
>>> Hello everyone,
>>>
>>> We're having an issue on our Flink job: it restarted because it failed a
>>> checkpoint, even though it shouldn't have. We've set the
>>> tolerableCheckpointFailureNumber to 1 million to never have the job restart
>>> because of this. However, the job did restart following a checkpoint
>>> failure in a Kafka sink (stack-trace below).
>>>
>>> I'm about to open an issue on Flink's Jira, but I thought I'd check if
>>> I'm missing something first. Is there a known limitation somewhere? Or
>>> should the tolerableCheckpointFailureNumber apply in that case?
>>>
>>> The stack-trace:
>>>
>>> Sink: result-to-kafka (1/1)#0 (0dbd01ad4663f5dd642569381694f57e)
>>>> switched from RUNNING to FAILED with failure cause: java.io.IOException:
>>>> Could not perform checkpoint 853 for operator Sink: result-to-kafka (1/1)#0.
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>>>> at
>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>>> at
>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>> at java.lang.Thread.run(Thread.java:750)
>>>> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException:
>>>> Could not complete snapshot 853 for operator Sink: result-to-kafka (1/1)#0.
>>>> Failure reason: Checkpoint was declined.
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
>>>> ... 16 more
>>>> Caused by:
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>>>> send data to Kafka: Expiring 1 record(s) for result-31:120003 ms has passed
>>>> since batch creation
>>>>
>>>
>>> Any help is appreciated.
>>>
>>> Gaël Renoux - Lead R&D Engineer
>>> E - gael.renoux@datadome.co
>>> W - www.datadome.co
>>>
>>>

Re: TolerableCheckpointFailureNumber not always applying

Posted by Gaël Renoux <ga...@datadome.co>.
Got it, thank you. I misread the documentation and thought the async
referred to the task itself, not the process of taking a checkpoint.

I guess there is currently no way to make a job never fail on a failed
checkpoint?

Gaël Renoux - Lead R&D Engineer
E - gael.renoux@datadome.co
W - www.datadome.co



On Mon, May 23, 2022 at 4:35 PM Hangxiang Yu <ma...@gmail.com> wrote:

> Hi, Gaël Renoux.
> As you could see in [1], There are some descriptions about the config:
> "This only applies to the following failure reasons: IOException on the
> Job Manager, failures in the async phase on the Task Managers and
> checkpoint expiration due to a timeout. Failures originating from the sync
> phase on the Task Managers are always forcing failover of an affected task.
> Other types of checkpoint failures (such as checkpoint being subsumed) are
> being ignored."
>
> From the stack-trace, I see the exception is thrown in the sync phase.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 5:18 PM Gaël Renoux <ga...@datadome.co>
> wrote:
>
>> Hello everyone,
>>
>> We're having an issue on our Flink job: it restarted because it failed a
>> checkpoint, even though it shouldn't have. We've set the
>> tolerableCheckpointFailureNumber to 1 million to never have the job restart
>> because of this. However, the job did restart following a checkpoint
>> failure in a Kafka sink (stack-trace below).
>>
>> I'm about to open an issue on Flink's Jira, but I thought I'd check if
>> I'm missing something first. Is there a known limitation somewhere? Or
>> should the tolerableCheckpointFailureNumber apply in that case?
>>
>> The stack-trace:
>>
>> Sink: result-to-kafka (1/1)#0 (0dbd01ad4663f5dd642569381694f57e) switched
>>> from RUNNING to FAILED with failure cause: java.io.IOException: Could not
>>> perform checkpoint 853 for operator Sink: result-to-kafka (1/1)#0.
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
>>> at
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>>> at
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
>>> at
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
>>> at
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>>> at
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>>> at
>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>>> at
>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>> at
>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>> at java.lang.Thread.run(Thread.java:750)
>>> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException:
>>> Could not complete snapshot 853 for operator Sink: result-to-kafka (1/1)#0.
>>> Failure reason: Checkpoint was declined.
>>> at
>>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
>>> ... 16 more
>>> Caused by:
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>>> send data to Kafka: Expiring 1 record(s) for result-31:120003 ms has passed
>>> since batch creation
>>>
>>
>> Any help is appreciated.
>>
>> Gaël Renoux - Lead R&D Engineer
>> E - gael.renoux@datadome.co
>> W - www.datadome.co
>>
>>

Re: TolerableCheckpointFailureNumber not always applying

Posted by Hangxiang Yu <ma...@gmail.com>.
Hi, Gaël Renoux.
As you could see in [1], There are some descriptions about the config:
"This only applies to the following failure reasons: IOException on the Job
Manager, failures in the async phase on the Task Managers and checkpoint
expiration due to a timeout. Failures originating from the sync phase on
the Task Managers are always forcing failover of an affected task. Other
types of checkpoint failures (such as checkpoint being subsumed) are being
ignored."

From the stack-trace, I see the exception is thrown in the sync phase.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing

Best,
Hangxiang.

On Mon, May 23, 2022 at 5:18 PM Gaël Renoux <ga...@datadome.co> wrote:

> Hello everyone,
>
> We're having an issue on our Flink job: it restarted because it failed a
> checkpoint, even though it shouldn't have. We've set the
> tolerableCheckpointFailureNumber to 1 million to never have the job restart
> because of this. However, the job did restart following a checkpoint
> failure in a Kafka sink (stack-trace below).
>
> I'm about to open an issue on Flink's Jira, but I thought I'd check if I'm
> missing something first. Is there a known limitation somewhere? Or should
> the tolerableCheckpointFailureNumber apply in that case?
>
> The stack-trace:
>
> Sink: result-to-kafka (1/1)#0 (0dbd01ad4663f5dd642569381694f57e) switched
>> from RUNNING to FAILED with failure cause: java.io.IOException: Could not
>> perform checkpoint 853 for operator Sink: result-to-kafka (1/1)#0.
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
>> at
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>> at
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
>> at
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
>> at
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>> at
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>> at
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>> at
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>> at
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>> at java.lang.Thread.run(Thread.java:750)
>> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could
>> not complete snapshot 853 for operator Sink: result-to-kafka (1/1)#0.
>> Failure reason: Checkpoint was declined.
>> at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
>> at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
>> at
>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
>> at
>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
>> at
>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
>> ... 16 more
>> Caused by:
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>> send data to Kafka: Expiring 1 record(s) for result-31:120003 ms has passed
>> since batch creation
>>
>
> Any help is appreciated.
>
> Gaël Renoux - Lead R&D Engineer
> E - gael.renoux@datadome.co
> W - www.datadome.co
>
>