Posted to user@beam.apache.org by Sigalit Eliazov <e....@gmail.com> on 2022/08/09 11:07:25 UTC

read messages from kafka: 2 different message types in kafka topic

Hi all,
we have a single Kafka topic that receives 2 different types of
messages.
Both message types are Avro.
So when reading messages from Kafka I used GenericRecord:

KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withConsumerConfigUpdates(ImmutableMap.of(
                SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
                ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
                SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
        ))
        .withKeyDeserializer(StringDeserializer.class)

I am not sure how to define the *withValueDeserializer* and coder.

I tried to read the messages as GenericRecord, but it fails with:

 "Could not extract the Kafka Deserializer type from class
io.apicurio.registry.serde.avro.AvroKafkaDeserialize"

I am using Apicurio as the schema registry.


Thanks

Sigalit

Re: read messages from kafka: 2 different message types in kafka topic

Posted by Alexey Romanenko <ar...@gmail.com>.
Oops, my bad, I misread the initial question. Moritz pointed out that you have only one topic with two different schemas.

I don't think that is supported by KafkaIO out of the box. In this case, you need to do one of the following: write your own deserialiser that distinguishes the schema of every input message; split the topic in two so that each topic contains messages with only one schema; or use an Avro union, as suggested earlier in this thread.
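
The first option, a deserialiser that distinguishes the schema per message, comes down to reading a discriminator from each record and dispatching on it. Below is a minimal, Kafka-free sketch in plain Java. It assumes the Confluent-style wire format (one zero magic byte, then a 4-byte big-endian schema id, then the payload); whether your producers actually write that layout depends on how Apicurio is configured, so treat it as an assumption. The class SchemaDispatch and its methods are hypothetical names, not part of Beam, Kafka, or Apicurio.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: picks a decoder per message based on the schema id
// embedded in the message bytes (assumed layout: 0x00, int32 id, payload).
final class SchemaDispatch {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4 id bytes

    // Returns the schema id stored after the magic byte (big-endian int).
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Message is not in the assumed wire format");
        }
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    // Returns the bytes that follow the 5-byte header.
    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    // Looks up the decoder registered for the message's schema id and applies it.
    static <T> T decode(byte[] message, Map<Integer, Function<byte[], T>> decoders) {
        int id = schemaId(message);
        Function<byte[], T> decoder = decoders.get(id);
        if (decoder == null) {
            throw new IllegalArgumentException("No decoder registered for schema id " + id);
        }
        return decoder.apply(payload(message));
    }
}
```

In a real pipeline this logic would sit inside a class implementing Kafka's Deserializer interface, with each registered decoder wrapping an Avro reader for one of the two schemas.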

—
Alexey


Re: read messages from kafka: 2 different message types in kafka topic

Posted by Alexey Romanenko <ar...@gmail.com>.
If you have two topics with different schemas in your pipeline then you need to read them separately with two different KafkaIO instances and configure every instance with a proper deserialiser based on its schema.

—
Alexey


Re: read messages from kafka: 2 different message types in kafka topic

Posted by Sigalit Eliazov <e....@gmail.com>.
Thanks for your response.
We have different messages with separate schemas.

I'll review the suggested solution.
BR
Sigalit


Re: read messages from kafka: 2 different message types in kafka topic

Posted by Moritz Mack <mm...@talend.com>.
Hi Sigalit,

Could you explain in a bit more detail what you mean by 2 different types of messages?
Do they share the same schema, e.g. using a union / one-of type? Or are you in fact talking about different messages with separate schemas (e.g. discriminated using a message header)?

The recommended usage (at least with Confluent) is to use one schema per topic. Using the Confluent registry it’s fairly simple then:

             .withValueDeserializer(
                    ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, subject, null /* latest */, config)))

Most likely you will have to implement a similar DeserializerProvider for Apicurio. You could also try using apicurio.registry.as-confluent, but that requires configuring your producers accordingly.
In any case, I suggest you study ConfluentSchemaRegistryDeserializerProvider. That should show you a path forward.
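
The error quoted in the original post ("Could not extract the Kafka Deserializer type from class ...") is consistent with the value type being inferred from the deserializer class by reflection, which cannot succeed when the class is itself generic. The plain-Java sketch below illustrates that mechanism only. It is an assumption about the cause, not Beam's actual code: the Deserializer interface is a stand-in for the Kafka one, and all class names are hypothetical.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Stand-in for org.apache.kafka.common.serialization.Deserializer.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Generic implementation: T is chosen by the caller, so the class itself
// records only a type variable, not a concrete type.
class GenericDeserializer<T> implements Deserializer<T> {
    public T deserialize(String topic, byte[] data) {
        return null; // body is irrelevant for this illustration
    }
}

// Concrete implementation: the type argument String is recorded on the class.
class StringValueDeserializer implements Deserializer<String> {
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

final class TypeExtraction {
    // Returns the concrete type argument of Deserializer<T> declared directly
    // on the given class, or empty if it is missing or only a type variable.
    static Optional<Class<?>> valueType(Class<?> deserializerClass) {
        for (Type type : deserializerClass.getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType() == Deserializer.class) {
                    Type argument = parameterized.getActualTypeArguments()[0];
                    if (argument instanceof Class) {
                        return Optional.of((Class<?>) argument);
                    }
                }
            }
        }
        return Optional.empty();
    }
}
```

If this is indeed the cause, supplying the coder explicitly avoids the inference step, either through a DeserializerProvider as described above or through KafkaIO's withValueDeserializerAndCoder.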

Best,
Moritz
