You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/08/14 04:35:43 UTC

[kafka] branch trunk updated: MINOR: Avoid unnecessary leaderFor calls when ProducerBatch queue empty (#7196)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2ca3019  MINOR: Avoid unnecessary leaderFor calls when ProducerBatch queue empty (#7196)
2ca3019 is described below

commit 2ca30191d2bb430a4b27b125177406cf263d444f
Author: Lucas Bradstreet <lu...@gmail.com>
AuthorDate: Tue Aug 13 21:35:03 2019 -0700

    MINOR: Avoid unnecessary leaderFor calls when ProducerBatch queue empty (#7196)
    
    The RecordAccumulator ready calls `leaderFor` unnecessarily when the ProducerBatch
    queue is empty. When producing to many partitions, the queue is often empty and the
    `leaderFor` call can be expensive in comparison. Remove the unnecessary call.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../producer/internals/RecordAccumulator.java       | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index fb335e4..745382d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -459,18 +459,19 @@ public final class RecordAccumulator {
 
         boolean exhausted = this.free.queued() > 0;
         for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
-            TopicPartition part = entry.getKey();
             Deque<ProducerBatch> deque = entry.getValue();
-
-            Node leader = cluster.leaderFor(part);
             synchronized (deque) {
-                if (leader == null && !deque.isEmpty()) {
-                    // This is a partition for which leader is not known, but messages are available to send.
-                    // Note that entries are currently not removed from batches when deque is empty.
-                    unknownLeaderTopics.add(part.topic());
-                } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
-                    ProducerBatch batch = deque.peekFirst();
-                    if (batch != null) {
+                // When producing to a large number of partitions, this path is hot and deques are often empty.
+                // We check whether a batch exists first to avoid the more expensive checks whenever possible.
+                ProducerBatch batch = deque.peekFirst();
+                if (batch != null) {
+                    TopicPartition part = entry.getKey();
+                    Node leader = cluster.leaderFor(part);
+                    if (leader == null) {
+                        // This is a partition for which leader is not known, but messages are available to send.
+                        // Note that entries are currently not removed from batches when deque is empty.
+                        unknownLeaderTopics.add(part.topic());
+                    } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                         long waitedTimeMs = batch.waitedTimeMs(nowMs);
                         boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                         long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;