Posted to user@beam.apache.org by Sigalit Eliazov <e....@gmail.com> on 2022/04/10 19:14:38 UTC
KafkaIO consumer rate
Hi all,
I am seeing a very low consumption rate when reading messages from Kafka in
several of our jobs.
In order to find the bottleneck, I created a very simple pipeline that reads
string messages from Kafka and just prints the output.
The pipeline runs on a Flink cluster with the following setup:
1 task manager, 3 slots, parallelism set to 3
PCollection<KV<String, String>> readFromKafka = pipeline.apply(
    "readFromKafka",
    KafkaTransform.readStrFromKafka(
        pipelineUtil.getBootstrapServers(), topic_name, consumer_group));
readFromKafka
    .apply("Get message contents", Values.<String>create())
    .apply("Log messages", MapElements.into(TypeDescriptor.of(String.class))
        .via(message -> {
            log.atInfo().log("Received: {}", message);
            return message;
        }));
The Kafka consumer is:
return KafkaIO.<String, String>read()
    .withBootstrapServers(bootstrapServers)
    .withTopic(topic)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withConsumerConfigUpdates(ImmutableMap.of(
        "auto.offset.reset", "earliest",
        ConsumerConfig.GROUP_ID_CONFIG, consumerGroup))
    .withoutMetadata();
According to the metrics, I do have 3 threads reading from Kafka, but each
one reads only around 56 records per second.
In my opinion this is a very low rate.
I am not sure I understand this behaviour.
I have checked CPU and memory usage and both look OK.
Any ideas would be really appreciated.
Thanks a lot,
Sigalit
Re: KafkaIO consumer rate
Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Sigalit,
Could you try running your test pipeline with the "--experiments=use_deprecated_read" option and see if there is a difference?
—
Alexey
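
If the job is launched from Java code rather than from the command line, the flag suggested above could be added to the argument list before the pipeline options are built. The following is only a minimal sketch: the class and method names (ExperimentArgs, withDeprecatedRead) are hypothetical and not part of Beam, and the hand-off to Beam is shown as a comment so the snippet runs without the Beam dependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Beam: appends the experiment flag
// suggested in this thread to the arguments the job was started with.
final class ExperimentArgs {
    static final String DEPRECATED_READ_FLAG = "--experiments=use_deprecated_read";

    /** Returns a copy of args with the flag appended unless that exact flag is already present. */
    public static String[] withDeprecatedRead(String[] args) {
        List<String> out = new ArrayList<>(Arrays.asList(args));
        if (!out.contains(DEPRECATED_READ_FLAG)) {
            out.add(DEPRECATED_READ_FLAG);
        }
        return out.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] pipelineArgs = withDeprecatedRead(args);
        // In the real job (assumption: options are built from args), these
        // would be handed to Beam, for example:
        // PipelineOptionsFactory.fromArgs(pipelineArgs).withValidation().create();
        System.out.println(String.join(" ", pipelineArgs));
    }
}
```

The helper only checks for an exact match, so it does not merge the experiment into an existing --experiments=... value. Comparing the per-thread record rate with and without the flag would show whether the read implementation is the bottleneck.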