You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/27 16:21:22 UTC

[kafka] branch trunk updated: KAFKA-6446; KafkaProducer initTransactions() should timeout after max.block.ms (#4563)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9eb32ea  KAFKA-6446; KafkaProducer initTransactions() should timeout after max.block.ms (#4563)
9eb32ea is described below

commit 9eb32eaad5fa3863d60b84ef095fa041e83b7b47
Author: huxi <hu...@hotmail.com>
AuthorDate: Wed Mar 28 00:21:18 2018 +0800

    KAFKA-6446; KafkaProducer initTransactions() should timeout after max.block.ms (#4563)
    
    Currently the `initTransactions()` API blocks indefinitely if the broker cannot be reached. This patch changes the behavior to raise a `TimeoutException` after waiting for `max.block.ms`.
    
    Reviewers: Apurva Mehta <ap...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/producer/KafkaProducer.java      | 25 +++++++--
 .../kafka/clients/producer/internals/Sender.java   |  2 +-
 .../internals/TransactionalRequestResult.java      |  5 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 61 ++++++++++++++++++++++
 .../integration/kafka/api/TransactionsTest.scala   | 17 +++++-
 5 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 5fc9a1b..a5af5b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -256,6 +256,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final ProducerInterceptors<K, V> interceptors;
     private final ApiVersions apiVersions;
     private final TransactionManager transactionManager;
+    private TransactionalRequestResult initTransactionsResult;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -555,18 +556,36 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *   2. Gets the internal producer id and epoch, used in all future transactional
      *      messages issued by the producer.
      *
+     * Note that this method will raise {@link TimeoutException} if the transactional state cannot
+     * be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException}
+     * if interrupted. It is safe to retry in either case, but once the transactional state has been successfully
+     * initialized, this method should no longer be used.
+     *
      * @throws IllegalStateException if no transactional.id has been configured
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
      *         transactional.id is not authorized. See the exception for more details
      * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
+     * @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
+     * @throws InterruptException if the thread is interrupted while blocked
      */
     public void initTransactions() {
         throwIfNoTransactionManager();
-        TransactionalRequestResult result = transactionManager.initializeTransactions();
-        sender.wakeup();
-        result.await();
+        if (initTransactionsResult == null) {
+            initTransactionsResult = transactionManager.initializeTransactions();
+            sender.wakeup();
+        }
+
+        try {
+            if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) {
+                initTransactionsResult = null;
+            } else {
+                throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms.");
+            }
+        } catch (InterruptedException e) {
+            throw new InterruptException("Initialize transactions interrupted.", e);
+        }
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7eea499..426b273 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -329,7 +329,7 @@ public class Sender implements Runnable {
             return false;
 
         AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
-        while (true) {
+        while (running) {
             Node targetNode = null;
             try {
                 if (nextRequestHandler.needsCoordinator()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
index ff93da8..9c02e94 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
@@ -59,7 +59,10 @@ public final class TransactionalRequestResult {
     }
 
     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
-        return latch.await(timeout, unit);
+        boolean success = latch.await(timeout, unit);
+        if (!isSuccessful())
+            throw error();
+        return success;
     }
 
     public RuntimeException error() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 9f70fd7..8bfc5e7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -548,4 +548,65 @@ public class KafkaProducerTest {
             // expected
         }
     }
+
+    @Test(expected = TimeoutException.class)
+    public void testInitTransactionTimeout() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster("topic", 1);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        Producer<String, String> producer = new KafkaProducer<>(
+                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
+                new StringSerializer(), new StringSerializer(), metadata, client);
+        try {
+            producer.initTransactions();
+            fail("initTransactions() should have raised TimeoutException");
+        } finally {
+            producer.close(0, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster("topic", 1);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        Producer<String, String> producer = new KafkaProducer<>(
+                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
+                new StringSerializer(), new StringSerializer(), metadata, client);
+        try {
+            producer.initTransactions();
+        } catch (TimeoutException e) {
+            // expected
+        }
+        // other transactional operations should not be allowed if we catch the error after initTransactions failed
+        try {
+            producer.beginTransaction();
+        } finally {
+            producer.close(0, TimeUnit.MILLISECONDS);
+        }
+    }
 }
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 911808a..8435e5a 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -19,7 +19,7 @@ package kafka.api
 
 import java.lang.{Long => JLong}
 import java.util.Properties
-import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
@@ -27,7 +27,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils.consumeRecords
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.ProducerFencedException
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before, Test}
@@ -532,6 +532,19 @@ class TransactionsTest extends KafkaServerTestHarness {
     }
   }
 
+  @Test(expected = classOf[KafkaException])
+  def testConsecutivelyRunInitTransactions(): Unit = {
+    val producer = createTransactionalProducer(transactionalId = "normalProducer")
+
+    try {
+      producer.initTransactions()
+      producer.initTransactions()
+      fail("Should have raised a KafkaException")
+    } finally {
+      producer.close()
+    }
+  }
+
   private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String,
                                                       start: Int, end: Int, willBeCommitted: Boolean): Unit = {
     for (i <- start until end) {

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.