You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Gyula Fora (JIRA)" <ji...@apache.org> on 2017/05/19 13:12:04 UTC

[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore

    [ https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16017374#comment-16017374 ] 

Gyula Fora commented on FLINK-6633:
-----------------------------------

I have tried you fix, restore now works for incremental checkpoints but fails on the first checkpoint afterwards:

org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 4.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:853)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:772)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply$mcV$sp(JobManager.scala:1462)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: class org.apache.flink.runtime.state.PlaceholderStreamStateHandle
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandle(SavepointV2Serializer.java:484)
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandleMap(SavepointV2Serializer.java:342)
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeKeyedStateHandle(SavepointV2Serializer.java:329)
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeSubtaskState(SavepointV2Serializer.java:270)
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:122)
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:66)
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:199)
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeExternalizedCheckpointToHandle(SavepointStore.java:164)
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpointExternalized(PendingCheckpoint.java:287)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:843)

full log:
https://gist.github.com/gyfora/cf1894158ddbc5bbba2c0cc70d69b505

> Register with shared state registry before adding to CompletedCheckpointStore
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-6633
>                 URL: https://issues.apache.org/jira/browse/FLINK-6633
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> Introducing placeholders for previously existing shared state requires a change that shared state is first registering with {{SharedStateregistry}} (thereby being consolidated) and only after that added to a {{CompletedCheckpointStore}}, so that the consolidated checkpoint is written to stable storage. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)