You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/08 16:31:17 UTC
[kafka] branch 2.2 updated: KAFKA-8040: Streams handle initTransactions timeout (#6372)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new ebbde63 KAFKA-8040: Streams handle initTransactions timeout (#6372)
ebbde63 is described below
commit ebbde6378c61776b468e2e9843b6b70b30d5688c
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Mar 8 09:29:22 2019 -0600
KAFKA-8040: Streams handle initTransactions timeout (#6372)
As of 2.0, Producer.initTransactions may throw a TimeoutException, which is retriable. Streams should retry instead of crashing when it encounters this exception.
Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bb...@gmail.com>
---
.../processor/internals/RecordCollectorImpl.java | 19 ++-
.../streams/processor/internals/StreamTask.java | 24 +++-
.../processor/internals/StreamTaskTest.java | 135 ++++++++++++++++++++-
3 files changed, 170 insertions(+), 8 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 757a5fb..2e9ead8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -218,11 +218,20 @@ public class RecordCollectorImpl implements RecordCollector {
}
});
} catch (final TimeoutException e) {
- log.error("Timeout exception caught when sending record to topic {}. " +
- "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
- "its internal buffer fills up. " +
- "You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
- throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic));
+ log.error(
+ "Timeout exception caught when sending record to topic {}. " +
+ "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
+ "its internal buffer fills up. " +
+ "This can also happen if the broker is slow to respond, if the network connection to " +
+ "the broker was interrupted, or if similar circumstances arise. " +
+ "You can increase producer parameter `max.block.ms` to increase this timeout.",
+ topic,
+ e
+ );
+ throw new StreamsException(
+ String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic),
+ e
+ );
} catch (final Exception uncaughtException) {
if (uncaughtException instanceof KafkaException &&
uncaughtException.getCause() instanceof ProducerFencedException) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ca33756..65761e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
@@ -246,7 +247,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
// initialize transactions if eos is turned on, which will block if the previous transaction has not
// completed yet; do not start the first transaction until the topology has been initialized later
if (eosEnabled) {
- this.producer.initTransactions();
+ initializeTransactions();
}
}
@@ -298,7 +299,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
throw new IllegalStateException("Task producer should be null.");
}
producer = producerSupplier.get();
- producer.initTransactions();
+ initializeTransactions();
recordCollector.init(producer);
if (stateMgr.checkpoint != null) {
@@ -872,4 +873,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
Producer<byte[], byte[]> getProducer() {
return producer;
}
+
+ private void initializeTransactions() {
+ try {
+ producer.initTransactions();
+ } catch (final TimeoutException retriable) {
+ log.error(
+ "Timeout exception caught when initializing transactions for task {}. " +
+ "This might happen if the broker is slow to respond, if the network connection to " +
+ "the broker was interrupted, or if similar circumstances arise. " +
+ "You can increase producer parameter `max.block.ms` to increase this timeout.",
+ id,
+ retriable
+ );
+ throw new StreamsException(
+ format("%sFailed to initialize task %s due to timeout.", logPrefix, id),
+ retriable
+ );
+ }
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 4efdc47..829106a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -47,11 +48,12 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
@@ -64,9 +66,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -74,6 +78,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -186,6 +191,134 @@ public class StreamTaskTest {
}
}
+ @Test
+ public void shouldHandleInitTransactionsTimeoutExceptionOnCreation() {
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+ final ProcessorTopology topology = ProcessorTopology.withSources(
+ asList(source1, source2, processorStreamTime, processorSystemTime),
+ mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
+ );
+
+ source1.addChild(processorStreamTime);
+ source2.addChild(processorStreamTime);
+ source1.addChild(processorSystemTime);
+ source2.addChild(processorSystemTime);
+
+ try {
+ new StreamTask(
+ taskId00,
+ partitions,
+ topology,
+ consumer,
+ changelogReader,
+ createConfig(true),
+ streamsMetrics,
+ stateDirectory,
+ null,
+ time,
+ () -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) {
+ @Override
+ public void initTransactions() {
+ throw new TimeoutException("test");
+ }
+ },
+ null,
+ null
+ );
+ fail("Expected an exception");
+ } catch (final StreamsException expected) {
+ // make sure we log the explanation as an ERROR
+ assertTimeoutErrorLog(appender);
+
+ // make sure we report the correct message
+ assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout."));
+
+ // make sure we preserve the cause
+ assertEquals(expected.getCause().getClass(), TimeoutException.class);
+ assertThat(expected.getCause().getMessage(), is("test"));
+ }
+ LogCaptureAppender.unregister(appender);
+ }
+
+ @Test
+ public void shouldHandleInitTransactionsTimeoutExceptionOnResume() {
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+ final ProcessorTopology topology = ProcessorTopology.withSources(
+ asList(source1, source2, processorStreamTime, processorSystemTime),
+ mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
+ );
+
+ source1.addChild(processorStreamTime);
+ source2.addChild(processorStreamTime);
+ source1.addChild(processorSystemTime);
+ source2.addChild(processorSystemTime);
+
+ final AtomicBoolean timeOut = new AtomicBoolean(false);
+
+ final StreamTask testTask = new StreamTask(
+ taskId00,
+ partitions,
+ topology,
+ consumer,
+ changelogReader,
+ createConfig(true),
+ streamsMetrics,
+ stateDirectory,
+ null,
+ time,
+ () -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) {
+ @Override
+ public void initTransactions() {
+ if (timeOut.get()) {
+ throw new TimeoutException("test");
+ } else {
+ super.initTransactions();
+ }
+ }
+ },
+ null,
+ null
+ );
+ testTask.initializeTopology();
+ testTask.suspend();
+ timeOut.set(true);
+ try {
+ testTask.resume();
+ fail("Expected an exception");
+ } catch (final StreamsException expected) {
+ // make sure we log the explanation as an ERROR
+ assertTimeoutErrorLog(appender);
+
+ // make sure we report the correct message
+ assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout."));
+
+ // make sure we preserve the cause
+ assertEquals(expected.getCause().getClass(), TimeoutException.class);
+ assertThat(expected.getCause().getMessage(), is("test"));
+ }
+ LogCaptureAppender.unregister(appender);
+ }
+
+ private void assertTimeoutErrorLog(final LogCaptureAppender appender) {
+
+ final String expectedErrorLogMessage =
+ "task [0_0] Timeout exception caught when initializing transactions for task 0_0. " +
+ "This might happen if the broker is slow to respond, if the network " +
+ "connection to the broker was interrupted, or if similar circumstances arise. " +
+ "You can increase producer parameter `max.block.ms` to increase this timeout.";
+
+ final List<String> expectedError =
+ appender
+ .getEvents()
+ .stream()
+ .filter(event -> event.getMessage().equals(expectedErrorLogMessage))
+ .map(LogCaptureAppender.Event::getLevel)
+ .collect(Collectors.toList());
+ assertThat(expectedError, is(singletonList("ERROR")));
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testProcessOrder() {