Posted to user@flink.apache.org by Peter Westermann <no...@genesys.com> on 2020/09/21 13:07:07 UTC

Zookeeper connection loss causing checkpoint corruption

I recently ran into an issue with our Flink cluster: a ZooKeeper service deployment caused a temporary connection loss and triggered a new JobManager leader election. The leader election succeeded and our Flink job restarted from the last checkpoint.
This checkpoint appears to have been taken while the connection to ZooKeeper was lost, and it ended up in a corrupted state, so the Flink job kept failing. Here’s the exception stack trace:
2020-09-18 01:10:57
java.lang.Exception: Exception while creating StreamOperatorStateContext.
     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
     at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de_(27/40) from any of the 1 provided restore options.
     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
     ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:335)
     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
     ... 11 more
Caused by: java.io.IOException: Error while opening RocksDB instance.
     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:89)
     at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:220)
     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:276)
     ... 15 more
Caused by: org.rocksdb.RocksDBException: Sst file size mismatch: /mnt/data/tmp/flink-io-7de3e9df-f6d5-49f2-a92a-200d3c45d64d/job_2f6bc855799715e7846e2b95a9c01e6a_op_KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de__27_40__uuid_0aaa0092-2b59-48ef-b560-727a9e81a64f/db/012554.sst. Size recorded in manifest 5309, actual size 1199
Sst file size mismatch: /mnt/data/tmp/flink-io-7de3e9df-f6d5-49f2-a92a-200d3c45d64d/job_2f6bc855799715e7846e2b95a9c01e6a_op_KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de__27_40__uuid_0aaa0092-2b59-48ef-b560-727a9e81a64f/db/012548.sst. Size recorded in manifest 654588, actual size 1541818

     at org.rocksdb.RocksDB.open(Native Method)
     at org.rocksdb.RocksDB.open(RocksDB.java:286)
     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:77)
     ... 21 more

This is for Flink 1.10.2 with ZooKeeper for HA, the RocksDB state backend with incremental checkpoints, and S3 as checkpoint storage. I manually stopped the job and restarted it from the previous checkpoint.
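
For reference, the relevant parts of our configuration look roughly like the following (bucket name and ZooKeeper hosts are placeholders, not our actual values):

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://<bucket>/flink/checkpoints
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: s3://<bucket>/flink/ha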

This leads me to two questions:

  *   Is there a way to avoid corrupted checkpoints, or is this just a case of bad timing that we have to live with?
  *   Would it be possible to automate recovery and fall back to a previous checkpoint when a checkpoint repeatedly fails to load?

Thanks,
Peter

Re: Zookeeper connection loss causing checkpoint corruption

Posted by Arpith P <ar...@gmail.com>.
I created a ticket with all my findings:
https://issues.apache.org/jira/browse/FLINK-19359

Thanks,
Arpith

Re: Zookeeper connection loss causing checkpoint corruption

Posted by Timo Walther <tw...@apache.org>.
Hi Arpith,

is there a JIRA ticket for this issue already? If not, it would be great
if you could report it. This sounds like a critical-priority issue to me.

Thanks,
Timo

Re: Zookeeper connection loss causing checkpoint corruption

Posted by Arpith P <ar...@gmail.com>.
Hi Peter,

I recently had a similar issue where I could not restore from the
checkpoint path. I found that whenever a checkpoint is corrupted, the
"_metadata" file is not persisted. I have a program that uses this to track
checkpoint locations and records each location in a DB together with its
timestamp; to restore the latest good checkpoint, I query the DB ordered by
the most recent timestamp. Let me know if this is helpful, I can share the
code for this if needed.
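
Roughly, the idea is something like the sketch below (a simplified sketch, not the actual program: it assumes the chk-* checkpoint directories sit on a plain filesystem path and uses the directory modification time to pick the newest one; the real version records locations in a DB and queries them by timestamp, and an S3 layout would need the Hadoop FileSystem API or the AWS SDK instead of java.nio):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class LatestCompleteCheckpoint {

    // Only checkpoint directories that contain a _metadata file were fully
    // persisted; anything else is treated as incomplete or corrupt.
    public static Optional<Path> find(Path checkpointRoot) throws IOException {
        try (Stream<Path> dirs = Files.list(checkpointRoot)) {
            return dirs
                .filter(Files::isDirectory)
                .filter(d -> d.getFileName().toString().startsWith("chk-"))
                .filter(d -> Files.exists(d.resolve("_metadata")))
                // newest complete checkpoint by directory modification time
                .max(Comparator.comparing((Path d) -> {
                    try {
                        return Files.getLastModifiedTime(d);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }));
        }
    }

    public static void main(String[] args) throws IOException {
        // e.g. /checkpoints/<job-id>/ on a mounted volume
        Optional<Path> latest = find(Paths.get(args[0]));
        if (latest.isPresent()) {
            System.out.println("Latest restorable checkpoint: " + latest.get());
        } else {
            System.out.println("No complete checkpoint found");
        }
    }
}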

Arpith
