Posted to user@flink.apache.org by Federico D'Ambrosio <fe...@smartlab.ws> on 2017/10/24 10:00:49 UTC

Could not initialize keyed state backend on restart from checkpoint

Hello everyone,

While trying to restart a Flink job from an externalized checkpoint, I'm
getting the following exception:

java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to restore keyed state [window-contents]. For memory-backed keyed state, the previous serializer of the keyed state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:465)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

The failing task, which raises this exception, is the chain
"latest_time" -> "map_active_stream". It uses JodaDateTimeSerializer and
looks like this:

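import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// streamByID, MongoWindow and MongoWritingType are defined elsewhere in the job (not shown)

// Preprocessing: aggregate each 20 s window to keep only the most recent event
val airtrafficEvents = streamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("airTrafficEvent").name("latest_time").uid("latest_time")

// Sinks
val activeStream = airtrafficEvents
  .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
  .timeWindowAll(Time.seconds(10))
  .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")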
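The snippet above doesn't show how JodaDateTimeSerializer is wired into the job. A minimal sketch, assuming it is registered as the Kryo serializer for org.joda.time.DateTime through the ExecutionConfig and that it comes from the de.javakaffee kryo-serializers artifact (both are assumptions; the object name SerializerSetup is hypothetical):

```scala
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer
import org.apache.flink.streaming.api.scala._
import org.joda.time.DateTime

// Hypothetical setup object; assumes flink-streaming-scala, joda-time and
// kryo-serializers are on the classpath (e.g. bundled in the uber-jar).
object SerializerSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Tell Flink's Kryo fallback to (de)serialize DateTime values with the
    // Joda serializer instead of Kryo's default field-by-field strategy.
    env.getConfig.registerTypeWithKryoSerializer(
      classOf[DateTime],
      classOf[JodaDateTimeSerializer])

    // The pipeline definition and env.execute(...) would follow here.
  }
}
```

If DateTime fields end up in the window-contents state through Kryo, this registration is presumably part of the serializer configuration that the heap backend expects to find again on restore.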

The exception occurred when I restarted the job from an externalized
checkpoint after rebuilding the uber-jar to remove a sink that was no longer
needed; because of that removal, I restarted with --allowNonRestoredState.
I'd like to stress that the serializer has always been on the classpath,
inside the uber-jar, and its implementation did not change between
executions.
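For context on the restart path, a minimal sketch of how externalized checkpoints are typically enabled with the Flink 1.3-era API; the interval and the object name CheckpointSetup are hypothetical, and the job's actual checkpoint settings are not shown in this post:

```scala
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

// Hypothetical setup object; the checkpoint interval is chosen only for illustration.
object CheckpointSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Trigger a checkpoint every 60 seconds.
    env.enableCheckpointing(60000L)

    // Keep the latest completed checkpoint when the job is cancelled,
    // so it can later be passed to "flink run -s <path>".
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // The pipeline definition and env.execute(...) would follow here.
  }
}
```

In Flink 1.3 the directory for externalized checkpoint metadata comes from state.checkpoints.dir in flink-conf.yaml. On restart, --allowNonRestoredState (short form -n) lets the job start even though the removed sink's state no longer maps to any operator.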

I reproduced this behaviour by commenting the sink out and back in,
rebuilding, and restarting the job from both a savepoint and an externalized
checkpoint.

Do you have any insight into this?

Cheers,
Federico D'Ambrosio