Posted to issues@flink.apache.org by "Kenneth William Krugler (Jira)" <ji...@apache.org> on 2022/07/25 00:39:00 UTC
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570567#comment-17570567 ]
Kenneth William Krugler commented on FLINK-28653:
-------------------------------------------------
What happens if you explicitly define the no-args constructor and getters/setters for User.java, without relying on Lombok?
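For comparison, here is a minimal hand-written POJO that does not use Lombok.

* The field names (`id`, `name`, `favoriteNumber`) are hypothetical, because the original `User.java` is not reproduced here.
* The `violations` helper is also hypothetical. It only approximates Flink's documented POJO rules:
  * a public, standalone class
  * a public no-argument constructor
  * every non-static, non-transient field either public and non-final, or exposed through a public getter and setter
* The helper does not check whether each field's type has a suitable serializer.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRulesCheck {

    // Hand-written POJO, no Lombok. Field names are hypothetical.
    public static class User {
        private Long id;
        private String name;
        private Integer favoriteNumber;

        public User() {} // public no-arg constructor, required for POJO treatment

        public User(Long id, String name, Integer favoriteNumber) {
            this.id = id;
            this.name = name;
            this.favoriteNumber = favoriteNumber;
        }

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getFavoriteNumber() { return favoriteNumber; }
        public void setFavoriteNumber(Integer favoriteNumber) { this.favoriteNumber = favoriteNumber; }
    }

    // Counter-example: no no-arg constructor and no setter for its field.
    public static class ImmutableUser {
        private final String name;
        public ImmutableUser(String name) { this.name = name; }
        public String getName() { return name; }
    }

    // Hypothetical helper approximating the documented POJO rules; field types are NOT checked.
    static List<String> violations(Class<?> c) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(c.getModifiers())) problems.add("class is not public");
        if (c.getEnclosingClass() != null && !Modifier.isStatic(c.getModifiers())) {
            problems.add("class is a non-static inner class");
        }
        try {
            c.getConstructor(); // getConstructor only finds public constructors
        } catch (NoSuchMethodException e) {
            problems.add("no public no-arg constructor");
        }
        // Walk the class and its superclasses, as the rules apply to inherited fields too.
        for (Class<?> k = c; k != null && k != Object.class; k = k.getSuperclass()) {
            for (Field f : k.getDeclaredFields()) {
                int m = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(m) || Modifier.isTransient(m)) continue;
                if (Modifier.isPublic(m) && !Modifier.isFinal(m)) continue;
                String cap = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
                boolean getter = hasPublicMethod(c, "get" + cap)
                        || (f.getType() == boolean.class && hasPublicMethod(c, "is" + cap));
                boolean setter = hasPublicMethod(c, "set" + cap, f.getType());
                if (!getter || !setter) {
                    problems.add("field '" + f.getName() + "' lacks a public getter/setter pair");
                }
            }
        }
        return problems;
    }

    static boolean hasPublicMethod(Class<?> c, String name, Class<?>... params) {
        try {
            c.getMethod(name, params); // getMethod only returns public methods
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("User: " + violations(User.class));
        System.out.println("ImmutableUser: " + violations(ImmutableUser.class));
    }
}
```

Running `main` prints `User: []`. It reports two problems for `ImmutableUser`: the missing no-argument constructor and the missing setter for `name`. Flink makes the real decision during its own type extraction, so this helper is only a quick local sanity check.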
> State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
> -------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-28653
> URL: https://issues.apache.org/jira/browse/FLINK-28653
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System, Runtime / State Backends
> Affects Versions: 1.14.3, 1.15.0
> Environment: I ran the job on a Flink cluster I spun up using docker compose:
> ```
> version: "2.2"
> services:
>   jobmanager:
>     image: flink:latest
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>   taskmanager:
>     image: flink:latest
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
> ```
> My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip.
> I'm running macOS Monterey Version 12.4.
> Reporter: Peleg Tsadok
> Priority: Major
> Labels: KryoSerializer, State, avro, pojo, schema-evolution
>
> I am trying to do a POC of Flink State Schema Evolution. I am using Flink 1.15.0 and Java 11, but I also tested on Flink 1.14.3.
> I created three data classes, one for each serialization type:
> 1. `io.peleg.kryo.User` - Uses the `java.time.Instant` class, which I know is not supported for POJO serialization in Flink.
> 2. `io.peleg.pojo.User` - Uses only boxed primitives and strings (`Integer`, `Long`, `String`). The getters, setters and constructors are generated using Lombok.
> 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin.
> For each class I wrote a stream job that uses a time window to buffer elements and turn them into a list.
> For each class I tried to do the following:
> 1. Run the job.
> 2. Stop it with a savepoint.
> 3. Add a field to the data class.
> 4. Resubmit the job from the savepoint.
> For all three data classes, resubmitting from the savepoint failed with this exception:
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
> ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106)
> at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:143)
> at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:74)
> at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:140)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 13 more
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
> Serialization trace:
> favoriteColor (io.peleg.avro.User)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
> at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKVStateData(HeapSavepointRestoreOperation.java:219)
> at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKeyGroupStateData(HeapSavepointRestoreOperation.java:149)
> at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:125)
> at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:169)
> ... 20 more
> Caused by: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
> at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
> at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
> at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
> at java.base/java.util.Objects.checkIndex(Unknown Source)
> at java.base/java.util.ArrayList.get(Unknown Source)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
> ... 31 more{code}
>
> I expected this exception for `io.peleg.kryo.User`, since Flink does not support state schema evolution for Kryo-serialized classes.
> But it seems that for all three classes Flink ended up using the Kryo serializer instead of the POJO or Avro serializer.
> All of my code is public on GitHub [here|http://github.com/peleg68/flink-state-schema-evolution].
> What I would like to achieve is to successfully run a job from a savepoint taken with an older version of a POJO that has fewer or more fields.
>
>
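Two toy encodings can sketch the gap between the behavior the report expects and the failure it got.

* Assumptions: `LayoutEvolutionDemo` and its field names, including the added `age`, are hypothetical. Values are kept as non-null strings for brevity. Neither encoding is Flink's or Kryo's actual byte format.
* Positional encoding: this mirrors the general idea behind Kryo's default `FieldSerializer`, which writes field values without field names. A reader whose class gained a field therefore reads bytes into the wrong fields. This only illustrates how misaligned reads can end in errors like the `IndexOutOfBoundsException` above, not the exact mechanics.
* Snapshot encoding: this stores the writer's field names alongside the data, which is roughly what the POJO serializer snapshot in a savepoint makes possible. Per the Flink documentation on state schema evolution:
  * POJO fields can be added, and new fields are initialized to the Java default for their type.
  * POJO fields can be removed.
  * Both require that field types and the class name stay unchanged.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LayoutEvolutionDemo {

    // Positional toy encoding: only values are written, in the writer's field order, no names.
    static byte[] writePositional(List<String> fields, Map<String, String> record) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String f : fields) out.writeUTF(record.get(f)); // values must be non-null here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads values assuming the reader's field order matches the bytes on disk.
    static Map<String, String> readPositional(byte[] data, List<String> fields) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        Map<String, String> result = new LinkedHashMap<>();
        for (String f : fields) {
            try {
                result.put(f, in.readUTF());
            } catch (EOFException e) {
                throw new IllegalStateException("ran out of bytes at '" + f + "', read so far: " + result);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return result;
    }

    // Snapshot toy encoding: the writer's field names are stored once, ahead of the values.
    static byte[] writeWithSnapshot(List<String> fields, Map<String, String> record) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(fields.size());
            for (String f : fields) out.writeUTF(f);
            for (String f : fields) out.writeUTF(record.get(f));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes with the OLD layout, then maps by name: new fields are null, removed ones dropped.
    static Map<String, String> readWithSnapshot(byte[] data, List<String> newFields) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            List<String> oldFields = new ArrayList<>();
            for (int i = 0; i < count; i++) oldFields.add(in.readUTF());
            Map<String, String> old = new HashMap<>();
            for (String f : oldFields) old.put(f, in.readUTF());
            Map<String, String> result = new LinkedHashMap<>();
            for (String f : newFields) result.put(f, old.get(f)); // null if the field is new
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> v1 = List.of("favoriteNumber", "name");
        List<String> v2 = List.of("age", "favoriteNumber", "name"); // hypothetical field added
        Map<String, String> user = Map.of("favoriteNumber", "7", "name", "alice");

        try {
            readPositional(writePositional(v1, user), v2);
        } catch (IllegalStateException e) {
            System.out.println("positional: " + e.getMessage());
        }
        System.out.println("snapshot: " + readWithSnapshot(writeWithSnapshot(v1, user), v2));
    }
}
```

With the hypothetical `age` field added in front, the positional read puts the values in the wrong fields, `{age=7, favoriteNumber=alice}`, and then runs out of bytes. The snapshot read returns `{age=null, favoriteNumber=7, name=alice}`.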
--
This message was sent by Atlassian Jira
(v8.20.10#820010)