You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2020/07/04 17:55:06 UTC

[kafka] branch trunk updated: MINOR: Document that max.block.ms affects some transaction methods (#8975)

This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce939e9  MINOR: Document that max.block.ms affects some transaction methods (#8975)
ce939e9 is described below

commit ce939e9136b7a1ea5343d464b7660f905212c053
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Sat Jul 4 18:54:11 2020 +0100

    MINOR: Document that max.block.ms affects some transaction methods (#8975)
    
    The documentation for max.block.ms said it affected only send()
    and partitionsFor(), but it actually also affects initTransactions(),
    abortTransaction() and commitTransaction(). So rework the
    documentation to cover these methods too.
    
    Reviewers: Boyang Chen <bo...@confluent.io>
---
 .../org/apache/kafka/clients/producer/ProducerConfig.java     | 11 ++++++++---
 .../kafka/clients/producer/internals/RecordAccumulator.java   |  2 +-
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index dd3c7f7..e4aee39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -156,9 +156,14 @@ public class ProducerConfig extends AbstractConfig {
 
     /** <code>max.block.ms</code> */
     public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
-    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block."
-                                                    + "These methods can be blocked either because the buffer is full or metadata unavailable."
-                                                    + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
+    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long the <code>KafkaProducer</code>'s <code>send()</code>, <code>partitionsFor()</code>, "
+                                                    + "<code>initTransactions()</code>, <code>commitTransaction()</code> "
+                                                    + "and <code>abortTransaction()</code> methods will block. "
+                                                    + "For <code>send()</code> this timeout bounds the total time waiting for both metadata fetch and buffer allocation "
+                                                    + "(blocking in the user-supplied serializers or partitioner is not counted against this timeout). "
+                                                    + "For <code>partitionsFor()</code> this timeout bounds the time spent waiting for metadata if it is unavailable. "
+                                                    + "The transaction-related methods always block, but may timeout if "
+                                                    + "the transaction coordinator could not be discovered or did not respond within the timeout.";
 
     /** <code>buffer.memory</code> */
     public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 68add17..3781297 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -210,7 +210,7 @@ public final class RecordAccumulator {
 
             byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
             int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
-            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
+            log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
             buffer = free.allocate(size, maxTimeToBlock);
 
             // Update the current time in case the buffer allocation blocked above.