You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/27 16:21:22 UTC
[kafka] branch trunk updated: KAFKA-6446;
KafkaProducer initTransactions() should timeout after max.block.ms
(#4563)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9eb32ea KAFKA-6446; KafkaProducer initTransactions() should timeout after max.block.ms (#4563)
9eb32ea is described below
commit 9eb32eaad5fa3863d60b84ef095fa041e83b7b47
Author: huxi <hu...@hotmail.com>
AuthorDate: Wed Mar 28 00:21:18 2018 +0800
KAFKA-6446; KafkaProducer initTransactions() should timeout after max.block.ms (#4563)
Currently the `initTransactions()` API blocks indefinitely if the broker cannot be reached. This patch changes the behavior to raise a `TimeoutException` after waiting for `max.block.ms`.
Reviewers: Apurva Mehta <ap...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/producer/KafkaProducer.java | 25 +++++++--
.../kafka/clients/producer/internals/Sender.java | 2 +-
.../internals/TransactionalRequestResult.java | 5 +-
.../kafka/clients/producer/KafkaProducerTest.java | 61 ++++++++++++++++++++++
.../integration/kafka/api/TransactionsTest.scala | 17 +++++-
5 files changed, 103 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 5fc9a1b..a5af5b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -256,6 +256,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final ProducerInterceptors<K, V> interceptors;
private final ApiVersions apiVersions;
private final TransactionManager transactionManager;
+ private TransactionalRequestResult initTransactionsResult;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -555,18 +556,36 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* 2. Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
+ * Note that this method will raise {@link TimeoutException} if the transactional state cannot
+ * be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException}
+ * if interrupted. It is safe to retry in either case, but once the transactional state has been successfully
+ * initialized, this method should no longer be used.
+ *
* @throws IllegalStateException if no transactional.id has been configured
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
+ * @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
+ * @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions() {
throwIfNoTransactionManager();
- TransactionalRequestResult result = transactionManager.initializeTransactions();
- sender.wakeup();
- result.await();
+ if (initTransactionsResult == null) {
+ initTransactionsResult = transactionManager.initializeTransactions();
+ sender.wakeup();
+ }
+
+ try {
+ if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) {
+ initTransactionsResult = null;
+ } else {
+ throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms.");
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptException("Initialize transactions interrupted.", e);
+ }
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7eea499..426b273 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -329,7 +329,7 @@ public class Sender implements Runnable {
return false;
AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
- while (true) {
+ while (running) {
Node targetNode = null;
try {
if (nextRequestHandler.needsCoordinator()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
index ff93da8..9c02e94 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
@@ -59,7 +59,10 @@ public final class TransactionalRequestResult {
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
- return latch.await(timeout, unit);
+ boolean success = latch.await(timeout, unit);
+ if (!isSuccessful())
+ throw error();
+ return success;
}
public RuntimeException error() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 9f70fd7..8bfc5e7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -548,4 +548,65 @@ public class KafkaProducerTest {
// expected
}
}
+
+ @Test(expected = TimeoutException.class)
+ public void testInitTransactionTimeout() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+ Time time = new MockTime();
+ Cluster cluster = TestUtils.singletonCluster("topic", 1);
+ Node node = cluster.nodes().get(0);
+
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+ metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
+
+ Producer<String, String> producer = new KafkaProducer<>(
+ new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
+ new StringSerializer(), new StringSerializer(), metadata, client);
+ try {
+ producer.initTransactions();
+ fail("initTransactions() should have raised TimeoutException");
+ } finally {
+ producer.close(0, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Test(expected = KafkaException.class)
+ public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+ Time time = new MockTime();
+ Cluster cluster = TestUtils.singletonCluster("topic", 1);
+ Node node = cluster.nodes().get(0);
+
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+ metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
+
+ Producer<String, String> producer = new KafkaProducer<>(
+ new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
+ new StringSerializer(), new StringSerializer(), metadata, client);
+ try {
+ producer.initTransactions();
+ } catch (TimeoutException e) {
+ // expected
+ }
+ // other transactional operations should not be allowed if we catch the error after initTransactions failed
+ try {
+ producer.beginTransaction();
+ } finally {
+ producer.close(0, TimeUnit.MILLISECONDS);
+ }
+ }
}
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 911808a..8435e5a 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -19,7 +19,7 @@ package kafka.api
import java.lang.{Long => JLong}
import java.util.Properties
-import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.util.concurrent.TimeUnit
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
@@ -27,7 +27,7 @@ import kafka.utils.TestUtils
import kafka.utils.TestUtils.consumeRecords
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.{After, Before, Test}
@@ -532,6 +532,19 @@ class TransactionsTest extends KafkaServerTestHarness {
}
}
+ @Test(expected = classOf[KafkaException])
+ def testConsecutivelyRunInitTransactions(): Unit = {
+ val producer = createTransactionalProducer(transactionalId = "normalProducer")
+
+ try {
+ producer.initTransactions()
+ producer.initTransactions()
+ fail("Should have raised a KafkaException")
+ } finally {
+ producer.close()
+ }
+ }
+
private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String,
start: Int, end: Int, willBeCommitted: Boolean): Unit = {
for (i <- start until end) {
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.