You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2022/02/24 12:47:00 UTC

[jira] [Created] (FLINK-26349) AvroParquetReaders does not work with ReflectData

Dawid Wysakowicz created FLINK-26349:
----------------------------------------

             Summary: AvroParquetReaders does not work with ReflectData
                 Key: FLINK-26349
                 URL: https://issues.apache.org/jira/browse/FLINK-26349
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.15.0
            Reporter: Dawid Wysakowicz
             Fix For: 1.15.0


I tried to change the {{AvroParquetFileReadITCase}} to read the data as {{ReflectData}} and I stumbled on a problem. The scenario is that I use exact same code for writing parquet files, but changed the reading part to:

{code}
    public static final class User {
        private final String name;
        private final Integer favoriteNumber;
        private final String favoriteColor;

        public User(String name, Integer favoriteNumber, String favoriteColor) {
            this.name = name;
            this.favoriteNumber = favoriteNumber;
            this.favoriteColor = favoriteColor;
        }
    }

        final FileSource<User> source =
                FileSource.forRecordStreamFormat(
                                AvroParquetReaders.forReflectRecord(User.class),
                                Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
                        .monitorContinuously(Duration.ofMillis(5))
                        .build();
{code}

I get an error:
{code}
819020 [flink-akka.actor.default-dispatcher-9] DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Archive local failure causing attempt cc9f5e814ea9a3a5b397018dbffcb6a9 to fail: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
	at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException
	at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	... 30 more
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)