You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2020/07/04 17:55:06 UTC
[kafka] branch trunk updated: MINOR: Document that max.block.ms affects some transaction methods (#8975)
This is an automated email from the ASF dual-hosted git repository.
boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ce939e9 MINOR: Document that max.block.ms affects some transaction methods (#8975)
ce939e9 is described below
commit ce939e9136b7a1ea5343d464b7660f905212c053
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Sat Jul 4 18:54:11 2020 +0100
MINOR: Document that max.block.ms affects some transaction methods (#8975)
The documentation for max.block.ms said it affected only send()
and partitionsFor(), but it actually also affects initTransactions(),
abortTransaction() and commitTransaction(). So rework the
documentation to cover these methods too.
Reviewers: Boyang Chen <bo...@confluent.io>
---
.../org/apache/kafka/clients/producer/ProducerConfig.java | 11 ++++++++---
.../kafka/clients/producer/internals/RecordAccumulator.java | 2 +-
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index dd3c7f7..e4aee39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -156,9 +156,14 @@ public class ProducerConfig extends AbstractConfig {
/** <code>max.block.ms</code> */
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
- private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block."
- + "These methods can be blocked either because the buffer is full or metadata unavailable."
- + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
+ private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long the <code>KafkaProducer</code>'s <code>send()</code>, <code>partitionsFor()</code>, "
+ + "<code>initTransactions()</code>, <code>commitTransaction()</code> "
+ + "and <code>abortTransaction()</code> methods will block. "
+ + "For <code>send()</code> this timeout bounds the total time waiting for both metadata fetch and buffer allocation "
+ + "(blocking in the user-supplied serializers or partitioner is not counted against this timeout). "
+ + "For <code>partitionsFor()</code> this timeout bounds the time spent waiting for metadata if it is unavailable. "
+ + "The transaction-related methods always block, but may timeout if "
+ + "the transaction coordinator could not be discovered or did not respond within the timeout.";
/** <code>buffer.memory</code> */
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 68add17..3781297 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -210,7 +210,7 @@ public final class RecordAccumulator {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
- log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
+ log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
buffer = free.allocate(size, maxTimeToBlock);
// Update the current time in case the buffer allocation blocked above.