Posted to user@flink.apache.org by Shamit <ja...@gmail.com> on 2021/02/08 03:17:33 UTC

"upsert-kafka" connector not working with Avro confluent schema registry

Hello Team,

We have two Kafka connectors, "upsert-kafka" and "kafka".

I am facing an issue with the "upsert-kafka" connector while reading Avro
messages that were serialized with Confluent's
"io.confluent.kafka.serializers.KafkaAvroSerializer". Please note that the
"kafka" connector reads the same messages without any problem.

Below are the table definitions for both connectors:

*Table definition with the "kafka" connector, which works fine:*

CREATE TABLE proposalLine (
    PROPOSAL_LINE_ID BIGINT,
    LAST_MODIFIED_BY STRING
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'lndcdcadsprpslproposalline',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081',
    'avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value'
)

*Table definition and error with the "upsert-kafka" connector, which does not
work:*

CREATE TABLE proposalLine (
    PROPOSAL_LINE_ID BIGINT,
    LAST_MODIFIED_BY STRING,
    PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'lndcdcadsprpslproposalline',
    'key.format' = 'avro',
    'value.format' = 'avro',
    'properties.group.id' = 'dd',
    'properties.schema.registry.url' = 'http://localhost:8081',
    'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
    'properties.value.deserializer' = 'io.confluent.kafka.serializers.KafkaAvroDeserializer'
)

ERROR:
Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
        ... 9 more


Please help.

Regards,
Shamit




Re: "upsert-kafka" connector not working with Avro confluent schema registry

Posted by Shamit <ja...@gmail.com>.
Hi Arvid,

Thanks for the response.
I have tried without the explicit serializers and I am still getting an error.
With "avro-confluent" it complains that "schema-registry.url" is missing,
although it is defined in the DDL. Below is the screenshot.

Request you to please help.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2972/Screen_Shot_2021-02-11_at_4.png> 
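A plausible cause (a guess, assuming Flink 1.12 option naming): once
'key.format' / 'value.format' are used, each format option must carry the
matching 'key.' / 'value.' prefix, so a bare
'avro-confluent.schema-registry.url' is not picked up and gets reported as
missing. Roughly along these lines:

    'key.format' = 'avro-confluent',
    'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'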

Regards,
Shamit Jain




Re: "upsert-kafka" connector not working with Avro confluent schema registry

Posted by Shamit <ja...@gmail.com>.
Hi Arvid,

Thanks for the response.  
I have tried without serializer and getting error. 
With "avro-confluent" it shows missing "schema-registry.url" although it is
defined in the definition. Below is the screen shot.

Request you to please help.  

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2972/Screen_Shot_2021-02-11_at_4.png> 

Regards,
Shamit Jain



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: "upsert-kafka" connector not working with Avro confluent schema registry

Posted by Arvid Heise <ar...@apache.org>.
Hi Shamit,

Why are you specifying the upsert-kafka table completely differently? In
particular, why did you set the serializers explicitly?

I would have assumed that just setting the format to
'format'='avro-confluent' should be enough (same as in the working source).
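For reference, a minimal sketch of such a definition (untested, and assuming
Flink 1.12 option names, where the schema-registry options of a key/value
format carry the 'key.' / 'value.' prefix):

    CREATE TABLE proposalLine (
        PROPOSAL_LINE_ID BIGINT,
        LAST_MODIFIED_BY STRING,
        PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'properties.bootstrap.servers' = 'localhost:9092',
        'topic' = 'lndcdcadsprpslproposalline',
        -- upsert-kafka has no top-level 'format' option;
        -- key and value formats are configured separately
        'key.format' = 'avro-confluent',
        'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
        -- if the keys are plain Kafka longs (see the LongDeserializer in
        -- the original DDL), 'key.format' = 'raw' may fit better than Avro
        'value.format' = 'avro-confluent',
        'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
    )

Note that there are no 'properties.*.deserializer' entries: the table format
does the deserialization. Records written by KafkaAvroSerializer carry a
5-byte Confluent header (magic byte plus schema id) in front of the Avro
payload; the plain 'avro' format tries to decode that header as data, which
would explain the ArrayIndexOutOfBoundsException above.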

On Tue, Feb 9, 2021 at 11:06 PM Shamit <ja...@gmail.com> wrote:

> Hello Flink Users,
>
> Request you to please help. I am facing an issue with "KafkaAvroDeserializer"
> when using the "upsert-kafka" connector.
>
> Regards,
> Shamit Jain

Re: "upsert-kafka" connector not working with Avro confluent schema registry

Posted by Shamit <ja...@gmail.com>.
Hello Flink Users,

Request you to please help. I am facing an issue with "KafkaAvroDeserializer"
when using the "upsert-kafka" connector.

Regards,
Shamit Jain


