You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Sigalit Eliazov <e....@gmail.com> on 2023/04/09 10:05:27 UTC

major reduction is performance when using schema registry - KafkaIO

Hello,

I am trying to understand the effect of schema registry on our pipeline's
performance. In order to do sowe created a very simple pipeline that reads
from kafka, runs a simple transformation of adding new field and writes of
kafka.  the messages are in avro format

I ran this pipeline with 3 different options on same configuration : 1
kafka partition, 1 task manager, 1 slot, 1 parallelism:

* when i used apicurio as the schema registry i was able to process only
2000 messages per second
* when i used confluent schema registry i was able to process 7000 messages
per second
* when I did not use any schema registry and used plain avro
deserializer/serializer i was able to process *30K* messages per second.

I understand that using a schema registry may cause a reduction in
performance but  in my opinion the difference is too high.
Any comments or suggestions about these results?

Thanks in advance
Sigalit

Re: major reduction is performance when using schema registry - KafkaIO

Posted by Alexey Romanenko <ar...@gmail.com>.
Thanks for testing this!

It requires some additional investigations, so I created an issue for that: 
https://github.com/apache/beam/issues/26262

Feel free to add more details if you have there.

—
Alexey

> On 13 Apr 2023, at 12:45, Sigalit Eliazov <e....@gmail.com> wrote:
> 
> I have made the suggested change and used ConfluentSchemaRegistryDeserializerProvider
> the results are slightly  better.. average of 8000 msg/sec 
> 
> Thank you both for your response and i'll appreciate if you can keep me in the loop in the planned work with kafka schema or let me know if i can assist in anyway,
> 
> Thanks
> Sigalit
> 
> On Wed, Apr 12, 2023 at 8:00 PM Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>> wrote:
>> Mine was the similar but "org.apache.beam.sdk.io.kafka,ConfluentSchemaRegistryDeserializerProvider" is leveraging “io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient” that I guessed should reduce this potential impact.
>> 
>> —
>> Alexey
>> 
>>> On 12 Apr 2023, at 17:36, John Casey via user <user@beam.apache.org <ma...@beam.apache.org>> wrote:
>>> 
>>> My initial guess is that there are queries being made in order to retrieve the schemas, which would impact performance, especially if those queries aren't cached with Beam splitting in mind. 
>>> 
>>> I'm looking to improve our interaction with Kafka schemas in the next couple of quarters, so I'll keep this case in mind while working on that.
>>> 
>>> John
>>> 
>>> On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>> wrote:
>>>> I don’t have an exact answer why it’s so much slower for now (only some guesses but it requires some profiling), though could you try to test the same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider” instead of KafkaAvroDeserializer and AvroCoder?
>>>> 
>>>> More details and an example how to use is here:
>>>> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html (go to “Use Avro schema with Confluent Schema Registry”)
>>>> 
>>>> —
>>>> Alexey
>>>> 
>>>> 
>>>> 
>>>>> On 10 Apr 2023, at 07:35, Sigalit Eliazov <e.sigalit@gmail.com <ma...@gmail.com>> wrote:
>>>>> 
>>>>> hi,
>>>>> KafkaIO.<String, T>read()
>>>>>         .withBootstrapServers(bootstrapServers)
>>>>>         .withTopic(topic)
>>>>>         .withConsumerConfigUpdates(Map.ofEntries(
>>>>>                 Map.entry("schema.registry.url", registryURL),
>>>>>                 Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup+ UUID.randomUUID()),
>>>>>         ))
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializerAndCoder((Class) io.confluent.kafka.serializers.KafkaAvroDeserializer.class, AvroCoder.of(avroClass));
>>>>> 
>>>>> Thanks
>>>>> Sigalit
>>>>> 
>>>>> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <user@beam.apache.org <ma...@beam.apache.org>> wrote:
>>>>>> How are you using the schema registry? Do you have a code sample?
>>>>>> 
>>>>>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <e.sigalit@gmail.com <ma...@gmail.com>> wrote:
>>>>>>> Hello,
>>>>>>> 
>>>>>>> I am trying to understand the effect of schema registry on our pipeline's performance. In order to do sowe created a very simple pipeline that reads from kafka, runs a simple transformation of adding new field and writes of kafka.  the messages are in avro format
>>>>>>> 
>>>>>>> I ran this pipeline with 3 different options on same configuration : 1 kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>>>>>> 
>>>>>>> * when i used apicurio as the schema registry i was able to process only 2000 messages per second
>>>>>>> * when i used confluent schema registry i was able to process 7000 messages per second
>>>>>>> * when I did not use any schema registry and used plain avro deserializer/serializer i was able to process 30K messages per second.
>>>>>>> 
>>>>>>> I understand that using a schema registry may cause a reduction in performance but  in my opinion the difference is too high. 
>>>>>>> Any comments or suggestions about these results?
>>>>>>> 
>>>>>>> Thanks in advance
>>>>>>> Sigalit
>>>> 
>> 


Re: major reduction is performance when using schema registry - KafkaIO

Posted by Sigalit Eliazov <e....@gmail.com>.
I have made the suggested change and used
ConfluentSchemaRegistryDeserializerProvider
the results are slightly  better.. average of 8000 msg/sec

Thank you both for your response and i'll appreciate if you can keep me in
the loop in the planned work with kafka schema or let me know if i can
assist in anyway,

Thanks
Sigalit

On Wed, Apr 12, 2023 at 8:00 PM Alexey Romanenko <ar...@gmail.com>
wrote:

> Mine was the similar but
> "org.apache.beam.sdk.io.kafka,ConfluentSchemaRegistryDeserializerProvider"
> is leveraging
> “io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient” that
> I guessed should reduce this potential impact.
>
> —
> Alexey
>
> On 12 Apr 2023, at 17:36, John Casey via user <us...@beam.apache.org>
> wrote:
>
> My initial guess is that there are queries being made in order to
> retrieve the schemas, which would impact performance, especially if those
> queries aren't cached with Beam splitting in mind.
>
> I'm looking to improve our interaction with Kafka schemas in the next
> couple of quarters, so I'll keep this case in mind while working on that.
>
> John
>
> On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
>
>> I don’t have an exact answer why it’s so much slower for now (only some
>> guesses but it requires some profiling), though could you try to test the
>> same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider”
>> instead of KafkaAvroDeserializer and AvroCoder?
>>
>> More details and an example how to use is here:
>>
>> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html (go
>> to “Use Avro schema with Confluent Schema Registry”)
>>
>> —
>> Alexey
>>
>>
>>
>> On 10 Apr 2023, at 07:35, Sigalit Eliazov <e....@gmail.com> wrote:
>>
>> hi,
>>
>> KafkaIO.<String, T>read()
>>         .withBootstrapServers(bootstrapServers)
>>         .withTopic(topic)
>>         .withConsumerConfigUpdates(Map.ofEntries(
>>                 Map.entry("schema.registry.url", registryURL),
>>                 Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup+ UUID.randomUUID()),
>>         ))
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializerAndCoder((Class) io.confluent.kafka.serializers.KafkaAvroDeserializer.class, AvroCoder.of(avroClass));
>>
>>
>> Thanks
>>
>> Sigalit
>>
>>
>> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <us...@beam.apache.org>
>> wrote:
>>
>>> How are you using the schema registry? Do you have a code sample?
>>>
>>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <e....@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am trying to understand the effect of schema registry on our
>>>> pipeline's performance. In order to do sowe created a very simple pipeline
>>>> that reads from kafka, runs a simple transformation of adding new field and
>>>> writes of kafka.  the messages are in avro format
>>>>
>>>> I ran this pipeline with 3 different options on same configuration : 1
>>>> kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>>>
>>>> * when i used apicurio as the schema registry i was able to process
>>>> only 2000 messages per second
>>>> * when i used confluent schema registry i was able to process 7000
>>>> messages per second
>>>> * when I did not use any schema registry and used plain avro
>>>> deserializer/serializer i was able to process *30K* messages per
>>>> second.
>>>>
>>>> I understand that using a schema registry may cause a reduction in
>>>> performance but  in my opinion the difference is too high.
>>>> Any comments or suggestions about these results?
>>>>
>>>> Thanks in advance
>>>> Sigalit
>>>>
>>>
>>
>

Re: major reduction is performance when using schema registry - KafkaIO

Posted by Alexey Romanenko <ar...@gmail.com>.
Mine was the similar but "org.apache.beam.sdk.io.kafka,ConfluentSchemaRegistryDeserializerProvider" is leveraging “io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient” that I guessed should reduce this potential impact.

—
Alexey

> On 12 Apr 2023, at 17:36, John Casey via user <us...@beam.apache.org> wrote:
> 
> My initial guess is that there are queries being made in order to retrieve the schemas, which would impact performance, especially if those queries aren't cached with Beam splitting in mind. 
> 
> I'm looking to improve our interaction with Kafka schemas in the next couple of quarters, so I'll keep this case in mind while working on that.
> 
> John
> 
> On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>> wrote:
>> I don’t have an exact answer why it’s so much slower for now (only some guesses but it requires some profiling), though could you try to test the same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider” instead of KafkaAvroDeserializer and AvroCoder?
>> 
>> More details and an example how to use is here:
>> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html (go to “Use Avro schema with Confluent Schema Registry”)
>> 
>> —
>> Alexey
>> 
>> 
>> 
>>> On 10 Apr 2023, at 07:35, Sigalit Eliazov <e.sigalit@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> hi,
>>> KafkaIO.<String, T>read()
>>>         .withBootstrapServers(bootstrapServers)
>>>         .withTopic(topic)
>>>         .withConsumerConfigUpdates(Map.ofEntries(
>>>                 Map.entry("schema.registry.url", registryURL),
>>>                 Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup+ UUID.randomUUID()),
>>>         ))
>>>         .withKeyDeserializer(StringDeserializer.class)
>>>         .withValueDeserializerAndCoder((Class) io.confluent.kafka.serializers.KafkaAvroDeserializer.class, AvroCoder.of(avroClass));
>>> 
>>> Thanks
>>> Sigalit
>>> 
>>> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <user@beam.apache.org <ma...@beam.apache.org>> wrote:
>>>> How are you using the schema registry? Do you have a code sample?
>>>> 
>>>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <e.sigalit@gmail.com <ma...@gmail.com>> wrote:
>>>>> Hello,
>>>>> 
>>>>> I am trying to understand the effect of schema registry on our pipeline's performance. In order to do sowe created a very simple pipeline that reads from kafka, runs a simple transformation of adding new field and writes of kafka.  the messages are in avro format
>>>>> 
>>>>> I ran this pipeline with 3 different options on same configuration : 1 kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>>>> 
>>>>> * when i used apicurio as the schema registry i was able to process only 2000 messages per second
>>>>> * when i used confluent schema registry i was able to process 7000 messages per second
>>>>> * when I did not use any schema registry and used plain avro deserializer/serializer i was able to process 30K messages per second.
>>>>> 
>>>>> I understand that using a schema registry may cause a reduction in performance but  in my opinion the difference is too high. 
>>>>> Any comments or suggestions about these results?
>>>>> 
>>>>> Thanks in advance
>>>>> Sigalit
>> 


Re: major reduction is performance when using schema registry - KafkaIO

Posted by John Casey via user <us...@beam.apache.org>.
My initial guess is that there are queries being made in order to
retrieve the schemas, which would impact performance, especially if those
queries aren't cached with Beam splitting in mind.

I'm looking to improve our interaction with Kafka schemas in the next
couple of quarters, so I'll keep this case in mind while working on that.

John

On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> I don’t have an exact answer why it’s so much slower for now (only some
> guesses but it requires some profiling), though could you try to test the
> same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider”
> instead of KafkaAvroDeserializer and AvroCoder?
>
> More details and an example how to use is here:
>
> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html (go
> to “Use Avro schema with Confluent Schema Registry”)
>
> —
> Alexey
>
>
>
> On 10 Apr 2023, at 07:35, Sigalit Eliazov <e....@gmail.com> wrote:
>
> hi,
>
> KafkaIO.<String, T>read()
>         .withBootstrapServers(bootstrapServers)
>         .withTopic(topic)
>         .withConsumerConfigUpdates(Map.ofEntries(
>                 Map.entry("schema.registry.url", registryURL),
>                 Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup+ UUID.randomUUID()),
>         ))
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializerAndCoder((Class) io.confluent.kafka.serializers.KafkaAvroDeserializer.class, AvroCoder.of(avroClass));
>
>
> Thanks
>
> Sigalit
>
>
> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <us...@beam.apache.org>
> wrote:
>
>> How are you using the schema registry? Do you have a code sample?
>>
>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <e....@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I am trying to understand the effect of schema registry on our
>>> pipeline's performance. In order to do sowe created a very simple pipeline
>>> that reads from kafka, runs a simple transformation of adding new field and
>>> writes of kafka.  the messages are in avro format
>>>
>>> I ran this pipeline with 3 different options on same configuration : 1
>>> kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>>
>>> * when i used apicurio as the schema registry i was able to process only
>>> 2000 messages per second
>>> * when i used confluent schema registry i was able to process 7000
>>> messages per second
>>> * when I did not use any schema registry and used plain avro
>>> deserializer/serializer i was able to process *30K* messages per second.
>>>
>>> I understand that using a schema registry may cause a reduction in
>>> performance but  in my opinion the difference is too high.
>>> Any comments or suggestions about these results?
>>>
>>> Thanks in advance
>>> Sigalit
>>>
>>
>

Re: major reduction is performance when using schema registry - KafkaIO

Posted by Alexey Romanenko <ar...@gmail.com>.
I don’t have an exact answer why it’s so much slower for now (only some guesses but it requires some profiling), though could you try to test the same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider” instead of KafkaAvroDeserializer and AvroCoder?

More details and an example how to use is here:
https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html (go to “Use Avro schema with Confluent Schema Registry”)

—
Alexey



> On 10 Apr 2023, at 07:35, Sigalit Eliazov <e....@gmail.com> wrote:
> 
> hi,
> KafkaIO.<String, T>read()
>         .withBootstrapServers(bootstrapServers)
>         .withTopic(topic)
>         .withConsumerConfigUpdates(Map.ofEntries(
>                 Map.entry("schema.registry.url", registryURL),
>                 Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup+ UUID.randomUUID()),
>         ))
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializerAndCoder((Class) io.confluent.kafka.serializers.KafkaAvroDeserializer.class, AvroCoder.of(avroClass));
> 
> Thanks
> Sigalit
> 
> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <user@beam.apache.org <ma...@beam.apache.org>> wrote:
>> How are you using the schema registry? Do you have a code sample?
>> 
>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <e.sigalit@gmail.com <ma...@gmail.com>> wrote:
>>> Hello,
>>> 
>>> I am trying to understand the effect of schema registry on our pipeline's performance. In order to do sowe created a very simple pipeline that reads from kafka, runs a simple transformation of adding new field and writes of kafka.  the messages are in avro format
>>> 
>>> I ran this pipeline with 3 different options on same configuration : 1 kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>> 
>>> * when i used apicurio as the schema registry i was able to process only 2000 messages per second
>>> * when i used confluent schema registry i was able to process 7000 messages per second
>>> * when I did not use any schema registry and used plain avro deserializer/serializer i was able to process 30K messages per second.
>>> 
>>> I understand that using a schema registry may cause a reduction in performance but  in my opinion the difference is too high. 
>>> Any comments or suggestions about these results?
>>> 
>>> Thanks in advance
>>> Sigalit


Re: major reduction is performance when using schema registry - KafkaIO

Posted by Sigalit Eliazov <e....@gmail.com>.
hi,

KafkaIO.<String, T>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withConsumerConfigUpdates(Map.ofEntries(
                Map.entry("schema.registry.url", registryURL),
                Map.entry(ConsumerConfig.GROUP_ID_CONFIG,
consumerGroup+ UUID.randomUUID()),
        ))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder((Class)
io.confluent.kafka.serializers.KafkaAvroDeserializer.class,
AvroCoder.of(avroClass));


Thanks

Sigalit


On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <us...@beam.apache.org>
wrote:

> How are you using the schema registry? Do you have a code sample?
>
> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <e....@gmail.com>
> wrote:
>
>> Hello,
>>
>> I am trying to understand the effect of schema registry on our pipeline's
>> performance. In order to do sowe created a very simple pipeline that reads
>> from kafka, runs a simple transformation of adding new field and writes of
>> kafka.  the messages are in avro format
>>
>> I ran this pipeline with 3 different options on same configuration : 1
>> kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>
>> * when i used apicurio as the schema registry i was able to process only
>> 2000 messages per second
>> * when i used confluent schema registry i was able to process 7000
>> messages per second
>> * when I did not use any schema registry and used plain avro
>> deserializer/serializer i was able to process *30K* messages per second.
>>
>> I understand that using a schema registry may cause a reduction in
>> performance but  in my opinion the difference is too high.
>> Any comments or suggestions about these results?
>>
>> Thanks in advance
>> Sigalit
>>
>

Re: major reduction is performance when using schema registry - KafkaIO

Posted by Reuven Lax via user <us...@beam.apache.org>.
How are you using the schema registry? Do you have a code sample?

On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <e....@gmail.com> wrote:

> Hello,
>
> I am trying to understand the effect of schema registry on our pipeline's
> performance. In order to do sowe created a very simple pipeline that reads
> from kafka, runs a simple transformation of adding new field and writes of
> kafka.  the messages are in avro format
>
> I ran this pipeline with 3 different options on same configuration : 1
> kafka partition, 1 task manager, 1 slot, 1 parallelism:
>
> * when i used apicurio as the schema registry i was able to process only
> 2000 messages per second
> * when i used confluent schema registry i was able to process 7000
> messages per second
> * when I did not use any schema registry and used plain avro
> deserializer/serializer i was able to process *30K* messages per second.
>
> I understand that using a schema registry may cause a reduction in
> performance but  in my opinion the difference is too high.
> Any comments or suggestions about these results?
>
> Thanks in advance
> Sigalit
>