Posted to user@flink.apache.org by Alexis Sarda-Espinosa <al...@microfocus.com> on 2021/04/29 16:21:08 UTC

Re: [Schema Evolution] Cannot restore from savepoint after deleting field from POJO

Hello,

I see that the new Jira bots are now active. If no one has time to look into this, could the documentation at least be updated to reflect the fact that removing fields from POJOs will break state restoration?

Regards,
Alexis.

________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Friday, March 12, 2021 6:22 PM
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: [Schema Evolution] Cannot restore from savepoint after deleting field from POJO

Hi Alexis,

This looks like a bug, I've created a Jira ticket to address it [1].
Please feel free to provide any additional information.

In particular, it would help to know whether you are able to reproduce it
in any of the subsequent releases.

[1]
https://issues.apache.org/jira/browse/FLINK-21752

Regards,
Roman


On Thu, Mar 11, 2021 at 5:36 PM Alexis Sarda-Espinosa
<al...@microfocus.com> wrote:
>
> Hi everyone,
>
> It seems I’m having either the same problem as, or a problem similar to, the one mentioned here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-when-restoring-from-savepoint-with-missing-state-amp-POJO-modification-td39945.html
>
> I have a POJO class that is used in Flink state. The class is annotated with @TypeInfo as described, e.g., here: https://stackoverflow.com/a/64721838/5793905
>
> Now I want to remove a field from the POJO, and the corresponding TypeInfoFactory has been updated to reflect this removal. However, when I try to restore from a savepoint taken while the POJO still had the field, I get this exception:
>
> 2021-03-10T20:51:30.406Z INFO  org.apache.flink.runtime.taskmanager.Task:960 … (6/8) (d630d5ff0d7ae4fbc428b151abebab52) switched from RUNNING to FAILED. java.lang.Exception: Exception while creating StreamOperatorStateContext.
>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_c535ac415eeb524d67c88f4a481077d2_(6/8) from any of the 1 provided restore options.
>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>         ... 6 common frames omitted
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>         at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>         ... 8 common frames omitted
> Caused by: java.lang.NullPointerException: null
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
>         at org.apache.flink.api.common.typeutils.CompositeSerializer$PrecomputedParameters.precompute(CompositeSerializer.java:228)
>         at org.apache.flink.api.common.typeutils.CompositeSerializer.<init>(CompositeSerializer.java:51)
>         at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer.<init>(TtlStateFactory.java:250)
>         at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:359)
>         at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:330)
>         at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
>         at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>         at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
>         at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>         at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
>         at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
>         at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
>         at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
>         at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
>         at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
>         at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:136)
>         at org.apache.flink.runtime.state.heap.StateTable.getStateSerializer(StateTable.java:315)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:54)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:36)
>         at org.apache.flink.runtime.state.heap.StateTable.<init>(StateTable.java:98)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.<init>(CopyOnWriteStateTable.java:49)
>         at org.apache.flink.runtime.state.heap.AsyncSnapshotStrategySynchronicityBehavior.newStateTable(AsyncSnapshotStrategySynchronicityBehavior.java:41)
>         at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.newStateTable(HeapSnapshotStrategy.java:243)
>         at org.apache.flink.runtime.state.heap.HeapRestoreOperation.createOrCheckStateForMetaInfo(HeapRestoreOperation.java:185)
>         at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:152)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>         ... 12 common frames omitted
>
> I’m currently using Flink 1.9.3 both to take the job’s savepoint and to restore from it. Even though I can avoid the problem by simply leaving the field in the class, a problem like this would prevent rollback, which is important for my application.
>
> Regards,
> Alexis.
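For readers trying to reproduce the report, the setup described in the quoted message (a state POJO whose type information is supplied by a TypeInfoFactory registered through @TypeInfo) might look roughly like the minimal sketch below. It is an illustration under stated assumptions, not the code from the original job: the class SensorReading, its fields and the nested factory are hypothetical, and the factory is assumed to build the type with Flink's Types.POJO(Class, Map). Removing a field as described above would mean deleting it both from the class and from the map returned by the factory.

```java
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Hypothetical POJO kept in keyed state; names and fields are illustrative only.
// The @TypeInfo annotation tells Flink's type extraction to use the factory below.
@TypeInfo(SensorReading.SensorReadingTypeInfoFactory.class)
public class SensorReading {
    public String sensorId;
    public double value;
    // The kind of field whose removal (from both the class and the factory)
    // is described in the report above.
    public long timestamp;

    // Flink POJOs need a public no-argument constructor.
    public SensorReading() {}

    // Hypothetical factory: lists every field explicitly, so the factory must be
    // kept in sync with the class whenever a field is added or removed.
    public static class SensorReadingTypeInfoFactory extends TypeInfoFactory<SensorReading> {
        @Override
        public TypeInformation<SensorReading> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            Map<String, TypeInformation<?>> fields = new HashMap<>();
            fields.put("sensorId", Types.STRING);
            fields.put("value", Types.DOUBLE);
            fields.put("timestamp", Types.LONG);
            // Types.POJO fails if a listed field does not exist on the class.
            return Types.POJO(SensorReading.class, fields);
        }
    }
}
```

With the annotation in place, TypeInformation.of(SensorReading.class) should resolve the type through the factory rather than through Flink's reflective POJO analysis.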