Posted to user@flink.apache.org by Shai Kaplan <Sh...@microsoft.com> on 2017/07/17 11:12:26 UTC

FileNotFoundException when restoring checkpoint

Hi.
I'm running Flink 1.3.1 with checkpoints stored in Azure blobs. The incremental checkpoints feature is on.
The job is trying to restore a checkpoint and consistently gets:

java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: wasbs://***/5065c840d4d4ba8c7cd91f793012bab1/chk-37/f52d633b-9d3f-47c1-bf23-58dcc54572e3
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem.open(NativeAzureFileSystem.java:1905)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
        at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1276)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1458)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1319)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1493)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:965)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)

The name of the missing file is sometimes different, but it's always a missing file in checkpoint 37. The last successful checkpoint number was 41, so I'm guessing that's the checkpoint it's trying to restore, but because of the incremental checkpointing it also needs files from previous checkpoints, which are apparently missing. Could this be a problem in the interface with Azure? If some files failed to write, why didn't the checkpoint fail?

When I realized nothing was going to change, I canceled the job and started it from a savepoint, which was checkpoint number 40. I actually expected it to fail, and that I would have to restore from a savepoint taken before the apparently corrupted checkpoint number 37, but it didn't fail. Should I infer that savepoints are self-contained and are not incremental?

Re: FileNotFoundException when restoring checkpoint

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

If I recall correctly, savepoints are always self-contained, even if 
incremental checkpointing is enabled.
However, this doesn't appear to be documented anywhere.

As for the missing file, I'm looping in Stefan, who is more knowledgeable 
about incremental checkpointing (and potentially known issues).

Regards,
Chesnay



Re: FileNotFoundException when restoring checkpoint

Posted by Stefan Richter <s....@data-artisans.com>.
After giving it a second thought, this problem could be a side effect of the issue fixed in https://issues.apache.org/jira/browse/FLINK-6964. If you want, you can check whether your problem is fixed in the latest master. This fix will also go into the 1.3.2 release branch today.

> On 17.07.2017 at 14:37, Stefan Richter <s....@data-artisans.com> wrote:
> 
> Hi,
> 
> You assumed correctly that savepoints are always self-contained. Are you using externalized checkpoints? There is a known problem that was fixed in the latest master and will go into 1.3.2, but this might be a different problem.
> 
> You are also correct that incremental checkpoints can reference files from previous checkpoints. Do you ever manually delete any checkpoint directories? They might still be referenced by other checkpoints.
> 
> I would assume that the missing file was written completely, because otherwise the checkpoint would already have failed. However, I am unsure about the exact guarantees (e.g. about visibility) in Azure blobs. Can you check whether this file was ever created and, if so, when it disappeared? Does the directory of checkpoint 37 still exist in the file system?
> 
> Best,
> Stefan
> 


Re: FileNotFoundException when restoring checkpoint

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

You assumed correctly that savepoints are always self-contained. Are you using externalized checkpoints? There is a known problem that was fixed in the latest master and will go into 1.3.2, but this might be a different problem.

You are also correct that incremental checkpoints can reference files from previous checkpoints. Do you ever manually delete any checkpoint directories? They might still be referenced by other checkpoints.

I would assume that the missing file was written completely, because otherwise the checkpoint would already have failed. However, I am unsure about the exact guarantees (e.g. about visibility) in Azure blobs. Can you check whether this file was ever created and, if so, when it disappeared? Does the directory of checkpoint 37 still exist in the file system?

Best,
Stefan
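
[Editor's illustration] The point about incremental checkpoints referencing files from earlier checkpoints, while savepoints stay self-contained, can be sketched with a small toy model. This is not Flink code and does not reflect how Flink tracks shared state internally: the class IncrementalCheckpointModel, its methods, and the file names a.sst, b.sst and c.sst are all hypothetical, chosen only to mirror the checkpoint numbers mentioned in this thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model only. Every name here is hypothetical; none of this is Flink API. */
final class IncrementalCheckpointModel {

    // Files physically present in storage, as "<directory>/<file>" paths.
    private final Set<String> storage = new HashSet<>();
    // For each snapshot id, the paths it needs in order to be restored.
    private final Map<Integer, Set<String>> references = new HashMap<>();

    /** Incremental checkpoint: writes only newFiles and re-references paths from earlier checkpoints. */
    void incrementalCheckpoint(int id, List<String> newFiles, List<String> reusedPaths) {
        Set<String> needed = new TreeSet<>(reusedPaths);
        for (String name : newFiles) {
            String path = "chk-" + id + "/" + name;
            storage.add(path);
            needed.add(path);
        }
        references.put(id, needed);
    }

    /** Savepoint: writes a full copy of all state files into its own directory. */
    void savepoint(int id, List<String> allFiles) {
        Set<String> needed = new TreeSet<>();
        for (String name : allFiles) {
            String path = "savepoint-" + id + "/" + name;
            storage.add(path);
            needed.add(path);
        }
        references.put(id, needed);
    }

    /** Simulates deleting a directory by hand, without checking who still references it. */
    void deleteDirectory(String directory) {
        storage.removeIf(path -> path.startsWith(directory + "/"));
    }

    /** Paths that the given snapshot needs but that storage no longer holds. */
    Set<String> missingFiles(int id) {
        Set<String> missing = new TreeSet<>(references.get(id));
        missing.removeAll(storage);
        return missing;
    }

    /** Builds the scenario from this thread: chk-37 is removed while chk-41 still needs it. */
    static IncrementalCheckpointModel demo() {
        IncrementalCheckpointModel m = new IncrementalCheckpointModel();
        m.incrementalCheckpoint(37, Arrays.asList("a.sst", "b.sst"), Arrays.<String>asList());
        m.savepoint(40, Arrays.asList("a.sst", "b.sst", "c.sst"));
        m.incrementalCheckpoint(41, Arrays.asList("c.sst"),
                Arrays.asList("chk-37/a.sst", "chk-37/b.sst"));
        m.deleteDirectory("chk-37");
        return m;
    }

    public static void main(String[] args) {
        IncrementalCheckpointModel m = demo();
        // prints: chk-41 missing: [chk-37/a.sst, chk-37/b.sst]
        System.out.println("chk-41 missing: " + m.missingFiles(41));
        // prints: savepoint 40 missing: []
        System.out.println("savepoint 40 missing: " + m.missingFiles(40));
    }
}
```

In this model, restoring checkpoint 41 would fail on a file under chk-37, while the savepoint taken as checkpoint 40 restores fine because it references nothing outside its own directory. That matches the behaviour described in the original post, though it says nothing about why the files went missing in the real job.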
