Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/29 20:56:19 UTC

[GitHub] [kafka] artemlivshits opened a new pull request, #12365: KAFKA-14020: Performance regression in Producer

artemlivshits opened a new pull request, #12365:
URL: https://github.com/apache/kafka/pull/12365

   As part of the KAFKA-10888 work, a call to time.milliseconds() was moved
   under the queue lock; this change moves it back outside the lock.  The call
   may be expensive and cause lock contention.
   
   Tested with a manually crafted benchmark: the alloc profile shows ~15% lock contention on the ArrayQueue lock without the fix and ~5% lock contention with the fix (which is also consistent with the pre-KAFKA-10888 profile).
   
   Will add a proper JMH benchmark for the producer (it looks like we don't have one) in a follow-up change.
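
   The pattern described above can be sketched roughly as follows. This is an illustration only, not the RecordAccumulator code: the class `TimestampedQueue` and its method names are hypothetical, and a `LongSupplier` stands in for `time.milliseconds()`.

```java
import java.util.ArrayDeque;
import java.util.function.LongSupplier;

// Hypothetical illustration only; not the RecordAccumulator implementation.
final class TimestampedQueue {
    private final ArrayDeque<Long> queue = new ArrayDeque<>();
    private final LongSupplier clock; // stands in for time.milliseconds()

    TimestampedQueue(LongSupplier clock) {
        this.clock = clock;
    }

    // Pattern to avoid: the (possibly expensive) clock read extends the critical section.
    void appendClockUnderLock() {
        synchronized (queue) {
            long nowMs = clock.getAsLong();
            queue.addLast(nowMs);
        }
    }

    // Preferred pattern: read the clock first, then hold the lock only for the queue update.
    void appendClockOutsideLock() {
        long nowMs = clock.getAsLong();
        synchronized (queue) {
            queue.addLast(nowMs);
        }
    }

    int size() {
        synchronized (queue) {
            return queue.size();
        }
    }
}
```

   The trade-off is that the timestamp may be slightly stale by the time the lock is acquired, which is usually acceptable for millisecond-granularity bookkeeping.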
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924136058


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   Ok, so what's the suggestion for this change?  Should I leave the code as is or remove the assert?  Creating a new utility seems to be out of scope for this change.  We could have an offline discussion about asserts; I would be happy to see them used more often in Kafka.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   It's missing the partition info. Previously, partition was calculated before doing RecordAccumulator.append so we could do tracing in the doSend, but now the partition may be calculated at the beginning of RecordAccumulator.append, so tracing needs to happen after it's known, but before the actual append proceeds.





[GitHub] [kafka] junrao commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1186005602

   @jsancio : We plan to cherry-pick this to 3.3 branch since this fixes a performance issue in [KAFKA-10888](https://issues.apache.org/jira/browse/KAFKA-10888).




[GitHub] [kafka] artemlivshits commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1185997352

   @junrao, @ijuma thank you for the reviews, I've updated the PR.




[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924779033


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,21 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
-        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private final String topic;
+        private final Integer recordPartition;
+        private final String recordLogString;
+        private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private volatile TopicPartition topicPartition;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.record = record;
+            // Extract record info as we don't want to keep a reference to the record during
+            // whole lifetime of the batch.

Review Comment:
   Could we move these two lines to immediately before the line where we set recordPartition?





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r919747090


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;
         }
 
         public int getPartition() {
             return partition;
         }
 
         public TopicPartition topicPartition() {
-            if (record == null)
-                return null;
-            return partition == RecordMetadata.UNKNOWN_PARTITION
-                    ? ProducerInterceptors.extractTopicPartition(record)
-                    : new TopicPartition(record.topic(), partition);
+            if (partition != RecordMetadata.UNKNOWN_PARTITION)
+                return new TopicPartition(topic, partition);

Review Comment:
   It's a bit surprising to allocate every time a method like this is called. Can we not allocate the topic partition once and reuse it?
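
   One way to allocate once and reuse is to build the object when the partition becomes known and return the cached instance afterwards. The sketch below is an assumption-laden illustration: `TopicPartitionKey` and `CachedPartitionCallbacks` are hypothetical stand-ins rather than Kafka classes, and returning `null` before the partition is set is a simplification.

```java
// Hypothetical stand-in that mimics a topic/partition pair.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Hypothetical callbacks holder that caches the topic/partition object.
final class CachedPartitionCallbacks {
    static final int UNKNOWN_PARTITION = -1;

    private final String topic;
    private volatile int partition = UNKNOWN_PARTITION;
    // Written in setPartition; volatile so other threads see the published object.
    private volatile TopicPartitionKey topicPartition;

    CachedPartitionCallbacks(String topic) {
        this.topic = topic;
    }

    void setPartition(int partition) {
        this.partition = partition;
        // Allocate once here instead of on every topicPartition() call.
        this.topicPartition = new TopicPartitionKey(topic, partition);
    }

    int getPartition() {
        return partition;
    }

    // Returns the cached instance, or null if the partition has not been set yet.
    TopicPartitionKey topicPartition() {
        return topicPartition;
    }
}
```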



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   It's a bit surprising that a method called setPartition resets the record. Maybe we can make the method name clearer. It would also be useful for the comment to state why we no longer need the record after this.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   A couple of lines above we use a language-level assert. In Kafka, we typically use assert-like methods, such as those in the Objects class, since language-level asserts are disabled by default.
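
   To make the distinction concrete, here is a small sketch. The class `PartitionChecks` and its methods are hypothetical; the only library call used is `java.util.Objects.requireNonNull`. A Java `assert` statement is skipped unless the JVM is started with `-ea` (`-enableassertions`), whereas an explicit check always runs.

```java
import java.util.Objects;

// Hypothetical helper used only to contrast the two styles of checking.
final class PartitionChecks {
    private PartitionChecks() {
    }

    // Language-level assert: evaluated only when the JVM runs with -ea.
    static int checkedWithAssert(int partition) {
        assert partition >= 0 : "partition must be non-negative";
        return partition;
    }

    // Explicit check: always evaluated, regardless of JVM flags.
    static int checkedAlways(int partition) {
        if (partition < 0)
            throw new IllegalArgumentException("partition must be non-negative: " + partition);
        return partition;
    }

    // Objects-style validation: throws NullPointerException with the given message.
    static String requireTopic(String topic) {
        return Objects.requireNonNull(topic, "topic must not be null");
    }
}
```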



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                     int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                     log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
+                    // Update the current time in case the buffer allocation blocked above.
+                    // NOTE: getting time may be expensive, so calling it under a lock
+                    // should be avoided.
+                    nowMs = time.milliseconds();

Review Comment:
   Did we reach a conclusion regarding this?



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
   Can you elaborate on this? What kind of application bug would surface itself in a silent way like this?





[GitHub] [kafka] artemlivshits commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r922530070


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
   https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L1041 has a test that effectively codifies the contract.  I would agree that it's weird to have a contract about null handling, but at this point I'd rather preserve whatever behavior is codified.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,15 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;

Review Comment:
   Will do.  Then partition should be volatile as well.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   The intention of an assert is to run in tests, but be disabled in prod, so if my understanding is correct, this is the proper usage.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                     int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                     log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
+                    // Update the current time in case the buffer allocation blocked above.
+                    // NOTE: getting time may be expensive, so calling it under a lock
+                    // should be avoided.
+                    nowMs = time.milliseconds();

Review Comment:
   I synced up with Jun offline; in this change it makes sense to preserve the current behavior (too close to release).



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;
         }
 
         public int getPartition() {
             return partition;
         }
 
         public TopicPartition topicPartition() {
-            if (record == null)
-                return null;
-            return partition == RecordMetadata.UNKNOWN_PARTITION
-                    ? ProducerInterceptors.extractTopicPartition(record)
-                    : new TopicPartition(record.topic(), partition);
+            if (partition != RecordMetadata.UNKNOWN_PARTITION)
+                return new TopicPartition(topic, partition);

Review Comment:
   topicPartition() is called once in the success case (maybe twice in the error case).  I'll add a comment.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   The method overrides a callback that is called setPartition to reflect what the caller does with it (it sets partition).  I agree that it's a little non-intuitive to do a state transition here but there doesn't seem to be a better place to do it if we want to preserve the behavior -- we need record until here to do the tracing and we cannot do tracing earlier because we may not know the partition; at the same time, we don't want to keep it longer.





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r922782322


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                     int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                     log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
+                    // Update the current time in case the buffer allocation blocked above.
+                    // NOTE: getting time may be expensive, so calling it under a lock
+                    // should be avoided.
+                    nowMs = time.milliseconds();

Review Comment:
   Do we want to file a JIRA for changing this for the next release?





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924191711


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
   I looked at the test and it seems to check that an exception is thrown? As @junrao said, this can be done by validating what `send` receives instead of polluting the whole codebase. I'm OK if we file a JIRA for that and do it as a separate PR. But we should remove this code when we do that.





[GitHub] [kafka] junrao commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1190432816

   cherry-picked the PR to 3.3.




[GitHub] [kafka] artemlivshits commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r923725627


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;
         }
 
         public int getPartition() {
             return partition;
         }
 
         public TopicPartition topicPartition() {
-            if (record == null)
-                return null;
-            return partition == RecordMetadata.UNKNOWN_PARTITION
-                    ? ProducerInterceptors.extractTopicPartition(record)
-                    : new TopicPartition(record.topic(), partition);
+            if (partition != RecordMetadata.UNKNOWN_PARTITION)
+                return new TopicPartition(topic, partition);

Review Comment:
   Ok.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                     int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                     log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
+                    // Update the current time in case the buffer allocation blocked above.
+                    // NOTE: getting time may be expensive, so calling it under a lock
+                    // should be avoided.
+                    nowMs = time.milliseconds();

Review Comment:
   Filed KAFKA-14083.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   I think it would be non-intuitive to control the record lifetime from RecordAccumulator.append (which calls the callback) -- here we know that we don't need the record once the partition is set, but RecordAccumulator.append doesn't know that (in fact, it doesn't even know that we have the record).  But I can change it if you think this would make it easier to understand.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
   Would passing a null record not be a bug?  I've changed the comment to not mention that it would be a bug.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   In my previous work I used asserts extensively in C/C++; they provide both validation and contract documentation.  They do redundant validation in builds that are used in system tests without adding perf cost in prod.  I can remove it if it's not compatible with the style, though I don't think this is just style -- using asserts makes a material difference in early bug detection and in code comprehension.





[GitHub] [kafka] junrao commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189657885

   @artemlivshits : Are the test failures related to the PR?
   
   @ijuma : Do you have any other comments?




[GitHub] [kafka] ijuma commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189943978

   @junrao the updated version looks good to me. Thanks @artemlivshits for the patience and iterations.




[GitHub] [kafka] artemlivshits commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924715457


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
   It checks that the exception is thrown and then it checks that interceptors are called.  Probably the test is just sloppy and could use a different error condition. KAFKA-14086



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   Updated to extract all record info in the constructor.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   KAFKA-14085





[GitHub] [kafka] artemlivshits commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189712085

   > Are the test failures related to the PR?
   
   Yes, just pushed the fix.




[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924667928


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   Since the only info we need from record is record.partition(), could we keep record.partition() in the instance instead of the whole record? Since record.partition() is much smaller, maybe there is no need to nullify it in setPartition()?





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r922781755


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   That's not considered best practice in Java and we don't do it in Kafka. What would be the reason to run it in tests only?





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r922781914


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
   I don't think this answered my question. What application bug would result in this?





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r912836877


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                     int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                     log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
+                    // Update the current time in case the buffer allocation blocked above.
+                    // NOTE: getting time may be expensive, so calling it under a lock
+                    // should be avoided.
+                    nowMs = time.milliseconds();

Review Comment:
   This change has some other side effects on the code logic:
   1. In case of retries (multiple iterations of the while loop) when buffer allocation blocks, prior to this change `tryAppend` on line 283 was being called with an older `nowMs`. After this change it is called with a more recent time. This is a positive change.
   
   2. In case of retries when buffer allocation does not occur, prior to this change the time was computed inside `appendNewBatch` and hence was guaranteed to be the latest. After this change, threads may be blocked on `synchronized`, or the time consumed by the previous retry may not be factored into the `nowMs` passed to `appendNewBatch`. Hence, the `nowMs` passed to `appendNewBatch` might be stale by some amount (depending on how long threads were waiting to acquire the lock). Is that acceptable?
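
The trade-off raised in this comment can be sketched in isolation. The following is a minimal, hypothetical illustration (the class `TimestampedQueue` and the `LongSupplier` clock are assumptions made for the example, not Kafka code): the clock is read before entering the `synchronized` block, so a potentially expensive call does not lengthen the critical section, at the cost of a timestamp that may be slightly stale by the time the lock is acquired.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a queue guarded by a lock; not Kafka's RecordAccumulator.
final class TimestampedQueue {
    private final Deque<long[]> queue = new ArrayDeque<>();
    private final LongSupplier clock;

    TimestampedQueue(LongSupplier clock) {
        this.clock = clock;
    }

    // Reads the clock before taking the lock, so the critical section only does the append.
    // The timestamp may be stale by however long the thread waits for the lock.
    void append(long value) {
        long nowMs = clock.getAsLong();
        synchronized (queue) {
            queue.addLast(new long[] {value, nowMs});
        }
    }

    // Returns the timestamp stored with the most recently appended entry.
    long lastTimestamp() {
        synchronized (queue) {
            return queue.getLast()[1];
        }
    }

    int size() {
        synchronized (queue) {
            return queue.size();
        }
    }
}
```

Injecting the clock as a `LongSupplier` is only a convenience for the sketch; it lets a fake clock be used to check which timestamp ends up attached to each entry.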





[GitHub] [kafka] artemlivshits commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1170664838

   Haven't checked the streams benchmark yet.  But it is a regression that is visible in the lock profile, so from that perspective the fix seems to be a net positive.




[GitHub] [kafka] junrao commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1170646389

   @artemlivshits : Thanks for the PR. Does this resolve all the perf regression reported in https://github.com/apache/kafka/pull/12342 ?




[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r922782270


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;
         }
 
         public int getPartition() {
             return partition;
         }
 
         public TopicPartition topicPartition() {
-            if (record == null)
-                return null;
-            return partition == RecordMetadata.UNKNOWN_PARTITION
-                    ? ProducerInterceptors.extractTopicPartition(record)
-                    : new TopicPartition(record.topic(), partition);
+            if (partition != RecordMetadata.UNKNOWN_PARTITION)
+                return new TopicPartition(topic, partition);

Review Comment:
   Even so, the way this method is used can change over time. And then you end up with a lot of unexpected allocation. The way I suggested is more robust.
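
The allocation concern can be illustrated with a small sketch. This is one possible way to make such an accessor safe to call repeatedly, assuming hypothetical classes `TopicAndPartition` and `PartitionHolder` (neither is Kafka code, and this is not necessarily the approach suggested in the review): the pair is built once when the partition becomes known, and the accessor returns the cached instance.

```java
// Hypothetical value type standing in for a topic/partition pair.
final class TopicAndPartition {
    final String topic;
    final int partition;

    TopicAndPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Hypothetical holder; the names are assumptions made for this example.
final class PartitionHolder {
    static final int UNKNOWN_PARTITION = -1;

    private final String topic;
    private volatile int partition = UNKNOWN_PARTITION;
    private volatile TopicAndPartition cached;

    PartitionHolder(String topic) {
        this.topic = topic;
    }

    // Builds the pair once, at the moment the partition becomes known.
    void setPartition(int partition) {
        this.partition = partition;
        this.cached = new TopicAndPartition(topic, partition);
    }

    int partition() {
        return partition;
    }

    // Returns the cached instance, so repeated calls do not allocate.
    // Returns null while the partition is still unknown.
    TopicAndPartition topicPartition() {
        return cached;
    }
}
```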





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r919749970


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   A couple of lines above we use a language-level assert. In Kafka, we typically use assert-like methods, such as those in the `Objects` class, since language-level asserts are disabled by default.
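
The difference between the two styles can be shown with a short sketch. The class `NullChecks` and its method names are hypothetical; `assert` and `Objects.requireNonNull` are standard Java. The `assert` statement is only evaluated when the JVM is started with assertions enabled (for example with `-ea`), while `Objects.requireNonNull` always runs.

```java
import java.util.Objects;

// Hypothetical helper class used only to contrast the two kinds of check.
final class NullChecks {
    private NullChecks() { }

    // Language-level assert: skipped entirely unless the JVM runs with assertions enabled.
    static String withAssert(String topic) {
        assert topic != null : "topic must not be null";
        return topic;
    }

    // Always evaluated, regardless of JVM flags; throws NullPointerException with the given message.
    static String withRequireNonNull(String topic) {
        return Objects.requireNonNull(topic, "topic must not be null");
    }
}
```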





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924191711


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
   I looked at the test and it seems to check that an exception is thrown? As @junrao said, this can be done by validating what `send` receives instead of polluting the whole codebase.





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r923872451


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   The point is that you can run these checks in prod without measurable cost. Then why limit it to tests?





[GitHub] [kafka] artemlivshits commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1192047420

   Hi @etolbakov, making this method private sounds reasonable to me.  Thank you for the suggestion.
   




[GitHub] [kafka] jsancio commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
jsancio commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189361061

   > @jsancio : We plan to cherry-pick this to 3.3 branch since this fixes a performance issue in [KAFKA-10888](https://issues.apache.org/jira/browse/KAFKA-10888).
   
   Sounds good @junrao . I set the fix version for KAFKA-14020 to 3.3.0.
   




[GitHub] [kafka] artemlivshits commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924801253


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,21 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
-        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private final String topic;
+        private final Integer recordPartition;
+        private final String recordLogString;
+        private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private volatile TopicPartition topicPartition;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.record = record;
+            // Extract record info as we don't want to keep a reference to the record during
+            // whole lifetime of the batch.

Review Comment:
   I think it applies to all 3 fields: topic, recordPartition and recordLogString - we extract all this info from the record, so the comment is before we do that (in the PR it's kind of hard to see because of the inline discussion).  Let me know if you think otherwise.
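
The idea of extracting record info up front can be sketched outside of Kafka. In the following minimal example, `SimpleRecord` and `RecordSummary` are hypothetical classes, and building the log string only when tracing is enabled is an assumption made for the sketch. The point is that the summary copies the few small values it needs, so the record and its payload are not kept reachable for as long as the summary lives.

```java
// Hypothetical stand-in for a producer record; not Kafka's ProducerRecord.
final class SimpleRecord {
    final String topic;
    final Integer partition; // null when the caller did not choose a partition
    final byte[] payload;

    SimpleRecord(String topic, Integer partition, byte[] payload) {
        this.topic = topic;
        this.partition = partition;
        this.payload = payload;
    }

    @Override
    public String toString() {
        return "SimpleRecord(topic=" + topic + ", partition=" + partition + ", bytes=" + payload.length + ")";
    }
}

// Keeps only the small pieces of the record that are needed later,
// instead of holding a reference to the record itself.
final class RecordSummary {
    final String topic;
    final Integer recordPartition;
    final String recordLogString; // built only when tracing is on, to avoid the toString() cost

    RecordSummary(SimpleRecord record, boolean traceEnabled) {
        this.topic = record.topic;
        this.recordPartition = record.partition;
        this.recordLogString = traceEnabled ? record.toString() : "";
    }
}
```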





[GitHub] [kafka] junrao merged pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao merged PR #12365:
URL: https://github.com/apache/kafka/pull/12365




[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r923872243


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   It seems we have been mostly using Objects.requireNonNull for null assertion in our code. It doesn't seem to add too much overhead and helps identify issues in production early on. For consistency, perhaps we could use Objects.requireNonNull instead of assert.
   
   @ijuma : What do you recommend that we use for assertions like `assert partitionInfo == stickyPartitionInfo.get()`?



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
   If a user passes in a null record in send(), we will be throwing a NullPointerException somewhere. So, we probably could just throw an exception early in that case without going through the callback and fix the test accordingly. We probably could do that in a separate PR in trunk.
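
Failing early at the entry point might look like the sketch below. `TinySender` is a hypothetical miniature of a send path, not Kafka's `KafkaProducer`, and its method names are assumptions. Once the public method rejects `null`, the downstream code no longer needs defensive null checks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical miniature of a send path; not Kafka's KafkaProducer.
final class TinySender {
    private final List<String> sent = new ArrayList<>();

    // Rejects a null record at the entry point, before any callback state is built.
    void send(String record) {
        Objects.requireNonNull(record, "record must not be null");
        doSend(record);
    }

    // Downstream code can rely on a non-null record.
    private void doSend(String record) {
        sent.add(record.trim());
    }

    List<String> sent() {
        return sent;
    }
}
```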





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r923946004


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   Thanks for the explanation. Is there a possibility the trace could be done in the caller context? Or is that missing some of the required information?





[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924818200


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,21 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
-        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private final String topic;
+        private final Integer recordPartition;
+        private final String recordLogString;
+        private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private volatile TopicPartition topicPartition;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.record = record;
+            // Extract record info as we don't want to keep a reference to the record during
+            // whole lifetime of the batch.

Review Comment:
   Thanks for the explanation. Sounds good.





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r922782058


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   We can add another callback method since this is an internal interface, right? This kind of thing leads to a lot of maintenance pain down the line.





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r922781755


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   That's not considered best practice in Java and we don't typically do it in Kafka. What would be the reason to run it in tests only?





[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r919479183


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,15 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;

Review Comment:
   Should we make `record` volatile since it's being updated and read by different threads?
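
The visibility question can be illustrated with a minimal sketch. `SharedSlot` is a hypothetical class, not Kafka code. Declaring the field `volatile` guarantees that a write made by one thread becomes visible to later reads of that field by other threads; without it, the Java memory model gives no such guarantee for a plain field accessed without other synchronization.

```java
// Hypothetical holder whose field is written by one thread and read by another.
final class SharedSlot {
    // volatile: a write by one thread is visible to subsequent reads by other threads.
    private volatile String value = "initial";

    // Drops the reference, for example so the referenced object can be garbage collected.
    void clear() {
        value = null;
    }

    String get() {
        return value;
    }
}
```

A reader thread that polls `get()` until it returns `null` is guaranteed to observe a `clear()` made by another thread, because the field is volatile.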





[GitHub] [kafka] vvcephei commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
vvcephei commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1182501381

   Hey @artemlivshits , I just re-ran the Streams benchmarks that originally found the regression. It looks like it's resolved, as of your latest commit!
   
   As a reminder, this was the baseline for "good" performance:
   Commit: https://github.com/apache/kafka/commit/e3202b99999ef4c63aab2e5ab049978704282792 (the parent of the problematic commit)
   TPut: **118k±1k**
   
   And when I ran the same benchmark on https://github.com/apache/kafka/pull/12365/commits/3a6500bb12b8c5716f7d99b6cec1c521f6f029c2 , I got:
   TPut: **117k±1k**




[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924206144


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   I find that the code complexity to achieve this trace logging is a bit high. I have some ideas on how to improve it, but we can leave that for later. A simple suggestion for now would be to change `setPartition` to `onPartitionAssigned` or something like that. This would indicate a general callback that can do anything once the partition is known.





[GitHub] [kafka] artemlivshits commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189805608

   I looked at the failed tests; they seem unrelated and pass locally.




[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924189300


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   For the line in this method, you could do something like:
   
   ```java
   if (partition < 0)
     throw new IllegalArgumentException("partition should be positive, but it was " + partition);
   ```
   
   This is more informative and idiomatic, and it checks the more general case that we expect partitions to be positive. But I see that we have sprinkled the same check in other methods, so having an `assertPartitionIsPositive` helper would probably be a better approach. In any case, since this code was introduced in a different change, we can file a JIRA and do it as a separate PR.
   
   I am happy to discuss more, but we should be clear about terminology. Language-level asserts in Java aren't used much. Checking preconditions at API boundaries is useful. Within a given boundary, it's best to use the type system to avoid having noise all over the code.
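   
   A rough sketch of how such a shared helper might look, so the check is written once and reused. The enclosing class `PartitionChecks` is hypothetical and does not exist in Kafka; the method name and message are the ones used in the comment above, and the check accepts partition 0, matching the `partition < 0` condition.
   
   ```java
   // Hypothetical utility class; not an existing Kafka API.
   final class PartitionChecks {
       private PartitionChecks() {
       }
   
       // Throws if the partition is negative, otherwise returns it unchanged
       // so the call can be used inline in an assignment.
       static int assertPartitionIsPositive(int partition) {
           if (partition < 0)
               throw new IllegalArgumentException("partition should be positive, but it was " + partition);
           return partition;
       }
   }
   ```
   
   A caller could then write `this.partition = PartitionChecks.assertPartitionIsPositive(partition);` instead of repeating the `if` in each method.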





[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r923943042


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   Correct, @junrao: `Objects.requireNonNull` would be the recommended way to assert non-null. The reference equality check is less common; we could add our own utility method in Utils for that or inline it.
   
   The main thing is to get the appropriate signal if this happens in prod when the cost is low (both examples would be in that category).
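   
   A small sketch of the two checks discussed above. `Objects.requireNonNull` is the standard JDK method; `requireSame` and the `RecordChecks` class are hypothetical names for the reference equality helper and are not part of Kafka's `Utils`.
   
   ```java
   import java.util.Objects;
   
   // Hypothetical helper class; the names are illustrative only.
   final class RecordChecks {
       private RecordChecks() {
       }
   
       // Null check using the JDK method; throws NullPointerException with the given message.
       static <T> T requireRecord(T record) {
           return Objects.requireNonNull(record, "record must not be null");
       }
   
       // Reference equality check: both arguments must be the very same instance.
       static <T> T requireSame(T expected, T actual, String what) {
           if (expected != actual)
               throw new IllegalStateException(what + " must be the same instance");
           return actual;
       }
   }
   ```
   
   Both checks cost a single comparison, so they are cheap enough to leave enabled in production, unlike language-level `assert` statements, which are disabled unless the JVM runs with `-ea`.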





[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r914251606


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                     int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                     log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
+                    // Update the current time in case the buffer allocation blocked above.
+                    // NOTE: getting time may be expensive, so calling it under a lock
+                    // should be avoided.
+                    nowMs = time.milliseconds();

Review Comment:
   @divijvaidya : For 2, before [KAFKA-10888](https://issues.apache.org/jira/browse/KAFKA-10888), nowMs was also computed before the synchronized block. So, it has the same behavior as this PR.
   
   Looking at the code, I am not sure if nowMs is strictly needed. nowMs is used to populate ProducerBatch.lastAppendTime. However, since KAFKA-5886, expiration is based on createTime and not on lastAppendTime. lastAppendTime is only used to upper bound lastAttemptMs. This may not be needed. @hachikuji : Could we just get rid of ProducerBatch.lastAppendTime?
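   
   To illustrate the pattern in the diff above (read the clock before entering the synchronized block, then use the captured value under the lock), here is a minimal sketch. `TimestampedQueue` is a hypothetical class and `LongSupplier` stands in for Kafka's `Time`; this is not the `RecordAccumulator` code.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.function.LongSupplier;
   
   // Hypothetical queue that stamps each entry with the time it was appended.
   final class TimestampedQueue {
       private final Deque<Long> queue = new ArrayDeque<>();
       private final LongSupplier clock; // stand-in for a time source such as Time.milliseconds()
   
       TimestampedQueue(LongSupplier clock) {
           this.clock = clock;
       }
   
       void append() {
           // Reading the clock may be expensive, so do it before taking the lock.
           long nowMs = clock.getAsLong();
           synchronized (queue) {
               // Only cheap work happens while the lock is held.
               queue.addLast(nowMs);
           }
       }
   
       long lastAppendTimeMs() {
           synchronized (queue) {
               return queue.getLast(); // throws NoSuchElementException if nothing was appended
           }
       }
   
       int size() {
           synchronized (queue) {
               return queue.size();
           }
       }
   }
   ```
   
   The trade-off is that the timestamp can be slightly stale by the time the lock is acquired, which is the same behavior as before KAFKA-10888 according to the comment above.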





[GitHub] [kafka] etolbakov commented on pull request #12365: KAFKA-14020: Performance regression in Producer

Posted by GitBox <gi...@apache.org>.
etolbakov commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1191436621

   Hello Artem @artemlivshits,
   
   I was studying the PR/`KAFKA-14020` ticket and decided to share a minor observation.
   It seems that the `extractTopicPartition` method in `ProducerInterceptors` could be made private (or even inlined).
   https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java#L125
   Happy to make a PR if you think it's reasonable.
   
   ----
   Regards, Eugene

