Posted to issues@flink.apache.org by "Yun Tang (JIRA)" <ji...@apache.org> on 2019/06/11 07:01:01 UTC

[jira] [Assigned] (FLINK-11662) Discarded checkpoint can cause Tasks to fail

     [ https://issues.apache.org/jira/browse/FLINK-11662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang reassigned FLINK-11662:
--------------------------------

    Assignee: Yun Tang

> Discarded checkpoint can cause Tasks to fail
> --------------------------------------------
>
>                 Key: FLINK-11662
>                 URL: https://issues.apache.org/jira/browse/FLINK-11662
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.7.0, 1.8.0
>            Reporter: madong
>            Assignee: Yun Tang
>            Priority: Critical
>         Attachments: jobmanager.log, taskmanager.log
>
>
> Flink's {{CheckpointCoordinator}} discards an ongoing checkpoint as soon as it receives the first decline message. Part of the discard operation is the deletion of the checkpoint directory. Depending on the underlying {{FileSystem}} implementation, concurrent write and read operations on files in the checkpoint directory can then fail (this is the case with HDFS, for example). If a local checkpointing operation (e.g. an {{AsyncCheckpointRunnable}}) belonging to the discarded checkpoint is still running for some {{Task}}, that operation can then fail. Depending on the configuration of the {{CheckpointExceptionHandler}}, this can lead to a task failure and a job recovery caused by an already discarded checkpoint, as the log below shows.
> {code:java}
> 2019-02-16 11:26:29.378 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was not running
> at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3) (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 1389046 for operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 common frames omitted
> Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 in order to obtain the stream state handle
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
> ... 7 common frames omitted
> Caused by: org.apache.hadoop.ipc.RemoteException: java.io.IOException: Path doesn't exist: /.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkFilePath(FSNamesystem.java:3063)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3089)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3043)
> at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:938)
> at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:743)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1175)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1171)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1169)
> at org.apache.hadoop.ipc.Client.call(Client.java:863)
> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:227)
> at com.sun.proxy.$Proxy5.complete(Unknown Source)
> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
> at com.sun.proxy.$Proxy5.complete(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.closeFile(DFSClient.java:1208)
> at org.apache.hadoop.hdfs.DFSClient.access$5300(DFSClient.java:151)
> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:6693)
> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:6567)
> at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
> ... 12 common frames omitted
> {code}
> Especially for larger jobs with sources that recover state and, hence, take some time before they accept a checkpoint request, this can lead to an unstable job that gets stuck in a restart loop.
> A workaround for this problem is to disable {{failTaskOnCheckpointError}} in the {{ExecutionConfig}} via {{ExecutionConfig#setFailTaskOnCheckpointError(false)}}. With this setting, checkpoint failures won't fail the owning {{Task}}.
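> As a rough sketch of how the workaround can be applied from a job's main method (assuming a standard {{StreamExecutionEnvironment}} setup; the placeholder pipeline, checkpoint interval and job name below are illustrative only and not part of the affected job):
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class CheckpointErrorWorkaround {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Enable checkpointing (interval chosen only for illustration).
>         env.enableCheckpointing(10_000L);
>
>         // Workaround: do not fail the owning Task when an asynchronous
>         // checkpoint operation fails; the checkpoint is declined instead.
>         env.getConfig().setFailTaskOnCheckpointError(false);
>
>         // Placeholder pipeline for illustration; replace with the real job graph.
>         env.fromElements(1, 2, 3).print();
>
>         env.execute("job-with-checkpoint-error-workaround");
>     }
> }
> {code}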
> In order to properly solve this problem, failing local checkpoint operations that belong to an already discarded checkpoint should simply be ignored. A good solution could be to centralize checkpoint failure handling in the {{CheckpointCoordinator}}. The {{CheckpointCoordinator}} knows which checkpoints are still valid and can therefore distinguish between valid and invalid checkpoint failures. It would, furthermore, allow implementing more sophisticated failure handling strategies, such as accepting {{n}} checkpoint failures before failing a task.
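> A minimal, purely hypothetical sketch of what such centralized handling could look like (this is not the actual {{CheckpointCoordinator}} code; all names such as {{reportCheckpointFailure}}, {{pendingCheckpoints}} and {{tolerableFailures}} are made up for illustration):
> {code:java}
> import java.util.Map;
>
> // Hypothetical sketch only. Failures for checkpoints that are no longer
> // pending are ignored, and a task is only failed after more than
> // `tolerableFailures` consecutive checkpoint failures.
> public class CentralizedCheckpointFailureHandler {
>
>     private final Map<Long, Object> pendingCheckpoints; // checkpointId -> pending checkpoint
>     private final int tolerableFailures;                 // accept n failures before failing a task
>     private int consecutiveFailures;
>
>     public CentralizedCheckpointFailureHandler(Map<Long, Object> pendingCheckpoints, int tolerableFailures) {
>         this.pendingCheckpoints = pendingCheckpoints;
>         this.tolerableFailures = tolerableFailures;
>     }
>
>     /** Called when a task reports that one of its local checkpoint operations failed. */
>     public synchronized void reportCheckpointFailure(long checkpointId, Runnable failTaskAction) {
>         if (!pendingCheckpoints.containsKey(checkpointId)) {
>             // Checkpoint was already discarded (e.g. after a decline message);
>             // failures of its leftover local operations are expected and ignored.
>             return;
>         }
>         consecutiveFailures++;
>         if (consecutiveFailures > tolerableFailures) {
>             failTaskAction.run();
>         }
>     }
>
>     /** Called when a checkpoint completes successfully. */
>     public synchronized void reportCheckpointSuccess() {
>         consecutiveFailures = 0;
>     }
> }
> {code}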


