Posted to issues@flink.apache.org by "Maximilian Michels (JIRA)" <ji...@apache.org> on 2019/06/12 09:45:00 UTC

[jira] [Commented] (FLINK-12653) Keyed state backend fails to restore during rescaling

    [ https://issues.apache.org/jira/browse/FLINK-12653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861950#comment-16861950 ] 

Maximilian Michels commented on FLINK-12653:
--------------------------------------------

Some more context: the restore does not fail when the TimerService instances are removed from the operators. The issue seems to be rooted in the asynchronous snapshotting of timers via PriorityQueue state. This is also why RocksDB does not suffer from the problem: in that case we fall back to synchronous snapshotting.

> Keyed state backend fails to restore during rescaling
> -----------------------------------------------------
>
>                 Key: FLINK-12653
>                 URL: https://issues.apache.org/jira/browse/FLINK-12653
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>         Environment: Beam 2.12.0 or any other Beam version
> Flink >= 1.6
> Heap/Filesystem state backend (RocksDB works fine)
>            Reporter: Maximilian Michels
>            Priority: Critical
>
> The Flink Runner includes a test which verifies that checkpoints/savepoints work correctly with Beam on Flink. When adding additional tests for scale-up/scale-down [1], I came across a bug with restoring the keyed state backend. After a fair amount of debugging Beam code and checking for potential issues with serializers, I think this could be a Flink issue.
> Steps to reproduce: 
> 1. {{git clone https://github.com/mxm/beam}}
> 2. {{cd beam && git checkout savepoint-problem}}
> 3. {{./gradlew :runners:flink:1.6:test --tests "**.FlinkSavepointTest.testSavepointRestoreLegacy"}}
> Error:
> {noformat}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for DoFnOperator_76375152c4a81d5df72cf49e32c4ecb9_(4/4) from any of the 1 provided restore options.
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
> 	... 5 more
> Caused by: java.lang.RuntimeException: Invalid namespace string: ''
> 	at org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:245)
> 	at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:246)
> 	at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:221)
> 	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
> 	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:169)
> 	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
> 	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:513)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:474)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:431)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:370)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
> 	... 7 more
> {noformat}
> The {{maxParallelism}} can be changed to other values. The following values lead to failure:
> {noformat}
>     options.setMaxParallelism(128); // default value
>    options.setMaxParallelism(64);
>     options.setMaxParallelism(118);
> {noformat}
> The following values work fine:
> {noformat}
>     options.setMaxParallelism(110);
>     options.setMaxParallelism(63);
>     options.setMaxParallelism(24);
> {noformat}
> [1] https://github.com/apache/beam/commit/52d7291144f64eaa417862558d71a443fae3d690
> Everything works fine with RocksDB.
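
To compare the failing and working {{maxParallelism}} values listed in the description, it can help to print how the key groups would be divided among subtasks. The following is a minimal sketch and not a diagnosis. It assumes the range formula matches the one in Flink's KeyGroupRangeAssignment (reproduced here from memory and not checked against the affected versions). The class name KeyGroupRanges and the parallelism values 2 and 4 are hypothetical and chosen only for illustration; the actual parallelism used by the test may differ.

```java
final class KeyGroupRanges {

    // Assumed formula: inclusive [start, end] key-group range owned by one subtask.
    static int[] rangeFor(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Formats the ranges of all subtasks for one (maxParallelism, parallelism) pair.
    static String describe(int maxParallelism, int parallelism) {
        StringBuilder sb = new StringBuilder();
        sb.append("maxParallelism=").append(maxParallelism)
          .append(" parallelism=").append(parallelism).append(':');
        for (int i = 0; i < parallelism; i++) {
            int[] r = rangeFor(maxParallelism, parallelism, i);
            sb.append(" [").append(r[0]).append("..").append(r[1]).append(']');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Values taken from the issue description.
        int[] failing = {128, 64, 118};
        int[] working = {110, 63, 24};
        // Hypothetical parallelism values before and after rescaling.
        int[] parallelisms = {2, 4};

        for (int p : parallelisms) {
            for (int mp : failing) {
                System.out.println("fails: " + describe(mp, p));
            }
            for (int mp : working) {
                System.out.println("works: " + describe(mp, p));
            }
        }
    }
}
```

Comparing the printed ranges for the two groups may show whether the failures correlate with a particular key-group layout, for example with ranges that are of equal size across subtasks versus ranges that are not.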


