Posted to dev@flink.apache.org by "Iaroslav Zeigerman (Jira)" <ji...@apache.org> on 2021/08/24 22:57:00 UTC

[jira] [Created] (FLINK-23953) AvroSerializerSnapshot causes state migration even though the old and the new schemas are compatible

Iaroslav Zeigerman created FLINK-23953:
------------------------------------------

             Summary: AvroSerializerSnapshot causes state migration even though the old and the new schemas are compatible
                 Key: FLINK-23953
                 URL: https://issues.apache.org/jira/browse/FLINK-23953
             Project: Flink
          Issue Type: Bug
          Components: API / Type Serialization System
    Affects Versions: 1.13.2
            Reporter: Iaroslav Zeigerman


The problematic code is located [here|https://github.com/apache/flink/blob/release-1.13.2/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java#L194]. When Avro determines that the old and the new schemas are compatible, this code returns COMPATIBLE_AFTER_MIGRATION, which is unnecessary and causes failures during schema evolution; COMPATIBLE_AS_IS should be returned instead. Even the comment right above the return statement in question says as much:
{noformat}
                    // The new serializer would be able to read data persisted with *this*
                    // serializer, therefore no migration
                    // is required.
{noformat}
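
For concreteness, here is a sketch of what the fix could look like. The method shape below is reconstructed from the linked release-1.13.2 source and is an assumption, not a verbatim patch:
{noformat}
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;

// Reconstructed helper: maps Avro's schema-pair compatibility result to
// Flink's serializer compatibility. Avro reporting COMPATIBLE means the new
// (reader) schema can already decode data written with the old (writer)
// schema, so no state rewrite is needed.
private static <T> TypeSerializerSchemaCompatibility<T> avroCompatibilityToFlinkCompatibility(
        SchemaPairCompatibility compatibility) {
    switch (compatibility.getType()) {
        case COMPATIBLE:
            // The new serializer would be able to read data persisted with
            // *this* serializer, therefore no migration is required.
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
            // was: return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
        case INCOMPATIBLE:
            return TypeSerializerSchemaCompatibility.incompatible();
        default:
            return TypeSerializerSchemaCompatibility.incompatible();
    }
}
{noformat}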

This issue leads to Flink failures whenever a new optional field is added to a schema. The following happens in this case (a minimal reproduction follows the list):
# Records in state are deserialized successfully using the old serializer (with the old schema).
# The schema change triggers state migration via the code path shared above.
# RocksDBKeyedStateBackend attempts to serialize a record that was built against the old schema using the new schema.
# The latter operation fails for obvious reasons: the record's field indexes do not line up between the old and the new schemas.
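
A minimal, self-contained reproduction of steps 3-4 using plain Avro. The Event schema is hypothetical (the real state schema evidently had more fields, hence index 10 in the trace below):
{noformat}
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class MigrationFailureRepro {
    public static void main(String[] args) throws Exception {
        // Old schema: one required field.
        Schema oldSchema = SchemaBuilder.record("Event").fields()
                .requiredString("id")
                .endRecord();
        // New schema: the same field plus a new optional one
        // (a ["null", "string"] union with default null).
        Schema newSchema = SchemaBuilder.record("Event").fields()
                .requiredString("id")
                .optionalString("tag")
                .endRecord();

        // A record deserialized with the OLD schema has only one field slot.
        GenericRecord oldRecord = new GenericData.Record(oldSchema);
        oldRecord.put("id", "abc");

        // The migration path re-serializes it with the NEW schema, which asks
        // for field index 1 that the old record does not have.
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(newSchema);
        BinaryEncoder encoder =
                EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null);
        writer.write(oldRecord, encoder); // throws ArrayIndexOutOfBoundsException: 1
    }
}
{noformat}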

The failure produces a stack trace similar to the following:

{noformat}
Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB list state.
	at org.apache.flink.contrib.streaming.state.RocksDBListState.migrateSerializedValue(RocksDBListState.java:269)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:630)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:559)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:509)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:670)
	at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
	... 18 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 10
	at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
	at org.apache.avro.generic.GenericData.getField(GenericData.java:825)
	at org.apache.avro.generic.GenericData.getField(GenericData.java:844)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:204)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
	at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:186)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.migrateSerializedValue(RocksDBListState.java:263)
	... 28 more
{noformat}

If Flink skipped the migration in this case and simply used the new schema as the Avro reader schema when deserializing old records, no such issue would occur.
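
A sketch of that resolution, continuing with the hypothetical oldSchema/newSchema from the reproduction above: Avro resolves the old writer schema against the new reader schema and fills the added optional field from its default, so the persisted bytes never need to be rewritten.
{noformat}
import java.io.ByteArrayOutputStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Bytes persisted with the OLD schema.
GenericRecord oldRecord = new GenericData.Record(oldSchema);
oldRecord.put("id", "abc");
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(oldSchema).write(oldRecord, encoder);
encoder.flush();

// Reading them back with writer schema = old, reader schema = new succeeds;
// the added optional field is populated with its default value.
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord upgraded =
        new GenericDatumReader<GenericRecord>(oldSchema, newSchema).read(null, decoder);
System.out.println(upgraded); // {"id": "abc", "tag": null}
{noformat}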




--
This message was sent by Atlassian Jira
(v8.3.4#803005)