Posted to dev@flink.apache.org by shamit jain <ja...@gmail.com> on 2021/02/07 05:16:24 UTC

Flink upsert-kafka connector not working with Avro Confluent

Hello Team,

I am facing an issue with the "upsert-kafka" connector, which should read Avro messages serialized in the Confluent format (consumed via "io.confluent.kafka.serializers.KafkaAvroDeserializer"). The same setup works with the "kafka" connector.

It looks like there is no way to pass the schema registry URL and subject name the way we do with the "kafka" connector.

Please help.


Table definition with upsert-kafka is below (not working):

CREATE TABLE proposalLine (
    PROPOSAL_LINE_ID BIGINT,
    LAST_MODIFIED_BY STRING,
    PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'lndcdcadsprpslproposalline',
    'key.format' = 'avro',
    'value.format' = 'avro',
    'properties.group.id' = 'dd',
    'properties.schema.registry.url' = 'http://localhost:8081',
    'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
    'properties.value.deserializer' = 'io.confluent.kafka.serializers.KafkaAvroDeserializer'
)

ERROR:
     Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
        ... 9 more


Table definition with the kafka connector is below (working):

CREATE TABLE proposalLine (
    PROPOSAL_LINE_ID BIGINT,
    LAST_MODIFIED_BY STRING
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'lndcdcadsprpslproposalline',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081',
    'avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value'
)

Regards,
Shamit

Re: Flink upsert-kafka connector not working with Avro Confluent

Posted by Till Rohrmann <tr...@apache.org>.
Hi Shamit,

thanks for reaching out to the community. I am pulling in Timo who might
know more about this problem.

Cheers,
Till

Re: Flink upsert-kafka connector not working with Avro Confluent

Posted by Svend Vanderveken <sv...@kelesia.com>.
Hi Shamit,

In this snippet:

> Table definition with upsert-kafka is below (not working):
>
> CREATE TABLE proposalLine (
>     PROPOSAL_LINE_ID BIGINT,
>     LAST_MODIFIED_BY STRING,
>     PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
> ) WITH (
>     'connector' = 'upsert-kafka',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'properties.auto.offset.reset' = 'earliest',
>     'topic' = 'lndcdcadsprpslproposalline',
>     'key.format' = 'avro',
>     'value.format' = 'avro',
>     'properties.group.id' = 'dd',
>     'properties.schema.registry.url' = 'http://localhost:8081',
>     'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
>     'properties.value.deserializer' = 'io.confluent.kafka.serializers.KafkaAvroDeserializer'
> )

I believe this part of the connector configuration needs to be updated:

'key.format' = 'avro',
'value.format' = 'avro',

to this:

'key.format' = 'avro-confluent',
'value.format' = 'avro-confluent'

Also, 'properties.key.deserializer' and 'properties.value.deserializer'
should not be specified. They have no effect anyway, since Flink overrides
them based on the information provided in the 'key.format' and
'value.format' entries (see here:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#properties
).
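
Putting both changes together, an untested sketch of the corrected
statement could look like this (the per-format
'key.avro-confluent.schema-registry.url' and
'value.avro-confluent.schema-registry.url' option names are the ones used
in the 1.13 documentation linked below, and the sketch assumes the Kafka
key is also encoded in the Confluent Avro format):

CREATE TABLE proposalLine (
    PROPOSAL_LINE_ID BIGINT,
    LAST_MODIFIED_BY STRING,
    PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'lndcdcadsprpslproposalline',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- the registry URL is passed to the formats, not via 'properties.*'
    'key.format' = 'avro-confluent',
    'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
)

Note that the 'properties.*.deserializer' entries are dropped entirely. If
the key was actually produced as a plain long rather than as Confluent
Avro, the key format would have to match that encoding instead (e.g. the
'raw' format).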

In the next version of the documentation, there will be examples of how to
use the Confluent Avro format for both the key and the value, including an
example with the upsert-kafka connector. You can already preview it here:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/connectors/formats/avro-confluent.html#how-to-create-tables-with-avro-confluent-format

I hope this helps,

Cheers,

Svend

-- 
Svend Vanderveken
Kelesia SPRL - BE 0839 049 010
blog: https://svend.kelesia.com
Twitter: @sv3ndk