Posted to user@beam.apache.org by Sigalit Eliazov <e....@gmail.com> on 2022/04/10 19:14:38 UTC

KafkaIO consumer rate

Hi all
I am seeing a very low message consumption rate from Kafka in several of our
jobs.
In order to find the bottleneck, I created a very simple pipeline that reads
string messages from Kafka and just prints the output.
The pipeline runs on a Flink cluster with the following setup:
1 task manager, 3 slots, parallelism set to 3


// KafkaTransform.readStrFromKafka(...) is our own helper; it returns the KafkaIO read shown below.
PCollection<KV<String, String>> readFromKafka = pipeline.apply("readFromKafka",
        KafkaTransform.readStrFromKafka(
                pipelineUtil.getBootstrapServers(), topic_name, consumer_group));

// Drop the keys and log every message value.
readFromKafka.apply("Get message contents", Values.<String>create())
        .apply("Log messages", MapElements.into(TypeDescriptor.of(String.class))
                .via(message -> {
                    log.atInfo().log("Received: {}", message);
                    return message;
                }));


The Kafka consumer is:

return KafkaIO.<String, String>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Start from the earliest offset and consume as part of the given group.
        .withConsumerConfigUpdates(ImmutableMap.of(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
                ConsumerConfig.GROUP_ID_CONFIG, consumerGroup))
        .withoutMetadata();


According to the metrics, I do have 3 threads reading from Kafka, but each one
reads only around 56 records per second.

In my opinion this is a very low rate.

I am not sure I understand this behaviour.

I have checked CPU and memory usage, and both look OK.

Any ideas would be really appreciated.

Thanks a lot,

Sigalit
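To put the figures above in perspective, here is a back-of-the-envelope sketch: 3 readers at about 56 records per second each give an aggregate of roughly 168 records per second. The class name ConsumerRateEstimate and the 1,000,000-record backlog are hypothetical, chosen only for illustration; they do not come from the original jobs.

```java
/** Back-of-the-envelope throughput figures (illustrative sketch, not measured data). */
public final class ConsumerRateEstimate {

    /** Aggregate rate when {@code readers} threads each sustain {@code perReader} records/s. */
    public static double aggregateRate(int readers, double perReader) {
        return readers * perReader;
    }

    /** Seconds needed to drain {@code backlog} records at {@code rate} records/s. */
    public static double secondsToDrain(long backlog, double rate) {
        return backlog / rate;
    }

    public static void main(String[] args) {
        // 3 reader threads x ~56 records/s each, as reported in the metrics.
        double total = aggregateRate(3, 56);
        System.out.println(total + " records/s"); // prints: 168.0 records/s

        // Hypothetical backlog of 1,000,000 records, for illustration only.
        System.out.printf("%.0f s to drain 1M records%n", secondsToDrain(1_000_000L, total));
        // prints: 5952 s to drain 1M records (roughly 99 minutes)
    }
}
```

At that rate even a modest backlog takes well over an hour to consume, which is consistent with the rate being described as very low.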

Re: KafkaIO consumer rate

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Sigalit,

Could you try running your test pipeline with the “--experiments=use_deprecated_read” option and see whether it makes a difference?

—
Alexey
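If it is more convenient to set the suggested experiment from code rather than on the command line, a minimal sketch using the Beam SDK's ExperimentalOptions helpers (hasExperiment and addExperiment) could look like the following. The class and method names (DeprecatedReadOptions, withDeprecatedRead) are hypothetical, and the sketch assumes the pipeline options are built from the program arguments with PipelineOptionsFactory, as in a typical Beam main method. It is only meant to be equivalent to passing --experiments=use_deprecated_read; whether it changes KafkaIO throughput on Flink is exactly what the test run would show.

```java
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Sketch (hypothetical helper): enable "use_deprecated_read" from code. */
public final class DeprecatedReadOptions {

    static final String DEPRECATED_READ = "use_deprecated_read";

    /**
     * Parses the usual pipeline arguments and makes sure the experiment is set.
     * Intended to match passing --experiments=use_deprecated_read on the command line.
     */
    public static ExperimentalOptions withDeprecatedRead(String[] args) {
        ExperimentalOptions options =
                PipelineOptionsFactory.fromArgs(args).as(ExperimentalOptions.class);
        // Only add the experiment if it was not already passed via --experiments.
        if (!ExperimentalOptions.hasExperiment(options, DEPRECATED_READ)) {
            ExperimentalOptions.addExperiment(options, DEPRECATED_READ);
        }
        return options;
    }

    public static void main(String[] args) {
        ExperimentalOptions options = withDeprecatedRead(args);
        System.out.println("Experiments: " + options.getExperiments());
        // Pipeline.create(options) would then build the KafkaIO test pipeline as before.
    }
}
```

Checking hasExperiment first keeps the experiments list free of duplicates when the flag is also supplied on the command line.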
