Posted to dev@kafka.apache.org by "Andres Gomez Ferrer (JIRA)" <ji...@apache.org> on 2017/01/25 16:32:26 UTC

[jira] [Comment Edited] (KAFKA-4474) Poor kafka-streams throughput

    [ https://issues.apache.org/jira/browse/KAFKA-4474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838055#comment-15838055 ] 

Andres Gomez Ferrer edited comment on KAFKA-4474 at 1/25/17 4:31 PM:
---------------------------------------------------------------------

Hi all,

I investigated this issue further and I think I have found the reason for the poor throughput. I will try to explain it.

I have attached two pictures that I will use in the explanation:

1. Example architecture.
!kafka-streams-bug-1.png!

2. General StreamThread life cycle.
!kafka-streams-bug-2.png!

In the example architecture, we have a Kafka Streams app with one stream thread that consumes from a Kafka topic with two partitions (picture 1). We also have a producer client that sends data only to partition 0, so partition 1 is empty.

When we start the Kafka Streams app, it subscribes to the input topic, starts consuming partitions 0 and 1, and begins its life cycle (picture 2). The app consumes data from Kafka and puts it into the stream tasks' buffers. The StreamThread then calls the process method of each stream task. If a task's buffer is full, the StreamTask decides that it does not require a poll (requiresPoll), so on the next iteration the StreamThread only processes the messages already in the stream tasks' buffers and does not call consumer.poll(...).

The problem is that partition 1 never has messages, so StreamTask_1 always decides that it needs a new poll, and the StreamThread ends up calling consumer.poll(...) on every iteration: StreamTask_0 processes one record and StreamTask_1 forces another consumer.poll(...) call.

One consumer.poll(...) call per processed record is a lot of calls, and this causes the poor throughput (~4k records/sec in my tests). If I change the producer client to one that sends to both partitions, this situation disappears and I get ~30k records/sec on my output topic. The system then has one CPU at 100%, which I think is a reasonable result.

Note: The life cycle of the StreamThread is more or less the logic that you can find in the runLoop() method.

{code:title=StreamThread.java runLoop()|borderStyle=solid}
    private void runLoop() {
        int totalNumBuffered = 0;
        boolean requiresPoll = true;
        boolean polledRecords = false;

        if (topicPattern != null) {
            consumer.subscribe(topicPattern, rebalanceListener);
        } else {
            consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
        }

        while (stillRunning()) {
            this.timerStartedMs = time.milliseconds();

            // try to fetch some records if necessary
            if (requiresPoll) {
                requiresPoll = false;

                boolean longPoll = totalNumBuffered == 0;

                ConsumerRecords<byte[], byte[]> records = consumer.poll(longPoll ? this.pollTimeMs : 0);

                if (rebalanceException != null)
                    throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);

                if (!records.isEmpty()) {
                    for (TopicPartition partition : records.partitions()) {
                        StreamTask task = activeTasksByPartition.get(partition);
                        task.addRecords(partition, records.records(partition));
                    }
                    polledRecords = true;
                } else {
                    polledRecords = false;
                }

                // only record poll latency if long poll is required
                if (longPoll) {
                    sensors.pollTimeSensor.record(computeLatency());
                }
            }

            // try to process one fetch record from each task via the topology, and also trigger punctuate
            // functions if necessary, which may result in more records going through the topology in this loop
            if (totalNumBuffered > 0 || polledRecords) {
                totalNumBuffered = 0;

                if (!activeTasks.isEmpty()) {
                    for (StreamTask task : activeTasks.values()) {

                        totalNumBuffered += task.process();

                        requiresPoll = requiresPoll || task.requiresPoll();

                        sensors.processTimeSensor.record(computeLatency());

                        maybePunctuate(task);

                        if (task.commitNeeded())
                            commitOne(task);
                    }

                } else {
                    // even when no task is assigned, we must poll to get a task.
                    requiresPoll = true;
                }

            } else {
                requiresPoll = true;
            }
            maybeCommit();
            maybeUpdateStandbyTasks();

            maybeClean();
        }
    }
{code}
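
To make the effect easier to see, here is a toy model of the loop. It is only a sketch under simplifying assumptions, not the real Kafka Streams code: the class PollLoopModel, its simulate method and the bufferCapacity parameter are hypothetical names, a task is assumed to request a poll only when it finds its buffer empty, and each simulated poll simply tops every task buffer up to its capacity. It ignores the polledRecords and totalNumBuffered checks of the real runLoop().

```java
import java.util.Arrays;

/** Toy model (hypothetical, not Kafka code) of a poll/process loop with one task per partition. */
final class PollLoopModel {

    /** Outcome of one simulated run. */
    static final class Result {
        final long polls;      // number of simulated consumer.poll(...) calls
        final long processed;  // number of records processed by all tasks

        Result(long polls, long processed) {
            this.polls = polls;
            this.processed = processed;
        }
    }

    /**
     * @param backlogPerPartition records waiting on the broker for each partition
     * @param bufferCapacity      assumed maximum number of buffered records per task
     */
    static Result simulate(int[] backlogPerPartition, int bufferCapacity) {
        if (bufferCapacity <= 0) {
            throw new IllegalArgumentException("bufferCapacity must be positive");
        }
        int[] backlog = backlogPerPartition.clone();
        int[] buffered = new int[backlog.length];
        long polls = 0;
        long processed = 0;
        boolean requiresPoll = true;

        // Run until nothing is left on the broker or in the task buffers.
        while (Arrays.stream(backlog).sum() + Arrays.stream(buffered).sum() > 0) {
            if (requiresPoll) {
                requiresPoll = false;
                polls++;
                // Simulated poll: top up every task buffer to its capacity.
                for (int p = 0; p < backlog.length; p++) {
                    int fetched = Math.min(backlog[p], bufferCapacity - buffered[p]);
                    backlog[p] -= fetched;
                    buffered[p] += fetched;
                }
            }
            // Each task processes at most one record per iteration.
            for (int p = 0; p < buffered.length; p++) {
                if (buffered[p] > 0) {
                    buffered[p]--;
                    processed++;
                } else {
                    // Assumption: a task with an empty buffer asks for a new poll.
                    requiresPoll = true;
                }
            }
        }
        return new Result(polls, processed);
    }

    public static void main(String[] args) {
        Result skewed = simulate(new int[] {1000, 0}, 100);
        Result balanced = simulate(new int[] {500, 500}, 100);
        System.out.println("skewed:   polls=" + skewed.polls + " processed=" + skewed.processed);
        System.out.println("balanced: polls=" + balanced.polls + " processed=" + balanced.processed);
    }
}
```

In this model, 1000 records that all sit in partition 0 cost 1000 polls, while the same 1000 records split evenly over both partitions cost 5 polls. The absolute numbers mean nothing for a real cluster, but the ratio illustrates why one empty partition can dominate the loop.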



> Poor kafka-streams throughput
> -----------------------------
>
>                 Key: KAFKA-4474
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4474
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Juan Chorro
>            Assignee: Eno Thereska
>         Attachments: hctop sreenshot.png, kafka-streams-bug-1.png, kafka-streams-bug-2.png, Performance test results.png
>
>
> Hi! 
> I'm writing because I'm concerned about kafka-streams throughput.
> I have a single kafka-streams application instance that consumes from the 'input' topic, prints to the screen and produces to the 'output' topic. All topics have 4 partitions. As you can see, the topology is very simple.
> I produce 120K messages/second to the 'input' topic, but when I measure the 'output' topic I see that I'm receiving only ~4K messages/second. I used the following configuration (remaining parameters at their defaults):
> application.id: myApp
> bootstrap.servers: localhost:9092
> zookeeper.connect: localhost:2181
> num.stream.threads: 1
> I ran various tests without success, but when I created a new 'input' topic with 1 partition (keeping the 'output' topic at 4 partitions) I got 120K messages/second on the 'output' topic.
> I have run some performance tests with the following cases (all topics have 4 partitions in all cases):
> Case A - 1 Instance:
> - With num.stream.threads set to 1 I had ~3785 messages/second
> - With num.stream.threads set to 2 I had ~3938 messages/second
> - With num.stream.threads set to 4 I had ~120K messages/second
> Case B - 2 Instances:
> - With num.stream.threads set to 1 I had ~3930 messages/second for each instance (total throughput ~8K messages/second)
> - With num.stream.threads set to 2 I had ~3945 messages/second for each instance (more or less the same throughput as with num.stream.threads set to 1)
> Case C - 4 Instances:
> - With num.stream.threads set to 1 I had 3946 messages/second for each instance (total throughput ~17K messages/second)
> As you can see, I get the best results when num.stream.threads is set to #partitions. So I have the following questions:
> - Why do I always get ~4K messages/second when I have a topic with #partitions > 1 and num.stream.threads is set to 1?
> - In case C, 4 instances with num.stream.threads set to 1 should be better than 1 instance with num.stream.threads set to 4. Is this assumption correct?
> This is the kafka-streams application that I use: https://gist.github.com/Chorro/5522ec4acd1a005eb8c9663da86f5a18
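
For reference, the configuration quoted above, with num.stream.threads matched to the number of input partitions (the setting that gave ~120K messages/second in Case A), could be built roughly as follows. This is only a sketch using plain java.util.Properties: the class name StreamsConfigSketch and the helper streamsProperties are hypothetical, and the property values are the ones from the report.

```java
import java.util.Properties;

/** Hypothetical helper that builds the settings listed in the issue description. */
final class StreamsConfigSketch {

    /**
     * @param inputPartitions number of partitions of the input topic; used as the
     *                        stream thread count, as in the best-performing test case
     */
    static Properties streamsProperties(int inputPartitions) {
        if (inputPartitions <= 0) {
            throw new IllegalArgumentException("inputPartitions must be positive");
        }
        Properties props = new Properties();
        props.setProperty("application.id", "myApp");
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        // One stream thread per input partition instead of the single thread from the report.
        props.setProperty("num.stream.threads", Integer.toString(inputPartitions));
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting settings for a 4-partition input topic.
        streamsProperties(4).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would then be passed to the Kafka Streams application in place of the original settings.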



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)