Posted to user@beam.apache.org by Sigalit Eliazov <e....@gmail.com> on 2022/08/09 11:07:25 UTC
read messages from Kafka: 2 different message types in a Kafka topic
Hi all,
we have a single Kafka topic that receives two different types of
messages.
Both message types are Avro.
So when reading messages from Kafka, I used GenericRecord:
KafkaIO.<String, GenericRecord>read()
    .withBootstrapServers(bootstrapServers)
    .withTopic(topic)
    .withConsumerConfigUpdates(ImmutableMap.of(
        SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
        ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
        SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
    ))
    .withKeyDeserializer(StringDeserializer.class)
I am not sure how to define *withValueDeserializer* and the coder.
I tried to read the messages as GenericRecord, but it fails with
"Could not extract the Kafka Deserializer type from class
io.apicurio.registry.serde.avro.AvroKafkaDeserialize"
I am using Apicurio as the schema registry.
Thanks
Sigalit
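The error message suggests that KafkaIO could not work out the value type from the deserializer class alone. As far as we understand KafkaIO's coder inference, when it is given only a deserializer class it looks at the interfaces that class directly implements for a parameterized Deserializer<T> with a concrete T, and it does not walk up the class hierarchy. A generic deserializer, or one that only inherits Deserializer from a base class (which appears to be the case for Apicurio's Avro deserializer), gives it nothing to extract. The stdlib-only sketch below mimics that kind of lookup; Deserializer is a local stand-in for Kafka's interface, and StringLikeDeserializer, BaseAvroDeserializer and AvroLikeDeserializer are hypothetical classes.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class DeserializerTypeLookup {

    /** Local stand-in for org.apache.kafka.common.serialization.Deserializer. */
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Hypothetical deserializer that binds T to a concrete class directly. */
    static class StringLikeDeserializer implements Deserializer<String> {
        public String deserialize(String topic, byte[] data) {
            return new String(data, java.nio.charset.StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical generic base class, in the spirit of a registry-aware deserializer. */
    abstract static class BaseAvroDeserializer<U> implements Deserializer<U> {}

    /** Hypothetical subclass: Deserializer is only inherited, and U stays unresolved. */
    static class AvroLikeDeserializer<U> extends BaseAvroDeserializer<U> {
        public U deserialize(String topic, byte[] data) {
            return null;
        }
    }

    /**
     * Returns the concrete T of Deserializer<T> if the class implements it directly
     * with a concrete type argument; throws otherwise. It deliberately does not
     * recurse into superclasses.
     */
    static Class<?> extractValueType(Class<?> deserializerClass) {
        for (Type type : deserializerClass.getGenericInterfaces()) {
            if (!(type instanceof ParameterizedType)) {
                continue;
            }
            ParameterizedType parameterized = (ParameterizedType) type;
            if (parameterized.getRawType() == Deserializer.class) {
                Type argument = parameterized.getActualTypeArguments()[0];
                if (argument instanceof Class) {
                    return (Class<?>) argument;
                }
            }
        }
        throw new IllegalArgumentException(
            "Could not extract the Kafka Deserializer type from " + deserializerClass);
    }

    public static void main(String[] args) {
        // Succeeds: the class itself implements Deserializer<String>.
        System.out.println(extractValueType(StringLikeDeserializer.class));
        try {
            // Fails: Deserializer is inherited and its type argument is a type variable.
            extractValueType(AvroLikeDeserializer.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this is indeed the cause, KafkaIO's withValueDeserializerAndCoder, which takes an explicit Coder alongside the deserializer class, skips the inference step. With two schemas in one topic, though, a single Avro coder bound to one schema would still not cover both message types.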
Re: read messages from Kafka: 2 different message types in a Kafka topic
Posted by Alexey Romanenko <ar...@gmail.com>.
Oops, my bad, I misread the initial question. As Moritz pointed out, you have only one topic with two different schemas.
I don't think this is supported by KafkaIO out of the box. In this case, you need to either write your own deserialiser that distinguishes the schema of each input message, split this topic into two topics where each one contains messages with only one schema, or use an Avro union as suggested above.
—
Alexey
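A custom deserialiser of the first kind could read the schema identifier in front of each message and hand the remaining bytes to a decoder for that schema. The sketch below assumes the producer writes the Confluent wire format (a 0 magic byte followed by a 4-byte big-endian schema ID), which, as we understand it, is what the apicurio.registry.as-confluent option Moritz mentions is meant to enable; Apicurio's default framing may differ (for example a longer ID, or the ID carried in a Kafka header). Deserializer is a local stand-in for Kafka's interface, SchemaDispatchingDeserializer and frame are hypothetical names, and the string decoders are placeholders for real Avro decoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SchemaDispatchSketch {

    // Assumed Confluent-style framing: 1 magic byte (0) + 4-byte big-endian schema ID.
    static final byte MAGIC_BYTE = 0;
    static final int HEADER_LENGTH = 5;

    /** Local stand-in for Kafka's Deserializer interface (only the method used here). */
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Hypothetical deserializer that picks a decoder based on the embedded schema ID. */
    static class SchemaDispatchingDeserializer<T> implements Deserializer<T> {
        private final Map<Integer, Function<byte[], T>> decodersBySchemaId;

        SchemaDispatchingDeserializer(Map<Integer, Function<byte[], T>> decodersBySchemaId) {
            this.decodersBySchemaId = new HashMap<>(decodersBySchemaId);
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null) {
                return null; // e.g. tombstone records carry no value
            }
            if (data.length < HEADER_LENGTH || data[0] != MAGIC_BYTE) {
                throw new IllegalArgumentException("Unexpected framing on topic " + topic);
            }
            int schemaId = ByteBuffer.wrap(data, 1, 4).getInt(); // ByteBuffer is big-endian by default
            Function<byte[], T> decoder = decodersBySchemaId.get(schemaId);
            if (decoder == null) {
                throw new IllegalArgumentException("No decoder registered for schema ID " + schemaId);
            }
            // Hand only the payload (without the 5-byte header) to the schema-specific decoder.
            return decoder.apply(Arrays.copyOfRange(data, HEADER_LENGTH, data.length));
        }
    }

    /** Builds a framed test message: magic byte, schema ID, then the payload bytes. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_LENGTH + payload.length)
            .put(MAGIC_BYTE)
            .putInt(schemaId)
            .put(payload)
            .array();
    }

    public static void main(String[] args) {
        Map<Integer, Function<byte[], String>> decoders = new HashMap<>();
        // Placeholder decoders: real ones would decode Avro binary with the matching writer schema.
        decoders.put(1, bytes -> "typeA:" + new String(bytes, StandardCharsets.UTF_8));
        decoders.put(2, bytes -> "typeB:" + new String(bytes, StandardCharsets.UTF_8));
        Deserializer<String> deserializer = new SchemaDispatchingDeserializer<>(decoders);

        System.out.println(deserializer.deserialize("events", frame(1, "hello".getBytes(StandardCharsets.UTF_8))));
        System.out.println(deserializer.deserialize("events", frame(2, "world".getBytes(StandardCharsets.UTF_8))));
    }
}
```

A fixed map from schema ID to decoder keeps the sketch small; a real implementation would more likely fetch the writer schema for each ID from the registry and cache it. In a Beam pipeline the output type would also have to be a common type for both messages, and its coder would have to handle both schemas, which is part of why splitting the topic or using an Avro union can be simpler.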
> On 10 Aug 2022, at 15:03, Alexey Romanenko <ar...@gmail.com> wrote:
>
> If you have two topics with different schemas in your pipeline, then you need to read them separately with two different KafkaIO instances and configure each instance with the appropriate deserialiser for its schema.
>
> —
> Alexey
Re: read messages from Kafka: 2 different message types in a Kafka topic
Posted by Alexey Romanenko <ar...@gmail.com>.
If you have two topics with different schemas in your pipeline, then you need to read them separately with two different KafkaIO instances and configure each instance with the appropriate deserialiser for its schema.
—
Alexey
> On 9 Aug 2022, at 22:28, Sigalit Eliazov <e....@gmail.com> wrote:
>
> Thanks for your response.
> We have different messages with separate schemas.
>
> I'll review the suggested solution.
> BR
> Sigalit
Re: read messages from Kafka: 2 different message types in a Kafka topic
Posted by Sigalit Eliazov <e....@gmail.com>.
Thanks for your response.
We have different messages with separate schemas.
I'll review the suggested solution.
BR
Sigalit
On Tue, Aug 9, 2022 at 3:40 PM Moritz Mack <mm...@talend.com> wrote:
> Hi Sigalit,
>
> Could you explain in a bit more detail what you mean by two different
> types of messages?
>
> Do they share the same schema, e.g. using a union / oneOf type? Or are
> you in fact talking about different messages with separate schemas (e.g.
> discriminated by a message header)?
Re: read messages from Kafka: 2 different message types in a Kafka topic
Posted by Moritz Mack <mm...@talend.com>.
Hi Sigalit,
Could you explain in a bit more detail what you mean by two different types of messages?
Do they share the same schema, e.g. using a union / oneOf type? Or are you in fact talking about different messages with separate schemas (e.g. discriminated by a message header)?
The recommended usage (at least with Confluent) is one schema per topic. With the Confluent registry it's then fairly simple:
.withValueDeserializer(
    ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, subject, null /* latest */, config))
Most likely you will have to implement a similar DeserializerProvider for Apicurio. You could also try using apicurio.registry.as-confluent, but that requires configuring your producers accordingly.
In any case, I suggest you study ConfluentSchemaRegistryDeserializerProvider. That should show you a path forward.
Best,
Moritz
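As a rough illustration of what "a similar DeserializerProvider for Apicurio" might look like, the sketch below copies the general shape of the Confluent provider: take the consumer configs, add the registry URL, create the underlying deserializer and configure it. Everything here is a local stand-in so it runs without Beam or Kafka: DeserializerProvider is modeled on Beam's interface of that name but omits getCoder (a real provider would also return a coder, for example an Avro coder for a schema fetched from the registry), Deserializer is trimmed from Kafka's, and the config key "apicurio.registry.url" is an assumption to check against SerdeConfig.REGISTRY_URL. RegistryDeserializerProvider, RecordingDeserializer and the URL are hypothetical.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ApicurioProviderSketch {

    /** Local stand-in for Kafka's Deserializer, trimmed to the methods used here. */
    interface Deserializer<T> {
        void configure(Map<String, ?> configs, boolean isKey);
        T deserialize(String topic, byte[] data);
    }

    /** Local stand-in modeled on Beam's DeserializerProvider; getCoder is omitted. */
    interface DeserializerProvider<T> extends Serializable {
        Deserializer<T> getDeserializer(Map<String, ?> configs, boolean isKey);
    }

    /** Serializable factory, so the provider itself stays serializable. */
    interface DeserializerFactory<T> extends Serializable {
        Deserializer<T> create();
    }

    /** Hypothetical provider that injects the registry URL before configuring the deserializer. */
    static class RegistryDeserializerProvider<T> implements DeserializerProvider<T> {
        // Assumed config key; verify against SerdeConfig.REGISTRY_URL in your Apicurio version.
        static final String REGISTRY_URL_KEY = "apicurio.registry.url";

        private final String registryUrl;
        private final DeserializerFactory<T> factory;

        RegistryDeserializerProvider(String registryUrl, DeserializerFactory<T> factory) {
            this.registryUrl = registryUrl;
            this.factory = factory;
        }

        @Override
        public Deserializer<T> getDeserializer(Map<String, ?> configs, boolean isKey) {
            // Copy the consumer configs and add the registry URL on top of them.
            Map<String, Object> merged = new HashMap<>(configs);
            merged.put(REGISTRY_URL_KEY, registryUrl);
            Deserializer<T> deserializer = factory.create();
            deserializer.configure(Collections.unmodifiableMap(merged), isKey);
            return deserializer;
        }
    }

    /** Toy deserializer that remembers its configuration, for demonstration only. */
    static class RecordingDeserializer implements Deserializer<String> {
        Map<String, ?> seenConfigs;
        boolean seenIsKey;

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            this.seenConfigs = configs;
            this.seenIsKey = isKey;
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        DeserializerProvider<String> provider =
            new RegistryDeserializerProvider<>("http://registry.example:8080", RecordingDeserializer::new);
        RecordingDeserializer deserializer = (RecordingDeserializer) provider.getDeserializer(
            Collections.singletonMap("group.id", "my-group"), false);
        System.out.println(deserializer.seenConfigs.get(RegistryDeserializerProvider.REGISTRY_URL_KEY));
    }
}
```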