Posted to user@flink.apache.org by Ramana Uppala <ra...@capitalone.com> on 2020/06/06 14:27:05 UTC

Failed to deserialize Avro record

We are using AvroRowDeserializationSchema with a Kafka table source to deserialize messages. The application fails with "Failed to deserialize Avro record." for some messages, it seems.

Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -26

Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
	at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424) ~[avro-1.8.2.jar:1.8.2]

We are not sure what serialization mechanism the producer is using to publish the messages at this time. Are the above errors related to https://issues.apache.org/jira/browse/FLINK-16048 ?

Any suggestions on fixing the above issues? We are using Flink 1.10.

Re: Failed to deserialize Avro record

Posted by Dawid Wysakowicz <dw...@apache.org>.
Good to hear.

There is no single schema that would support all serialization mechanisms. I would also rather
discourage such an approach, as it makes it really hard to make changes to
the record schema. I would strongly recommend using the schema registry for
all records.

If you still want to have a schema that works for both, you could
implement one based on both ConfluentRegistryAvroDeserializationSchema
and AvroRowDeserializationSchema which checks for the magic byte. If the
magic byte is present, deserialize with
ConfluentRegistryAvroDeserializationSchema; if it is not, use
AvroRowDeserializationSchema. But again, I'd rather discourage such an approach.
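The dispatch described above hinges on the Confluent wire format, which prepends a 0x00 magic byte and a 4-byte big-endian schema id to the Avro payload. A minimal sketch of the check in plain Java, without the Flink types it would be wrapped in (class and method names here are illustrative, not Flink APIs); note it is only a heuristic, since a plain Avro record can legitimately begin with 0x00:

```java
// Illustrative sketch of the magic-byte dispatch; not a Flink class.
// Confluent wire format: [0x00 magic][4-byte big-endian schema id][Avro payload].
public class MagicByteDispatch {
    private static final byte CONFLUENT_MAGIC = 0x00;

    /** Heuristic: true if the record looks Confluent-framed (magic byte + 4-byte schema id). */
    public static boolean isConfluentFramed(byte[] message) {
        return message != null && message.length >= 5 && message[0] == CONFLUENT_MAGIC;
    }

    /** Schema registry id, encoded big-endian in bytes 1..4 of a Confluent-framed record. */
    public static int schemaId(byte[] message) {
        return ((message[1] & 0xFF) << 24)
             | ((message[2] & 0xFF) << 16)
             | ((message[3] & 0xFF) << 8)
             |  (message[4] & 0xFF);
    }
}
```

A hybrid deserializer would call `isConfluentFramed` on each record and route to the registry-aware or the plain Avro path accordingly.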

Best,

Dawid



Re: Failed to deserialize Avro record

Posted by Ramana Uppala <ra...@capitalone.com>.
Hi Arvid / Dawid,

Yes, we did a small POC with a custom Avro row deserializer that uses ConfluentRegistryAvroDeserializationSchema, and we are able to parse the messages.

We have a schema registry, and users are given the choice to produce with different serialization mechanisms. Some messages we are able to parse with AvroRowDeserializationSchema; some we couldn't. Our understanding is that the topics with failing messages are produced with Confluent serialization.

Is there any uniform Avro row deserialization that works with all scenarios?


Re: Failed to deserialize Avro record

Posted by Arvid Heise <ar...@ververica.com>.
If data is coming from Kafka, the write schema is most likely stored in a
schema registry. If so, you absolutely need to use
ConfluentRegistryAvroDeserializationSchema from the
*flink-avro-confluent-registry* package.

If you didn't opt for that most common architecture pattern, then you often
run into a mismatch between the write schema and the supplied schema. That
could also be the case here (but try the other deserialization schema first).
If the problem persists, please elaborate on how you manage the schema. It's also
helpful to see an example record and the schema, if possible.


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Failed to deserialize Avro record

Posted by Dawid Wysakowicz <dw...@apache.org>.
It's rather hard to help if we don't know the format in which the
records are serialized. There is a significant difference depending on
whether you use a schema registry or not. All schema registries known to me
prepend the actual data with some kind of magic byte and an identifier of
the schema. Therefore, if we do not know to expect that prefix, we cannot
properly deserialize the record.
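The "Length is negative" error in the original report is consistent with this: a plain Avro reader that expects a zig-zag varint (e.g. a string or bytes length) will decode whatever bytes the registry framing put in front of the payload. A small sketch of Avro's zig-zag varint decoding in plain Java; the 0x33 byte in the usage note is hypothetical, chosen only to show how a single stray byte can decode to -26:

```java
// Illustrative decoder for Avro's zig-zag varint encoding (enough for this demo).
public class ZigZag {
    /** Decode one zig-zag varint from buf starting at pos. */
    public static long decode(byte[] buf, int pos) {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift;  // accumulate 7 payload bits per byte
            shift += 7;
        } while ((b & 0x80) != 0);              // high bit set means more bytes follow
        return (raw >>> 1) ^ -(raw & 1);        // zig-zag back to signed
    }
}
```

For example, `ZigZag.decode(new byte[]{0x33}, 0)` yields -26, so a reader that lands on such a byte while expecting a length fails exactly like the reported "Malformed data. Length is negative: -26".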

Nevertheless, I would not say the problem has something to do with the
schema registry. If I understand you correctly, some records can be
deserialized; if they were all produced with the schema-registry type of
serialization, all would fail.

What I can recommend is to log/identify a record that cannot be
deserialized and debug AvroRowDeserializationSchema with it.
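One way to do that logging, sketched in plain Java (the helper name is illustrative): dump the first few bytes of a failing record in hex. A leading 00 followed by four schema-id bytes suggests Confluent framing.

```java
// Illustrative helper for inspecting raw Kafka record bytes.
public class RecordProbe {
    /** Hex preview of the first n bytes, e.g. "00 00 00 00 07" for a Confluent-framed record. */
    public static String hexPrefix(byte[] record, int n) {
        StringBuilder sb = new StringBuilder();
        int limit = Math.min(n, record.length);
        for (int i = 0; i < limit; i++) {
            if (i > 0) sb.append(' ');
            sb.append(String.format("%02x", record[i]));
        }
        return sb.toString();
    }
}
```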

Best,

Dawid
