Posted to issues@flink.apache.org by "Yun Tang (Jira)" <ji...@apache.org> on 2020/12/18 15:52:00 UTC

[jira] [Updated] (FLINK-20675) Asynchronous checkpoint failure would not fail the job anymore

     [ https://issues.apache.org/jira/browse/FLINK-20675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang updated FLINK-20675:
-----------------------------
    Description: 
After FLINK-12364, no matter how many times the asynchronous part of a checkpoint fails on a task, the job itself does not fail by default:
||Default behavior||Flink 1.5 -> Flink 1.8||Flink 1.9 -> Flink 1.12||
|Synchronous part of checkpoint at task failed|Job failed|Job failed|
|Asynchronous part of checkpoint at task failed|Job failed|Job would not fail|

This happens because {{StreamTask}} declines the checkpoint with a plain {{Exception}} instead of a {{CheckpointException}} [when the async part fails|https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1118]. The checkpoint coordinator therefore calls {{failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause, executionAttemptID)}} to [process the declined checkpoint|https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1316-L1323]:
{code:java}
if (cause == null) {
	failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED, executionAttemptID);
} else if (cause instanceof CheckpointException) {
	CheckpointException exception = (CheckpointException) cause;
	failPendingCheckpointDueToTaskFailure(pendingCheckpoint, exception.getCheckpointFailureReason(), cause, executionAttemptID);
} else {
	failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause, executionAttemptID);
}
{code}
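To make the branch above concrete, here is a minimal, self-contained Java sketch of the same classification. The types are hypothetical stand-ins, not Flink's API: {{Reason}} is a reduced subset of {{CheckpointFailureReason}} (the {{CHECKPOINT_ASYNC_EXCEPTION}} constant is an assumption for illustration), and the nested {{CheckpointException}} only imitates Flink's class.

```java
// Simplified, hypothetical model of the coordinator's decline handling quoted above.
// Names mirror Flink's, but these are illustrative stand-ins, not Flink classes.
final class DeclineReasonSketch {

    // Hypothetical subset of CheckpointFailureReason (assumption for illustration).
    enum Reason { CHECKPOINT_DECLINED, CHECKPOINT_ASYNC_EXCEPTION, JOB_FAILURE }

    // Stand-in for a CheckpointException: an exception that carries an explicit reason.
    static final class CheckpointException extends Exception {
        private final Reason reason;

        CheckpointException(Reason reason, Throwable cause) {
            super(reason.name(), cause);
            this.reason = reason;
        }

        Reason getCheckpointFailureReason() {
            return reason;
        }
    }

    // Mirrors the three branches of the coordinator snippet.
    static Reason classify(Throwable cause) {
        if (cause == null) {
            return Reason.CHECKPOINT_DECLINED;
        } else if (cause instanceof CheckpointException) {
            return ((CheckpointException) cause).getCheckpointFailureReason();
        } else {
            // Any other exception type, such as the plain Exception sent
            // when the async part fails, falls through to JOB_FAILURE.
            return Reason.JOB_FAILURE;
        }
    }

    public static void main(String[] args) {
        Exception ioError = new java.io.IOException("state upload failed");

        // Declining with the raw exception loses any checkpoint-specific reason.
        System.out.println(classify(ioError)); // JOB_FAILURE

        // Wrapping it in a CheckpointException preserves an explicit reason.
        System.out.println(classify(
                new CheckpointException(Reason.CHECKPOINT_ASYNC_EXCEPTION, ioError))); // CHECKPOINT_ASYNC_EXCEPTION
    }
}
```

The first call corresponds to the asynchronous failure path described in this issue; the second shows how declining with a reason-carrying exception would keep the failure out of the JOB_FAILURE bucket.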
However, {{CheckpointFailureManager}} [ignores the JOB_FAILURE reason|https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java#L108] and does not count this failed checkpoint, so an asynchronous checkpoint failure no longer fails the job.
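The effect on the job can be sketched with a hypothetical counter modelled loosely on the behavior described above: reasons in an ignore set are dropped, every other reason increments a continuous-failure counter, and the job is marked failed once that counter exceeds the tolerable number. Treating only {{JOB_FAILURE}} as ignored and using a tolerance of 0 are assumptions for illustration; this is not a copy of Flink's {{CheckpointFailureManager}}.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model of counting failed checkpoints (not Flink's class).
final class FailureCounterSketch {

    // Hypothetical subset of failure reasons (assumption for illustration).
    enum Reason { CHECKPOINT_DECLINED, CHECKPOINT_ASYNC_EXCEPTION, JOB_FAILURE }

    // Assumption: as described in this issue, JOB_FAILURE is not counted.
    private static final Set<Reason> IGNORED = EnumSet.of(Reason.JOB_FAILURE);

    private final int tolerableFailures;
    private int continuousFailures = 0;
    private boolean jobFailed = false;

    FailureCounterSketch(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    void handleCheckpointFailure(Reason reason) {
        if (IGNORED.contains(reason)) {
            return; // Not counted, so this path can never fail the job.
        }
        continuousFailures++;
        if (continuousFailures > tolerableFailures) {
            jobFailed = true;
        }
    }

    // A successful checkpoint resets the continuous-failure counter.
    void handleCheckpointSuccess() {
        continuousFailures = 0;
    }

    boolean isJobFailed() {
        return jobFailed;
    }

    int getContinuousFailures() {
        return continuousFailures;
    }

    public static void main(String[] args) {
        FailureCounterSketch manager = new FailureCounterSketch(0);

        // Async failures reported as JOB_FAILURE are silently dropped.
        for (int i = 0; i < 100; i++) {
            manager.handleCheckpointFailure(Reason.JOB_FAILURE);
        }
        System.out.println(manager.isJobFailed()); // false

        // A single counted failure exceeds a tolerance of 0.
        manager.handleCheckpointFailure(Reason.CHECKPOINT_ASYNC_EXCEPTION);
        System.out.println(manager.isJobFailed()); // true
    }
}
```

In this model, any number of asynchronous failures reported as JOB_FAILURE leaves the counter at zero, which matches the "Job would not fail" row of the table.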

 

FLINK-16753 corrected the misleading message of JOB_FAILURE, but an asynchronous checkpoint failure still cannot fail the job.

 

As this bug has existed for a long time, I have set its priority to Critical instead of Blocker.

 

  was:
After FLINK-12364, no matter how many times the asynchronous part of a checkpoint fails on a task, the job itself does not fail by default:
|| ||Flink 1.5 -> Flink 1.8||Flink 1.9 -> Flink 1.12||
|Synchronous part of checkpoint at task failed|Job failed|Job failed|
|Asynchronous part of checkpoint at task failed|Job failed|Job would not fail|

This happens because {{StreamTask}} declines the checkpoint with a plain {{Exception}} instead of a {{CheckpointException}} [when the async part fails|https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1118]. The checkpoint coordinator therefore calls {{failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause, executionAttemptID)}} to [process the declined checkpoint|https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1316-L1323]:
{code:java}
if (cause == null) {
	failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED, executionAttemptID);
} else if (cause instanceof CheckpointException) {
	CheckpointException exception = (CheckpointException) cause;
	failPendingCheckpointDueToTaskFailure(pendingCheckpoint, exception.getCheckpointFailureReason(), cause, executionAttemptID);
} else {
	failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause, executionAttemptID);
}
{code}
However, {{CheckpointFailureManager}} [ignores the JOB_FAILURE reason|https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java#L108] and does not count this failed checkpoint, so an asynchronous checkpoint failure no longer fails the job.

 

FLINK-16753 corrected the misleading message of JOB_FAILURE, but an asynchronous checkpoint failure still cannot fail the job.

 

As this bug has existed for a long time, I have set its priority to Critical instead of Blocker.

 


> Asynchronous checkpoint failure would not fail the job anymore
> --------------------------------------------------------------
>
>                 Key: FLINK-20675
>                 URL: https://issues.apache.org/jira/browse/FLINK-20675
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.9.3, 1.10.2, 1.12.0, 1.11.3
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Critical
>             Fix For: 1.13.0, 1.11.4, 1.12.1
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)