Posted to issues@beam.apache.org by "Dénes Bartha (Jira)" <ji...@apache.org> on 2021/09/10 10:02:00 UTC
[jira] [Created] (BEAM-12871) Support Schema Registry
Dénes Bartha created BEAM-12871:
-----------------------------------
Summary: Support Schema Registry
Key: BEAM-12871
URL: https://issues.apache.org/jira/browse/BEAM-12871
Project: Beam
Issue Type: Improvement
Components: io-py-kafka
Reporter: Dénes Bartha
I would like to use the Python transforms {{apache_beam.io.kafka.WriteToKafka}} and {{apache_beam.io.kafka.ReadFromKafka}} together with [Schema Registry|https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html], so that schemas are looked up and registered there when writing and reading {{protobuf}} messages.
In Java, [KafkaIO|https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/kafka/KafkaIO.html] already supports Schema Registry on the read side (shown here with Avro {{GenericRecord}} values):
{code:java}
PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
    .apply(KafkaIO.<Long, GenericRecord>read()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        // Use Confluent Schema Registry, specify schema registry URL and value subject
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
        ...
{code}
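Until the Python transforms offer something similar, one possible stopgap is to read the values as raw bytes and strip the Confluent framing by hand. The following is only a minimal sketch, not a Beam API: it assumes the values follow the documented Confluent wire format for Protobuf (magic byte {{0}}, 4-byte big-endian schema ID, a zigzag-varint list of message indexes, then the serialized message). The names {{split_confluent_protobuf}} and {{_read_varint}} are hypothetical helpers introduced for illustration, and the sketch does not contact Schema Registry at all.

```python
import struct

MAGIC_BYTE = 0  # assumed: first byte of every Confluent-framed value


def _read_varint(buf, pos):
    """Decode one zigzag-encoded varint at buf[pos]; return (value, next_pos)."""
    shift = 0
    raw = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            break
        shift += 7
    # Undo zigzag encoding: 0 -> 0, 1 -> -1, 2 -> 1, ...
    return (raw >> 1) ^ -(raw & 1), pos


def split_confluent_protobuf(value):
    """Split a framed value into (schema_id, message_indexes, payload)."""
    if len(value) < 5 or value[0] != MAGIC_BYTE:
        raise ValueError("not a Confluent-framed message")
    (schema_id,) = struct.unpack(">I", value[1:5])
    count, pos = _read_varint(value, 5)
    if count == 0:
        # A single 0 byte is shorthand for the index list [0],
        # i.e. the first message type declared in the schema.
        indexes = [0]
    else:
        indexes = []
        for _ in range(count):
            index, pos = _read_varint(value, pos)
            indexes.append(index)
    return schema_id, indexes, value[pos:]
```

In a pipeline this could be applied to the value of each record produced by {{ReadFromKafka}}, for example with a {{beam.Map}} step, assuming the transform is left on its byte-array deserializer. The returned payload would still have to be parsed with the generated protobuf class, and the schema ID would have to be resolved against the registry separately, which is exactly the part this issue asks Beam to handle.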
--
This message was sent by Atlassian Jira
(v8.3.4#803005)