You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2022/07/20 15:19:47 UTC

[kafka] branch trunk updated: KAFKA-14020: Performance regression in Producer (#12365)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new badfbacdd0 KAFKA-14020: Performance regression in Producer (#12365)
badfbacdd0 is described below

commit badfbacdd09a9ee8821847f4b28d98625f354ed7
Author: Artem Livshits <84...@users.noreply.github.com>
AuthorDate: Wed Jul 20 08:19:31 2022 -0700

    KAFKA-14020: Performance regression in Producer (#12365)
    
    As part of the KAFKA-10888 work, a couple of regressions were introduced:
    
    A call to time.milliseconds() was moved under the queue lock. The call may be expensive and can cause lock contention, so it is now moved back outside of the lock: the current time is read right after the buffer allocation and passed into appendNewBatch.
    
    A reference to the ProducerRecord was held in the batch completion callback, so the record was kept alive as long as the batch was alive. This may increase memory usage in certain scenarios and cause excessive GC work. Now the callback extracts only the fields it needs (the topic, the record's partition, and the record's string form when trace logging is enabled) instead of keeping the record, so the ProducerRecord lifetime isn't bound to the batch lifetime.
    
    Tested via a manually crafted benchmark: the lock profile shows ~15% lock contention on the partition queue (dq) lock without the fix and ~5% with the fix (which is also consistent with the pre-KAFKA-10888 profile).
    
    The allocation profile shows ~10% of allocations in ProducerBatch.completeFutureAndFireCallbacks without the fix vs. ~0.25% with the fix (which is also consistent with the pre-KAFKA-10888 profile).
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
---
 .../kafka/clients/producer/KafkaProducer.java      | 30 +++++++++++++++-------
 .../producer/internals/RecordAccumulator.java      | 13 +++++++---
 2 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 74d408d9a5..2d5c8994b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1465,13 +1465,21 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
-        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private final String topic;
+        private final Integer recordPartition;
+        private final String recordLogString;
+        private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private volatile TopicPartition topicPartition;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.record = record;
+            // Extract record info as we don't want to keep a reference to the record during
+            // whole lifetime of the batch.
+            // We don't want to have an NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;
+            recordPartition = record != null ? record.partition() : null;
+            recordLogString = log.isTraceEnabled() && record != null ? record.toString() : "";
         }
 
         @Override
@@ -1491,7 +1499,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", recordLogString, userCallback, topic, partition);
             }
         }
 
@@ -1500,11 +1508,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
 
         public TopicPartition topicPartition() {
-            if (record == null)
-                return null;
-            return partition == RecordMetadata.UNKNOWN_PARTITION
-                    ? ProducerInterceptors.extractTopicPartition(record)
-                    : new TopicPartition(record.topic(), partition);
+            if (topicPartition == null && topic != null) {
+                if (partition != RecordMetadata.UNKNOWN_PARTITION)
+                    topicPartition = new TopicPartition(topic, partition);
+                else if (recordPartition != null)
+                    topicPartition = new TopicPartition(topic, recordPartition);
+                else
+                    topicPartition = new TopicPartition(topic, RecordMetadata.UNKNOWN_PARTITION);
+            }
+            return topicPartition;
         }
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 4168ea68aa..a1f684ac95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -297,7 +297,12 @@ public class RecordAccumulator {
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                     int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                     log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
+                    // Update the current time in case the buffer allocation blocked above.
+                    // NOTE: getting time may be expensive, so calling it under a lock
+                    // should be avoided.
+                    nowMs = time.milliseconds();
                 }
 
                 synchronized (dq) {
@@ -307,7 +312,7 @@ public class RecordAccumulator {
                                 partitionInfo.partition(), topic);
                         continue;
                     }
-                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer);
+                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                     // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
                     if (appendResult.newBatchCreated)
                         buffer = null;
@@ -333,6 +338,7 @@ public class RecordAccumulator {
      * @param headers the Headers for the record
      * @param callbacks The callbacks to execute
      * @param buffer The buffer for the new batch
+     * @param nowMs The current time, in milliseconds
      */
     private RecordAppendResult appendNewBatch(String topic,
                                               int partition,
@@ -342,11 +348,10 @@ public class RecordAccumulator {
                                               byte[] value,
                                               Header[] headers,
                                               AppendCallbacks callbacks,
-                                              ByteBuffer buffer) {
+                                              ByteBuffer buffer,
+                                              long nowMs) {
         assert partition != RecordMetadata.UNKNOWN_PARTITION;
 
-        // Update the current time in case the buffer allocation blocked above.
-        long nowMs = time.milliseconds();
         RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
         if (appendResult != null) {
             // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...