Posted to issues@flink.apache.org by "Anshul Bansal (Jira)" <ji...@apache.org> on 2020/07/20 07:40:00 UTC

[jira] [Comment Edited] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

    [ https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17160985#comment-17160985 ] 

Anshul Bansal edited comment on FLINK-16048 at 7/20/20, 7:39 AM:
-----------------------------------------------------------------

[~danny0405], [~ykt836], as a source of ideas, there is an excellent open source library for Spark: [https://github.com/AbsaOSS/ABRiS/blob/master/documentation/confluent-avro-documentation.md]

Basically, it is a Confluent Avro bridge for Spark. Because the Confluent Schema Registry can store Avro schemas for both key and value, and can also store multiple versions of each schema, the library expects at minimum a topic name and a schema registry URL, plus a basic identifier for finding the matching key and value subjects in the registry.
In the same way, it would be great if Flink added support like this, where the user does not need to provide the schema and format because they come from the schema registry, and can then directly run select * from the Kafka topic.
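
To make the lookup described above concrete, here is a minimal sketch in Java. It assumes Confluent's default subject naming (TopicNameStrategy, i.e. {{<topic>-key}} and {{<topic>-value}}) and the Schema Registry REST path for the latest version of a subject. The class name, the registry address, and the topic name are hypothetical and only illustrate the idea; this is not how ABRiS or Flink implements it.

```java
import java.net.URI;

public class SubjectLookupSketch {

    /** Subject name under the default TopicNameStrategy: "<topic>-key" or "<topic>-value". */
    public static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    /**
     * Builds the REST URI that returns the latest registered schema for a subject.
     * Kafka topic names only allow ASCII letters, digits, '.', '_' and '-',
     * so a subject derived this way needs no URL encoding.
     */
    public static URI latestSchemaUri(String registryUrl, String subject) {
        // Drop a trailing slash so the path is not doubled.
        String base = registryUrl.endsWith("/")
                ? registryUrl.substring(0, registryUrl.length() - 1)
                : registryUrl;
        return URI.create(base + "/subjects/" + subject + "/versions/latest");
    }

    public static void main(String[] args) {
        String registry = "http://localhost:8081"; // hypothetical registry address
        String topic = "orders";                   // hypothetical topic name
        System.out.println(latestSchemaUri(registry, subjectFor(topic, true)));
        System.out.println(latestSchemaUri(registry, subjectFor(topic, false)));
    }
}
```

With only the topic name and the registry URL, both the key and the value schema can be located, which is what would let a user skip declaring the schema by hand.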


was (Author: anshul.bansal):
[~danny0405], [~ykt836], as a source of ideas, there is an excellent open source library for Spark: [https://github.com/AbsaOSS/ABRiS/blob/master/documentation/confluent-avro-documentation.md]

Basically, it is a Confluent Avro bridge for Spark. Because the Confluent Schema Registry can store Avro schemas for both key and value, the library expects at minimum a topic name and a schema registry URL, plus a basic identifier for finding the matching key and value subjects in the registry.
In the same way, it would be great if Flink added support like this, where the user does not need to provide the schema and format because they come from the schema registry, and can then directly run select * from the Kafka topic.

> Support read/write confluent schema registry avro data  from Kafka
> ------------------------------------------------------------------
>
>                 Key: FLINK-16048
>                 URL: https://issues.apache.org/jira/browse/FLINK-16048
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.11.0
>            Reporter: Leonard Xu
>            Assignee: Danny Chen
>            Priority: Major
>              Labels: pull-request-available, usability
>             Fix For: 1.12.0
>
>
> *The background*
> I found that the SQL Kafka connector cannot consume Avro data serialized by `KafkaAvroSerializer`; it can only consume Row data with an Avro schema, because `AvroRowFormatFactory` uses `AvroRowDeserializationSchema`/`AvroRowSerializationSchema` to serialize and deserialize data.
> I think we should support this because `KafkaAvroSerializer` is very common in Kafka,
> and someone has run into the same problem on Stack Overflow [1].
> [1] [https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are two candidates so far:
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from ClickHouse {{AvroConfluent}} [2]
> Personally, I would prefer {{avro-sr}} because it is more concise, and Confluent is a company name, which I think is not well suited to a format name.
> _The format attributes_
> || Option || Required || Remark ||
> | schema-string | true | Avro schema string used for (de)serialization |
> | schema-registry.url | true | URL used to connect to the Schema Registry service |
> | schema-registry.subject | false | Subject name to write to in the Schema Registry service; required for sinks |
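
As a sketch of why a plain Avro decoder cannot read the records mentioned in the background above: `KafkaAvroSerializer` writes the Confluent wire format, in which a magic byte (0) and a 4-byte big-endian schema ID precede the Avro binary payload. A decoder that treats the whole message as Avro data interprets those five header bytes as data, which is consistent with the "Malformed data" error in the linked Stack Overflow question. The class and method names below are hypothetical and do not represent the implementation proposed in this issue.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentWireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /** Validates the header and returns the schema ID stored in bytes 1..4. */
    public static int readSchemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message is too short for the Confluent header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    /** Returns only the Avro binary payload, i.e. everything after the header. */
    public static byte[] avroPayload(byte[] message) {
        readSchemaId(message); // reuse the header validation
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    public static void main(String[] args) {
        // Header with schema ID 42, followed by the Avro encoding of the string "foo"
        // (length 3 as a zigzag varint is 0x06, then the UTF-8 bytes).
        byte[] message = {0, 0, 0, 0, 42, 6, 'f', 'o', 'o'};
        System.out.println("schema id = " + readSchemaId(message));
        System.out.println("payload bytes = " + avroPayload(message).length);
    }
}
```

A registry-aware format would use the schema ID to fetch the writer schema from the URL given in {{schema-registry.url}} and then decode only the payload bytes.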



--
This message was sent by Atlassian Jira
(v8.3.4#803005)