You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by John Stone <el...@gmail.com> on 2018/10/29 18:54:15 UTC

Job fails to restore from checkpoint in Kubernetes with FileNotFoundException

I am testing Flink in a Kubernetes cluster and am finding that a job gets caught in a recovery loop.  Logs show that the issue is that a checkpoint cannot be found although checkpoints are being taken per the Flink web UI.  Any advice on how to resolve this is most appreciated.

Note on below: I can easily replicate this with a single TaskManager (>1 slots) and a job parallelism of 1, or two TaskManagers (4 slots each) and a job parallelism of 8.

Setup:
- Flink 1.6.0
- Kubernetes cluster.
- 1 JobManager node, 2 TaskManager nodes. 
- RocksDB backend with incremental checkpointing.
- There is not a persistent volume mounted on any of the three nodes.  In production, we would obviously need a persistent volume on the JobManager.
- Job submitted and running such that the job is parallelized over both nodes (i.e. each TM has 4 task slots; job parallelism = 5).

Test:
- Let the job collect a few checkpoints, say, 9 checkpoints.
- Kill one of the two TMs (kubectl delete pods <pod-name>).
- New TM pod starts.

Result:
- After the new TM starts, the job will cycle through FAILING -> RUNNING -> FAILING -> RUNNING ->...

Relevant Information
Contents of JobManager's pod:
user@host:~$ kubectl exec -it flink-jobmanager-5bbfcb567-v299g -- /bin/sh
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28
chk-9  shared  taskowned
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9
_metadata
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
flink-data/dwell-sliding-window-demo/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
# ls -al flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
-rw-r--r-- 1 flink flink 23665 Oct 29 18:07 flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/taskowned
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/shared

Stacktrace
A similar message is repeated over and over in the logs:
- "Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5)"
- "Could not restore operator state backend for WindowOperator_31dd93ebcd1f26006d3b41a7b50b5d82_(3/5)"
- "Could not restore operator state backend for StreamSource_ab0f3d44654e8df0b68f0b30a956403c_(2/5)"

2018-10-29 18:12:48,965 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job (f786f0c2e3a4405fe81a1eed720d5c28) switched from state RUNNING to FAILING.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
	... 5 more
Caused by: java.io.FileNotFoundException: /opt/flink/flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/b8865e25-3761-4ddf-a466-f035b639184b (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
	at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
	at org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:495)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
	... 7 more

Re: Job fails to restore from checkpoint in Kubernetes with FileNotFoundException

Posted by Till Rohrmann <tr...@apache.org>.
As Vino pointed out, you need to configure a checkpoint directory which is
accessible from all TMs. Otherwise you won't be able to recover the state
if the task gets scheduled to a different TaskManager. Usually, people use
HDFS or S3 for that.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:50 AM vino yang <ya...@gmail.com> wrote:

> Hi John,
>
> Is the file system configured by RocksDBStateBackend HDFS?[1]
>
> Thanks, vino.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html#the-rocksdbstatebackend
>
> John Stone <el...@gmail.com> 于2018年10月30日周二 上午2:54写道:
>
>> I am testing Flink in a Kubernetes cluster and am finding that a job gets
>> caught in a recovery loop.  Logs show that the issue is that a checkpoint
>> cannot be found although checkpoints are being taken per the Flink web UI.
>> Any advice on how to resolve this is most appreciated.
>>
>> Note on below: I can easily replicate this with a single TaskManager (>1
>> slots) and a job parallelism of 1, or two TaskManagers (4 slots each) and a
>> job parallelism of 8.
>>
>> Setup:
>> - Flink 1.6.0
>> - Kubernetes cluster.
>> - 1 JobManager node, 2 TaskManager nodes.
>> - RocksDB backend with incremental checkpointing.
>> - There is not a persistent volume mounted on any of the three nodes.  In
>> production, we would obviously need a persistent volume on the JobManager.
>> - Job submitted and running such that the job is parallelized over both
>> nodes (i.e. each TM has 4 task slots; job parallelism = 5).
>>
>> Test:
>> - Let the job collect a few checkpoints, say, 9 checkpoints.
>> - Kill one of the two TMs (kubectl delete pods <pod-name>).
>> - New TM pod starts.
>>
>> Result:
>> - After the new TM starts, the job will cycle through FAILING -> RUNNING
>> -> FAILING -> RUNNING ->...
>>
>> Relevant Information
>> Contents of JobManager's pod:
>> user@host:~$ kubectl exec -it flink-jobmanager-5bbfcb567-v299g -- /bin/sh
>> # ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28
>> chk-9  shared  taskowned
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9
>> _metadata
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>>
>> flink-data/dwell-sliding-window-demo/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>> # ls -al
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>> -rw-r--r-- 1 flink flink 23665 Oct 29 18:07
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/taskowned
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/shared
>>
>> Stacktrace
>> A similar message is repeated over and over in the logs:
>> - "Could not restore operator state backend for
>> CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5)"
>> - "Could not restore operator state backend for
>> WindowOperator_31dd93ebcd1f26006d3b41a7b50b5d82_(3/5)"
>> - "Could not restore operator state backend for
>> StreamSource_ab0f3d44654e8df0b68f0b30a956403c_(2/5)"
>>
>> 2018-10-29 18:12:48,965 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>> (f786f0c2e3a4405fe81a1eed720d5c28) switched from state RUNNING to FAILING.
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>         at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>>         at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>> operator state backend for
>> CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5) from
>> any of the 1 provided restore options.
>>         at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>         at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>>         at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>>         ... 5 more
>> Caused by: java.io.FileNotFoundException:
>> /opt/flink/flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/b8865e25-3761-4ddf-a466-f035b639184b
>> (No such file or directory)
>>         at java.io.FileInputStream.open0(Native Method)
>>         at java.io.FileInputStream.open(FileInputStream.java:195)
>>         at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>         at
>> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>>         at
>> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
>>         at
>> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>>         at
>> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>>         at
>> org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
>>         at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:495)
>>         at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
>>         at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>         at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>         ... 7 more
>>
>

Re: Job fails to restore from checkpoint in Kubernetes with FileNotFoundException

Posted by vino yang <ya...@gmail.com>.
Hi John,

Is the file system configured by RocksDBStateBackend HDFS?[1]

Thanks, vino.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html#the-rocksdbstatebackend

John Stone <el...@gmail.com> 于2018年10月30日周二 上午2:54写道:

> I am testing Flink in a Kubernetes cluster and am finding that a job gets
> caught in a recovery loop.  Logs show that the issue is that a checkpoint
> cannot be found although checkpoints are being taken per the Flink web UI.
> Any advice on how to resolve this is most appreciated.
>
> Note on below: I can easily replicate this with a single TaskManager (>1
> slots) and a job parallelism of 1, or two TaskManagers (4 slots each) and a
> job parallelism of 8.
>
> Setup:
> - Flink 1.6.0
> - Kubernetes cluster.
> - 1 JobManager node, 2 TaskManager nodes.
> - RocksDB backend with incremental checkpointing.
> - There is not a persistent volume mounted on any of the three nodes.  In
> production, we would obviously need a persistent volume on the JobManager.
> - Job submitted and running such that the job is parallelized over both
> nodes (i.e. each TM has 4 task slots; job parallelism = 5).
>
> Test:
> - Let the job collect a few checkpoints, say, 9 checkpoints.
> - Kill one of the two TMs (kubectl delete pods <pod-name>).
> - New TM pod starts.
>
> Result:
> - After the new TM starts, the job will cycle through FAILING -> RUNNING
> -> FAILING -> RUNNING ->...
>
> Relevant Information
> Contents of JobManager's pod:
> user@host:~$ kubectl exec -it flink-jobmanager-5bbfcb567-v299g -- /bin/sh
> # ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28
> chk-9  shared  taskowned
> # ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9
> _metadata
> # ls
> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>
> flink-data/dwell-sliding-window-demo/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
> # ls -al
> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
> -rw-r--r-- 1 flink flink 23665 Oct 29 18:07
> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
> # ls
> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/taskowned
> # ls
> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/shared
>
> Stacktrace
> A similar message is repeated over and over in the logs:
> - "Could not restore operator state backend for
> CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5)"
> - "Could not restore operator state backend for
> WindowOperator_31dd93ebcd1f26006d3b41a7b50b5d82_(3/5)"
> - "Could not restore operator state backend for
> StreamSource_ab0f3d44654e8df0b68f0b30a956403c_(2/5)"
>
> 2018-10-29 18:12:48,965 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> (f786f0c2e3a4405fe81a1eed720d5c28) switched from state RUNNING to FAILING.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>         at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore
> operator state backend for
> CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5) from
> any of the 1 provided restore options.
>         at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>         at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>         at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>         ... 5 more
> Caused by: java.io.FileNotFoundException:
> /opt/flink/flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/b8865e25-3761-4ddf-a466-f035b639184b
> (No such file or directory)
>         at java.io.FileInputStream.open0(Native Method)
>         at java.io.FileInputStream.open(FileInputStream.java:195)
>         at java.io.FileInputStream.<init>(FileInputStream.java:138)
>         at
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>         at
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
>         at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>         at
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>         at
> org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
>         at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:495)
>         at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
>         at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>         at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>         ... 7 more
>