You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2020/07/29 22:39:13 UTC
[kafka] branch trunk updated: KAFKA-10309: KafkaProducer's
sendOffsetsToTransaction should not block infinitively (#9081)
This is an automated email from the ASF dual-hosted git repository.
boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 783a645 KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively (#9081)
783a645 is described below
commit 783a6451f5f8c50dbe151caf5e76b74917690364
Author: Sasaki Toru <sa...@users.noreply.github.com>
AuthorDate: Thu Jul 30 07:38:27 2020 +0900
KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively (#9081)
Modified KafkaProducer.sendOffsetsToTransaction() to be affected with max.block.ms, and added timeout test for blocking methods
Reviewers: Boyang Chen <bo...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Xi Hu <hu...@hotmail.com>
---
.../kafka/clients/producer/KafkaProducer.java | 6 ++-
.../kafka/clients/producer/ProducerConfig.java | 2 +-
.../integration/kafka/api/TransactionsTest.scala | 59 +++++++++++++++-------
3 files changed, 48 insertions(+), 19 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3d86de2..13af2f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -662,6 +662,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* Note, that the consumer should have {@code enable.auto.commit=false} and should
* also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
* {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
+ * This method will raise {@link TimeoutException} if the producer cannot send offsets before expiration of {@code max.block.ms}.
+ * Additionally, it will raise {@link InterruptException} if interrupted.
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started.
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
@@ -679,6 +681,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* mis-configured consumer instance id within group metadata.
* @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
* other unexpected error
+ * @throws TimeoutException if the time taken for sending offsets has surpassed max.block.ms.
+ * @throws InterruptException if the thread is interrupted while blocked
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
@@ -687,7 +691,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throwIfProducerClosed();
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
- result.await();
+ result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 2e9fa24..64d9de3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -157,7 +157,7 @@ public class ProducerConfig extends AbstractConfig {
/** <code>max.block.ms</code> */
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long the <code>KafkaProducer</code>'s <code>send()</code>, <code>partitionsFor()</code>, "
- + "<code>initTransactions()</code>, <code>commitTransaction()</code> "
+ + "<code>initTransactions()</code>, <code>sendOffsetsToTransaction()</code>, <code>commitTransaction()</code> "
+ "and <code>abortTransaction()</code> methods will block. "
+ "For <code>send()</code> this timeout bounds the total time waiting for both metadata fetch and buffer allocation "
+ "(blocking in the user-supplied serializers or partitioner is not counted against this timeout). "
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 13fafc4..e3825a1 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -406,6 +406,48 @@ class TransactionsTest extends KafkaServerTestHarness {
TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
}
+ @Test(expected = classOf[TimeoutException])
+ def testInitTransactionsTimeout(): Unit = {
+ testTimeout(false, producer => producer.initTransactions())
+ }
+
+ @Test(expected = classOf[TimeoutException])
+ def testSendOffsetsToTransactionTimeout(): Unit = {
+ testTimeout(true, producer => producer.sendOffsetsToTransaction(
+ Map(new TopicPartition(topic1, 0) -> new OffsetAndMetadata(0)).asJava, "test-group"))
+ }
+
+ @Test(expected = classOf[TimeoutException])
+ def testCommitTransactionTimeout(): Unit = {
+ testTimeout(true, producer => producer.commitTransaction())
+ }
+
+ @Test(expected = classOf[TimeoutException])
+ def testAbortTransactionTimeout(): Unit = {
+ testTimeout(true, producer => producer.abortTransaction())
+ }
+
+ def testTimeout(needInitAndSendMsg: Boolean,
+ timeoutProcess: KafkaProducer[Array[Byte], Array[Byte]] => Unit): Unit = {
+ val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000)
+
+ if (needInitAndSendMsg) {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes))
+ }
+
+ for (i <- servers.indices)
+ killBroker(i)
+
+ try {
+ timeoutProcess(producer)
+ fail("Should raise a TimeoutException")
+ } finally {
+ producer.close(Duration.ZERO)
+ }
+ }
+
@Test
def testFencingOnSend(): Unit = {
val producer1 = transactionalProducers(0)
@@ -586,23 +628,6 @@ class TransactionsTest extends KafkaServerTestHarness {
fail("Should have raised a KafkaException")
}
- @Test(expected = classOf[TimeoutException])
- def testCommitTransactionTimeout(): Unit = {
- val producer = createTransactionalProducer("transactionalProducer", maxBlockMs = 1000)
- producer.initTransactions()
- producer.beginTransaction()
- producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foobar".getBytes))
-
- for (i <- 0 until servers.size)
- killBroker(i) // pretend all brokers not available
-
- try {
- producer.commitTransaction()
- } finally {
- producer.close(Duration.ZERO)
- }
- }
-
@Test
def testBumpTransactionalEpoch(): Unit = {
val producer = createTransactionalProducer("transactionalProducer",