Posted to issues@beam.apache.org by "TJ (Jira)" <ji...@apache.org> on 2019/09/02 14:04:00 UTC

[jira] [Comment Edited] (BEAM-8121) Messages are not distributed per machines when consuming from Kafka topic with 1 partition

    [ https://issues.apache.org/jira/browse/BEAM-8121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920886#comment-16920886 ] 

TJ edited comment on BEAM-8121 at 9/2/19 2:03 PM:
--------------------------------------------------

I use the Dataflow runner.

I tried Reshuffle.viaRandomKey(), but it worked quite poorly and had side effects:
 * The pipeline started to show a constant lag (2-3 min) on the Reshuffle step.
 * CPU usage increased from 20-30% to almost 80% on n1-standard-4 machines ([https://cloud.google.com/compute/docs/machine-types]).
 * A single machine consumed fewer messages: about 1500 messages/s without the Reshuffle step versus about 450 messages/s with it. When the pipeline scaled to 5 machines, it consumed the same 1500 messages/s as without the step.

I contacted Google support, and they advised not using the Reshuffle step at all because it is deprecated and not stable.
I assume that with more powerful machines, and more than 5 of them, the pipeline would theoretically reach the maximum throughput, but that would be really costly for such a small message throughput.
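
For readers unfamiliar with the idea behind a random-key reshuffle, the following is a toy model in plain Java, not Beam's Reshuffle implementation. It assumes that each element gets a random integer key and is routed to the worker given by that key modulo the worker count; how a real runner such as Dataflow assigns keys to workers is runner-specific. The class name, method name, and seed are hypothetical.

```java
import java.util.Random;

// Toy model only: this is NOT how Beam or Dataflow implements Reshuffle.
// It only illustrates that random keys spread elements roughly evenly.
class RandomKeyRedistribution {

    // Gives each element a random key and counts how many land on each worker.
    static int[] distribute(int elements, int workers, long seed) {
        Random random = new Random(seed);
        int[] perWorker = new int[workers];
        for (int i = 0; i < elements; i++) {
            int key = random.nextInt();
            // floorMod keeps the index non-negative for negative keys.
            perWorker[Math.floorMod(key, workers)]++;
        }
        return perWorker;
    }

    public static void main(String[] args) {
        // 1500 elements (one second of input at the rate above) over 5 workers.
        int[] counts = distribute(1500, 5, 42L);
        for (int w = 0; w < counts.length; w++) {
            System.out.println("worker " + w + ": " + counts[w] + " elements");
        }
    }
}
```

The model ignores the serialization, shuffle, and grouping costs of a real redistribution step, which may be part of what shows up as the extra lag and CPU usage described above.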

One more thing to mention: I ran a throughput test for a standalone Kafka app and a Dataflow pipeline. Both consumed from the same topic with 1 partition. Message size was 1.3 KB. Results:
 * Standalone Kafka client: 5000 messages/s
 * Dataflow pipeline with a few empty tasks: 3200 messages/s
 * Dataflow pipeline with normal tasks that manipulate data: 1500 messages/s
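
To put the three rates above into bandwidth terms, here is a small arithmetic sketch. It assumes 1.3 KB means 1300 bytes (1 KB = 1000 bytes); the class and method names are hypothetical and only serve the illustration.

```java
// Illustrative arithmetic only; assumes 1.3 KB = 1300 bytes.
class ThroughputMath {
    static final long MESSAGE_BYTES = 1300L;

    // Converts a message rate into bytes per second.
    static long bytesPerSecond(long messagesPerSecond) {
        return messagesPerSecond * MESSAGE_BYTES;
    }

    // Rate as an integer percentage of a baseline rate, rounded down.
    static long percentOfBaseline(long messagesPerSecond, long baseline) {
        return messagesPerSecond * 100 / baseline;
    }

    public static void main(String[] args) {
        long standalone = 5000;   // standalone Kafka client
        long emptyTasks = 3200;   // Dataflow pipeline with a few empty tasks
        long normalTasks = 1500;  // Dataflow pipeline with normal tasks
        for (long rate : new long[] {standalone, emptyTasks, normalTasks}) {
            System.out.println(rate + " messages/s = " + bytesPerSecond(rate)
                + " bytes/s, " + percentOfBaseline(rate, standalone)
                + "% of the standalone client");
        }
    }
}
```

Under that assumption the rates correspond to roughly 6.5 MB/s, 4.16 MB/s, and 1.95 MB/s, so the pipeline with normal tasks reaches about 30% of the standalone client's rate.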


was (Author: taujan):
I use the Dataflow runner.

I tried Reshuffle.viaRandomKey(), but it worked quite poorly and had side effects:
 * The pipeline started to show a constant lag (2-3 min) on the Reshuffle step.
 * CPU usage increased from 20-30% to almost 80% on n1-standard-4 machines ([https://cloud.google.com/compute/docs/machine-types]).
 * A single machine consumed fewer messages: about 1500 messages/s without the Reshuffle step versus about 450 messages/s with it. When the pipeline scaled to 5 machines, it consumed the same 1500 messages/s as without the step.

I contacted Google support, and they advised not using the Reshuffle step at all because it is deprecated and not stable.

One more thing to mention: I ran a throughput test for a standalone Kafka app and a Dataflow pipeline. Both consumed from the same topic with 1 partition. Results:
 * Standalone Kafka client: 4500 messages/s
 * Dataflow pipeline with a few empty tasks: 3200 messages/s
 * Dataflow pipeline with normal tasks that manipulate data: 1500 messages/s

> Messages are not distributed per machines when consuming from Kafka topic with 1 partition
> ------------------------------------------------------------------------------------------
>
>                 Key: BEAM-8121
>                 URL: https://issues.apache.org/jira/browse/BEAM-8121
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.14.0
>            Reporter: TJ
>            Priority: Major
>
> Messages are consumed from Kafka using KafkaIO. Each Kafka topic contains only 1 partition. (That means messages can be consumed by only one consumer per consumer group.)
> When the topic's backlog grows and the system scales from 1 to X machines, all the messages seem to be processed on the same machine on which they are read. Because of that, message throughput doesn't increase when comparing X machines to 1 machine. If one machine was reading 2K messages per second, X machines will read the same amount.


