You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Dénes Bartha (Jira)" <ji...@apache.org> on 2021/09/10 10:03:00 UTC

[jira] [Updated] (BEAM-12871) Support for Confluent Schema Registry

     [ https://issues.apache.org/jira/browse/BEAM-12871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dénes Bartha updated BEAM-12871:
--------------------------------
    Summary: Support for Confluent Schema Registry  (was: Support Schema Registry)

> Support for Confluent Schema Registry
> -------------------------------------
>
>                 Key: BEAM-12871
>                 URL: https://issues.apache.org/jira/browse/BEAM-12871
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-kafka
>            Reporter: Dénes Bartha
>            Priority: P0
>
> I would like to use the Python components `{{apache_beam.io.kafka.}}{{WriteToKafka}}` and `{{apache_beam.io.kafka.ReadFromKafka}}` while accessing and updating [Schema Registry|https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html] for sending and reading `protobuf` messages.
> In Java it is possible to achieve [this|https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/kafka/KafkaIO.html]:
>  
> {code:java}
>  PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
>    .apply(KafkaIO.<Long, GenericRecord>read()
>       .withBootstrapServers("broker_1:9092,broker_2:9092")
>       .withTopic("my_topic")
>       .withKeyDeserializer(LongDeserializer.class)
>       // Use Confluent Schema Registry, specify schema registry URL and value subject
>       .withValueDeserializer(
>           ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
>     ...
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)