Posted to user@flink.apache.org by gerardg <ge...@talaia.io> on 2017/11/21 13:30:43 UTC

Missing checkpoint when restarting failed job

Hello,

We have a task that fails to restart from a checkpoint with the following
error:

java.lang.IllegalStateException: Could not initialize keyed state backend.
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /home/gluster/flink/checkpoints/fac589c7248186bda2ad7b711f174973/chk-1/a069f85e-4ceb-4fba-9308-fb238f31574f (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:49)
	at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:70)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1290)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1477)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1333)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1512)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:979)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
	... 6 common frames omitted

It seems that it tried to restore the job from checkpoint number 1 (which
was automatically deleted by Flink), even though the latest checkpoint is 1620.
I can actually see in the logs that it intended to restore from checkpoint 1620:

Found 1 checkpoints in ZooKeeper. 
Trying to retrieve checkpoint 1620. 
Restoring from latest valid checkpoint: Checkpoint 1620 @ 1511267100332 for
fac589c7248186bda2ad7b711f174973.

I have incremental checkpointing enabled, but I have read many times that
checkpoints do not reference files from previous checkpoints, so I'm not sure
what could be happening.
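
For reference, incremental checkpointing is enabled on the RocksDB state backend when the job is set up. The minimal skeleton below shows roughly what that looks like in Flink 1.3; the class name, checkpoint interval and URI are illustrative placeholders rather than our exact configuration:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (interval is illustrative).
        env.enableCheckpointing(60_000);

        // RocksDB state backend with incremental checkpoints enabled
        // through the second constructor argument (path is illustrative).
        env.setStateBackend(new RocksDBStateBackend("file:///home/gluster/flink/checkpoints", true));

        // Sources, transformations and sinks are omitted here;
        // they would be defined before calling env.execute(...).
    }
}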

Gerard




Re: Missing checkpoint when restarting failed job

Posted by Gerard Garcia <ge...@talaia.io>.
I've been monitoring the task and checkpoint 1 never gets deleted. Right
now we have:

chk-1  chk-1222  chk-326  chk-329  chk-357  chk-358  chk-8945  chk-8999
chk-9525  chk-9788  chk-9789  chk-9790  chk-9791

I made the task fail and it recovered without problems, so for now I would say
that the problem was with the distributed filesystem, or that the chk-1 folder
was somehow deleted by something external to Flink. If I see the problem again
I will try to gather more information.

Thanks,

Gerard

On Tue, Nov 21, 2017 at 4:27 PM, Stefan Richter <s.richter@data-artisans.com
> wrote:

> Ok, thanks for trying to reproduce this. If possible, could you also
> activate trace-level logging for class org.apache.flink.runtime.state.SharedStateRegistry?
> In case the problem occurs, this would greatly help to understand what was
> going on.
>
> > On 21.11.2017, at 15:16, gerardg <ge...@talaia.io> wrote:
> >
> >> where exactly did you read many times that incremental checkpoints cannot
> >> reference files from previous checkpoints, because we would have to correct
> >> that information. In fact, this is how incremental checkpoints work.
> >
> > My fault, I read it in some other posts on the mailing list, but now that I
> > read them carefully I see they meant savepoints, not checkpoints.
> >
> >> Now for this case, I would consider it extremely unlikely that a
> >> checkpoint 1620 would still reference a checkpoint 1,
> >> in particular if the files for that checkpoint are already deleted, which
> >> should only happen if it is no longer referenced. Which version of Flink
> >> are you using and what is your distributed filesystem? Is there any way to
> >> reproduce the problem?
> >
> > We are using Flink version 1.3.2 and GlusterFS.  There are usually a few
> > checkpoints around at the same time, for example right now:
> >
> > chk-1  chk-26  chk-27  chk-28  chk-29  chk-30  chk-31
> >
> > I'm not sure how to reproduce the problem but I'll monitor the folder to see
> > when chk-1 gets deleted and try to make the task fail when that happens.
> >
> > Gerard
>
>

Re: Missing checkpoint when restarting failed job

Posted by Stefan Richter <s....@data-artisans.com>.
Ok, thanks for trying to reproduce this. If possible, could you also activate trace-level logging for class org.apache.flink.runtime.state.SharedStateRegistry? In case the problem occurs, this would greatly help to understand what was going on.
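
Assuming the stock log4j.properties that ships in Flink's conf/ directory is in use, adding one line should be enough to get that output into the logs (if your setup uses logback instead, the equivalent logger entry would go into logback.xml):

log4j.logger.org.apache.flink.runtime.state.SharedStateRegistry=TRACE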

> On 21.11.2017, at 15:16, gerardg <ge...@talaia.io> wrote:
> 
>> where exactly did you read many times that incremental checkpoints cannot
>> reference files from previous checkpoints, because we would have to correct
>> that information. In fact, this is how incremental checkpoints work.
> 
> My fault, I read it in some other posts on the mailing list, but now that I
> read them carefully I see they meant savepoints, not checkpoints.
> 
>> Now for this case, I would consider it extremely unlikely that a
>> checkpoint 1620 would still reference a checkpoint 1,
>> in particular if the files for that checkpoint are already deleted, which
>> should only happen if it is no longer
>> referenced. Which version of Flink are you using and what is your
>> distributed filesystem? Is there any way to
>> reproduce the problem? 
> 
> We are using Flink version 1.3.2 and GlusterFS.  There are usually a few
> checkpoints around at the same time, for example right now: 
> 
> chk-1  chk-26  chk-27  chk-28  chk-29  chk-30  chk-31
> 
> I'm not sure how to reproduce the problem but I'll monitor the folder to see
> when chk-1 gets deleted and try to make the task fail when that happens.
> 
> Gerard
> 
> 
> 
> 


Re: Missing checkpoint when restarting failed job

Posted by gerardg <ge...@talaia.io>.
> where exactly did you read many times that incremental checkpoints cannot
> reference files from previous checkpoints, because we would have to correct
> that information. In fact, this is how incremental checkpoints work.

My fault, I read it in some other posts on the mailing list, but now that I
read them carefully I see they meant savepoints, not checkpoints.

> Now for this case, I would consider it extremely unlikely that a
> checkpoint 1620 would still reference a checkpoint 1,
> in particular if the files for that checkpoint are already deleted, which
> should only happen if it is no longer
> referenced. Which version of Flink are you using and what is your
> distributed filesystem? Is there any way to
> reproduce the problem? 

We are using Flink version 1.3.2 and GlusterFS.  There are usually a few
checkpoints around at the same time, for example right now: 

chk-1  chk-26  chk-27  chk-28  chk-29  chk-30  chk-31

I'm not sure how to reproduce the problem but I'll monitor the folder to see
when chk-1 gets deleted and try to make the task fail when that happens.

Gerard





Re: Missing checkpoint when restarting failed job

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

where exactly did you read many times that incremental checkpoints cannot reference files from previous checkpoints, because we would have to correct that information. In fact, this is how incremental checkpoints work. Now for this case, I would consider it extremely unlikely that a checkpoint 1620 would still reference a checkpoint 1, in particular if the files for that checkpoint are already deleted, which should only happen if it is no longer referenced. Which version of Flink are you using and what is your distributed filesystem? Is there any way to reproduce the problem?
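
To illustrate the idea: conceptually, every shared checkpoint file is reference counted, and it is only deleted once no retained checkpoint references it anymore. The toy sketch below shows that principle only; it is purely illustrative and not Flink's actual SharedStateRegistry code:

import java.util.HashMap;
import java.util.Map;

// Toy model of reference-counted shared checkpoint files.
// Purely illustrative; not Flink's SharedStateRegistry implementation.
public class ToySharedStateRegistry {

    private final Map<String, Integer> refCounts = new HashMap<>();

    // Each new checkpoint registers every file it references,
    // including files that were created by earlier checkpoints.
    public synchronized void register(String file) {
        refCounts.merge(file, 1, Integer::sum);
    }

    // When a checkpoint is subsumed or discarded, it unregisters its
    // references; a file is only deleted once nothing references it.
    public synchronized void unregister(String file) {
        Integer count = refCounts.get(file);
        if (count == null) {
            return;
        }
        if (count <= 1) {
            refCounts.remove(file);
            deleteFile(file);
        } else {
            refCounts.put(file, count - 1);
        }
    }

    private void deleteFile(String file) {
        // A real system would remove the file from checkpoint storage;
        // here we only log the decision.
        System.out.println("deleting " + file);
    }
}

Under this model, a file in chk-1 should only disappear after every checkpoint that still references it has been discarded, which is why the error you see is surprising.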

Best,
Stefan

> On 21.11.2017, at 14:30, gerardg <ge...@talaia.io> wrote:
> 
> Hello,
> 
> We have a task that fails to restart from a checkpoint with the following
> error:
> 
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: /home/gluster/flink/checkpoints/fac589c7248186bda2ad7b711f174973/chk-1/a069f85e-4ceb-4fba-9308-fb238f31574f (No such file or directory)
> 	at java.io.FileInputStream.open0(Native Method)
> 	at java.io.FileInputStream.open(FileInputStream.java:195)
> 	at java.io.FileInputStream.<init>(FileInputStream.java:138)
> 	at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:49)
> 	at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
> 	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
> 	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:70)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1290)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1477)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1333)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1512)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:979)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
> 	... 6 common frames omitted
> 
> It seems that it tried to restore the job from checkpoint number 1 (which
> was automatically deleted by Flink), even though the latest checkpoint is 1620.
> I can actually see in the logs that it intended to restore from checkpoint 1620:
> 
> Found 1 checkpoints in ZooKeeper. 
> Trying to retrieve checkpoint 1620. 
> Restoring from latest valid checkpoint: Checkpoint 1620 @ 1511267100332 for
> fac589c7248186bda2ad7b711f174973.
> 
> I have incremental checkpointing enabled, but I have read many times that
> checkpoints do not reference files from previous checkpoints, so I'm not sure
> what could be happening.
> 
> Gerard
> 
> 
> 