Posted to issues@flink.apache.org by "Kenneth William Krugler (Jira)" <ji...@apache.org> on 2022/07/25 00:39:00 UTC

[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords

    [ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570567#comment-17570567 ] 

Kenneth William Krugler commented on FLINK-28653:
-------------------------------------------------

What happens if you explicitly define the no-args constructor and getters/setters for User.java, without relying on Lombok?
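A hand-written version of the kind of class this question is about might look like the sketch below. The field names (`name`, `favoriteNumber`, `timestamp`), the holder class `PojoRulesSketch`, and the helper `looksLikeFlinkPojo` are hypothetical and are not taken from the linked repository.

The helper only approximates the documented Flink POJO rules using plain reflection. Those rules are: the class is public and standalone, it has a public no-argument constructor, and every non-static, non-transient field is either public and non-final or exposed through a public getter and setter. The helper is not Flink's own type extraction logic. It ignores superclasses, boolean `is` getters, and whether each field type is itself serializable.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical holder class so the sketch compiles as a single file.
class PojoRulesSketch {

    // Hand-written stand-in for a Lombok-generated io.peleg.pojo.User.
    // Field names are illustrative only.
    public static class User {
        private String name;
        private Integer favoriteNumber;
        private Long timestamp;

        // Public no-argument constructor, required by Flink's POJO rules.
        public User() {}

        public User(String name, Integer favoriteNumber, Long timestamp) {
            this.name = name;
            this.favoriteNumber = favoriteNumber;
            this.timestamp = timestamp;
        }

        // Private fields must be reachable through public bean-style accessors.
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getFavoriteNumber() { return favoriteNumber; }
        public void setFavoriteNumber(Integer favoriteNumber) { this.favoriteNumber = favoriteNumber; }
        public Long getTimestamp() { return timestamp; }
        public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    }

    // Counter-example: only an all-arguments constructor and no setter.
    public static class AllArgsOnlyUser {
        private final String name;

        public AllArgsOnlyUser(String name) { this.name = name; }

        public String getName() { return name; }
    }

    /**
     * Rough reflection-based approximation of the documented Flink POJO rules.
     * Not Flink's TypeExtractor: ignores superclasses, "is" getters for booleans,
     * and whether each field type is itself serializable.
     */
    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            return false;
        }
        // Nested classes must be static (no hidden reference to an outer instance).
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            return false;
        }
        try {
            clazz.getConstructor(); // public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : clazz.getDeclaredFields()) {
            int fm = f.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm)) {
                continue; // not part of the serialized state
            }
            if (Modifier.isPublic(fm) && !Modifier.isFinal(fm)) {
                continue; // directly accessible field
            }
            if (!hasBeanAccessors(clazz, f)) {
                return false;
            }
        }
        return true;
    }

    // Checks for a public getX() returning the field type and a public setX(fieldType).
    private static boolean hasBeanAccessors(Class<?> clazz, Field f) {
        String n = f.getName();
        String suffix = Character.toUpperCase(n.charAt(0)) + n.substring(1);
        try {
            Method getter = clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, f.getType());
            return getter.getReturnType() == f.getType();
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

Here `User` passes the check, and `AllArgsOnlyUser` fails because it has no public no-argument constructor. In a real job, calling `env.getConfig().disableGenericTypes()` makes Flink throw an exception whenever a type would be handled as a generic (Kryo) type, instead of silently falling back. This can help pinpoint which type is not recognized as a POJO.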

> State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28653
>                 URL: https://issues.apache.org/jira/browse/FLINK-28653
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System, Runtime / State Backends
>    Affects Versions: 1.14.3, 1.15.0
>         Environment: I ran the job on a Flink cluster I spun up using docker compose:
> ```
> version: "2.2"
> services:
>   jobmanager:
>     image: flink:latest
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>   taskmanager:
>     image: flink:latest
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
> ```
>  My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip.
> I'm running macOS Monterey Version 12.4.
>            Reporter: Peleg Tsadok
>            Priority: Major
>              Labels: KryoSerializer, State, avro, pojo, schema-evolution
>
> I am trying to do a POC of Flink State Schema Evolution. I am using Flink 1.15.0 with Java 11, and I have also tested on Flink 1.14.3.
> I created three data classes, one for each serialization type:
> 1. `io.peleg.kryo.User` - Uses the `java.time.Instant` class, which I know is not supported by Flink's POJO serialization.
> 2. `io.peleg.pojo.User` - Uses only boxed primitives and strings (`Integer`, `Long`, `String`). The getters, setters and constructors are generated by Lombok.
> 3. `io.peleg.avro.User` - Generated from an Avro schema using the Avro Maven Plugin.
> For each class I wrote a streaming job that uses a time window to buffer elements and collect them into a list.
> For each class I did the following:
> 1. Run the job
> 2. Stop it with a savepoint
> 3. Add a field to the data class
> 4. Resubmit the job from the savepoint
> For all three data classes, resubmitting from the savepoint failed with this exception:
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>     ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106)
>     at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:143)
>     at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:74)
>     at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:140)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     ... 13 more
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
> Serialization trace:
> favoriteColor (io.peleg.avro.User)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
>     at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKVStateData(HeapSavepointRestoreOperation.java:219)
>     at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKeyGroupStateData(HeapSavepointRestoreOperation.java:149)
>     at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:125)
>     at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:169)
>     ... 20 more
> Caused by: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
>     at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
>     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
>     at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
>     at java.base/java.util.Objects.checkIndex(Unknown Source)
>     at java.base/java.util.ArrayList.get(Unknown Source)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>     ... 31 more{code}
>  
> I expected this exception for `io.peleg.kryo.User`, since Flink does not support state schema evolution for Kryo-serialized classes.
> However, it seems that for all three classes Flink ended up using the Kryo serializer instead of the POJO or Avro serializer.
> My entire code is public on GitHub [here|http://github.com/peleg68/flink-state-schema-evolution].
> What I would like to achieve is to successfully run a job from a savepoint taken with an older version of a POJO that has fewer or more fields.
>  
>  
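The restore failure in the quoted trace comes from Kryo's `FieldSerializer` reading the `favoriteColor` field of `io.peleg.avro.User`. That serializer writes a class's fields in a fixed order without their names.

The toy model below is a rough illustration of two encodings:
- A positional encoding cannot cope with an added field.
- An encoding that records field names can.

This is not Kryo's or Flink's actual wire format. The class `SchemaEvolutionToyModel` and its methods are hypothetical, and only non-null `String` fields are modeled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (NOT Kryo's or Flink's real wire format) of two ways to encode a record.
 * Only non-null String fields are modeled.
 */
class SchemaEvolutionToyModel {

    // Positional: values only, in declaration order, no field names.
    static byte[] writePositional(List<String> values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String v : values) {
                out.writeUTF(v);
            }
        }
        return bytes.toByteArray();
    }

    // The reader trusts its own field list; if the writer had fewer fields, values
    // land in the wrong fields and the read eventually runs past the data.
    static Map<String, String> readPositional(byte[] data, List<String> readerFields) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        Map<String, String> result = new LinkedHashMap<>();
        for (String field : readerFields) {
            result.put(field, in.readUTF()); // throws EOFException when data runs out
        }
        return result;
    }

    // Tagged: field count, then (name, value) pairs.
    static byte[] writeTagged(Map<String, String> record) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(record.size());
            for (Map.Entry<String, String> e : record.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }
        return bytes.toByteArray();
    }

    // Matches by name: fields absent from the old bytes become null, unknown ones are dropped.
    static Map<String, String> readTagged(byte[] data, List<String> readerFields) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        Map<String, String> written = new LinkedHashMap<>();
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            String name = in.readUTF();
            written.put(name, in.readUTF());
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (String field : readerFields) {
            result.put(field, written.get(field));
        }
        return result;
    }
}
```

Suppose an old two-field record (`name`, `favoriteColor`) is written positionally and read back with a new three-field schema (`name`, `favoriteNumber`, `favoriteColor`). The reader puts `"blue"` into `favoriteNumber` and then fails with `EOFException`. When many records are written back to back, for example the elements of a list, the reader would instead consume the next record's bytes. That kind of misalignment can surface as an unrelated-looking error such as the `IndexOutOfBoundsException` in the trace. The tagged reader simply returns `null` for the new field.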



--
This message was sent by Atlassian Jira
(v8.20.10#820010)