You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/08/14 04:35:43 UTC
[kafka] branch trunk updated: MINOR: Avoid unnecessary leaderFor
calls when ProducerBatch queue empty (#7196)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2ca3019 MINOR: Avoid unnecessary leaderFor calls when ProducerBatch queue empty (#7196)
2ca3019 is described below
commit 2ca30191d2bb430a4b27b125177406cf263d444f
Author: Lucas Bradstreet <lu...@gmail.com>
AuthorDate: Tue Aug 13 21:35:03 2019 -0700
MINOR: Avoid unnecessary leaderFor calls when ProducerBatch queue empty (#7196)
The RecordAccumulator ready calls `leaderFor` unnecessarily when the ProducerBatch
queue is empty. When producing to many partitions, the queue is often empty and the
`leaderFor` call can be expensive in comparison. Remove the unnecessary call.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../producer/internals/RecordAccumulator.java | 21 +++++++++++----------
1 file changed, 11 insertions(+), 10 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index fb335e4..745382d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -459,18 +459,19 @@ public final class RecordAccumulator {
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
- TopicPartition part = entry.getKey();
Deque<ProducerBatch> deque = entry.getValue();
-
- Node leader = cluster.leaderFor(part);
synchronized (deque) {
- if (leader == null && !deque.isEmpty()) {
- // This is a partition for which leader is not known, but messages are available to send.
- // Note that entries are currently not removed from batches when deque is empty.
- unknownLeaderTopics.add(part.topic());
- } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
- ProducerBatch batch = deque.peekFirst();
- if (batch != null) {
+ // When producing to a large number of partitions, this path is hot and deques are often empty.
+ // We check whether a batch exists first to avoid the more expensive checks whenever possible.
+ ProducerBatch batch = deque.peekFirst();
+ if (batch != null) {
+ TopicPartition part = entry.getKey();
+ Node leader = cluster.leaderFor(part);
+ if (leader == null) {
+ // This is a partition for which leader is not known, but messages are available to send.
+ // Note that entries are currently not removed from batches when deque is empty.
+ unknownLeaderTopics.add(part.topic());
+ } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;