Posted to issues@beam.apache.org by "Dénes Bartha (Jira)" <ji...@apache.org> on 2021/09/10 10:02:00 UTC

[jira] [Created] (BEAM-12871) Support Schema Registry

Dénes Bartha created BEAM-12871:
-----------------------------------

             Summary: Support Schema Registry
                 Key: BEAM-12871
                 URL: https://issues.apache.org/jira/browse/BEAM-12871
             Project: Beam
          Issue Type: Improvement
          Components: io-py-kafka
            Reporter: Dénes Bartha


I would like to use the Python transforms {{apache_beam.io.kafka.WriteToKafka}} and {{apache_beam.io.kafka.ReadFromKafka}} together with the Confluent [Schema Registry|https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html], so that the registry is consulted (and updated) when writing and reading {{protobuf}} messages.
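
For reference, a minimal sketch (my own, assuming the current {{ReadFromKafka}} signature with {{consumer_config}} and {{topics}}) of what the Python side offers today: with the default byte-array deserializers the values arrive as raw bytes, and there is no hook for a Schema Registry aware protobuf deserializer.

{code:python}
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# Sketch of current usage (assumed, for illustration only): values are plain
# bytes; decoding the protobuf payload and talking to the Schema Registry is
# left entirely to the user.
with beam.Pipeline() as pipeline:
    raw_records = (
        pipeline
        | 'ReadFromKafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'broker_1:9092,broker_2:9092'},
            topics=['my_topic']))
{code}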

In Java this is already possible with [KafkaIO|https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/kafka/KafkaIO.html]:

{code:java}
 PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
   .apply(KafkaIO.<Long, GenericRecord>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      // Use Confluent Schema Registry, specify schema registry URL and value subject
      .withValueDeserializer(
          ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
    ...
{code}
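
A comparable hook does not exist in the Python wrapper yet. Purely as an illustration of the desired shape (the schema-registry parameters below are hypothetical and are not part of the current {{ReadFromKafka}} signature), something along these lines would cover the use case:

{code:python}
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as pipeline:
    records = (
        pipeline
        | 'ReadFromKafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'broker_1:9092,broker_2:9092'},
            topics=['my_topic'],
            # Hypothetical parameters, mirroring the Java
            # ConfluentSchemaRegistryDeserializerProvider above - NOT part of
            # the current API:
            # schema_registry_url='http://localhost:8081',
            # value_subject='my_topic-value',
        ))
{code}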

--
This message was sent by Atlassian Jira
(v8.3.4#803005)