You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2020/07/29 22:39:13 UTC

[kafka] branch trunk updated: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively (#9081)

This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 783a645  KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively (#9081)
783a645 is described below

commit 783a6451f5f8c50dbe151caf5e76b74917690364
Author: Sasaki Toru <sa...@users.noreply.github.com>
AuthorDate: Thu Jul 30 07:38:27 2020 +0900

    KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively (#9081)
    
    Modified KafkaProducer.sendOffsetsToTransaction() to be affected with max.block.ms, and added timeout test for blocking methods
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Xi Hu <hu...@hotmail.com>
---
 .../kafka/clients/producer/KafkaProducer.java      |  6 ++-
 .../kafka/clients/producer/ProducerConfig.java     |  2 +-
 .../integration/kafka/api/TransactionsTest.scala   | 59 +++++++++++++++-------
 3 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3d86de2..13af2f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -662,6 +662,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Note, that the consumer should have {@code enable.auto.commit=false} and should
      * also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
      * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
+     * This method will raise {@link TimeoutException} if the producer cannot send offsets before expiration of {@code max.block.ms}.
+     * Additionally, it will raise {@link InterruptException} if interrupted.
      *
      * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started.
      * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
@@ -679,6 +681,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *                                                                  mis-configured consumer instance id within group metadata.
      * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
      *         other unexpected error
+     * @throws TimeoutException if the time taken for sending offsets has surpassed max.block.ms.
+     * @throws InterruptException if the thread is interrupted while blocked
      */
     public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                          ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
@@ -687,7 +691,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         throwIfProducerClosed();
         TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
         sender.wakeup();
-        result.await();
+        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 2e9fa24..64d9de3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -157,7 +157,7 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>max.block.ms</code> */
     public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
     private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long the <code>KafkaProducer</code>'s <code>send()</code>, <code>partitionsFor()</code>, "
-                                                    + "<code>initTransactions()</code>, <code>commitTransaction()</code> "
+                                                    + "<code>initTransactions()</code>, <code>sendOffsetsToTransaction()</code>, <code>commitTransaction()</code> "
                                                     + "and <code>abortTransaction()</code> methods will block. "
                                                     + "For <code>send()</code> this timeout bounds the total time waiting for both metadata fetch and buffer allocation "
                                                     + "(blocking in the user-supplied serializers or partitioner is not counted against this timeout). "
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 13fafc4..e3825a1 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -406,6 +406,48 @@ class TransactionsTest extends KafkaServerTestHarness {
     TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testInitTransactionsTimeout(): Unit = {
+    testTimeout(false, producer => producer.initTransactions())
+  }
+
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+    testTimeout(true, producer => producer.sendOffsetsToTransaction(
+      Map(new TopicPartition(topic1, 0) -> new OffsetAndMetadata(0)).asJava, "test-group"))
+  }
+
+  @Test(expected = classOf[TimeoutException])
+  def testCommitTransactionTimeout(): Unit = {
+    testTimeout(true, producer => producer.commitTransaction())
+  }
+
+  @Test(expected = classOf[TimeoutException])
+  def testAbortTransactionTimeout(): Unit = {
+    testTimeout(true, producer => producer.abortTransaction())
+  }
+
+  def testTimeout(needInitAndSendMsg: Boolean,
+                  timeoutProcess: KafkaProducer[Array[Byte], Array[Byte]] => Unit): Unit = {
+    val producer = createTransactionalProducer("transactionProducer", maxBlockMs =  1000)
+
+    if (needInitAndSendMsg) {
+      producer.initTransactions()
+      producer.beginTransaction()
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes))
+    }
+
+    for  (i <- servers.indices)
+      killBroker(i)
+
+    try {
+      timeoutProcess(producer)
+      fail("Should raise a TimeoutException")
+    } finally {
+      producer.close(Duration.ZERO)
+    }
+  }
+
   @Test
   def testFencingOnSend(): Unit = {
     val producer1 = transactionalProducers(0)
@@ -586,23 +628,6 @@ class TransactionsTest extends KafkaServerTestHarness {
     fail("Should have raised a KafkaException")
   }
 
-  @Test(expected = classOf[TimeoutException])
-  def testCommitTransactionTimeout(): Unit = {
-    val producer = createTransactionalProducer("transactionalProducer", maxBlockMs = 1000)
-    producer.initTransactions()
-    producer.beginTransaction()
-    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foobar".getBytes))
-
-    for (i <- 0 until servers.size)
-      killBroker(i) // pretend all brokers not available
-
-    try {
-      producer.commitTransaction()
-    } finally {
-      producer.close(Duration.ZERO)
-    }
-  }
-
   @Test
   def testBumpTransactionalEpoch(): Unit = {
     val producer = createTransactionalProducer("transactionalProducer",