Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/08 15:29:33 UTC

[kafka] branch trunk updated: KAFKA-8040: Streams handle initTransactions timeout (#6372)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f716d08  KAFKA-8040: Streams handle initTransactions timeout (#6372)
f716d08 is described below

commit f716d08c9c7f0f3ec7456cb9a27a42104457b867
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Mar 8 09:29:22 2019 -0600

    KAFKA-8040: Streams handle initTransactions timeout (#6372)
    
    As of 2.0, Producer.initTransactions may throw a TimeoutException, which is retriable. Streams should retry instead of crashing when it encounters this exception.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bb...@gmail.com>
---
 .../processor/internals/RecordCollectorImpl.java   |  19 ++-
 .../streams/processor/internals/StreamTask.java    |  24 +++-
 .../processor/internals/StreamTaskTest.java        | 135 ++++++++++++++++++++-
 3 files changed, 170 insertions(+), 8 deletions(-)
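
To make the contract above concrete: a minimal sketch (not the committed code) of how a caller can treat Producer.initTransactions() as retriable. The helper name and the attempt limit are illustrative only, not part of the Kafka API.

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.errors.TimeoutException;

    public final class InitTransactionsRetryExample {
        // Illustrative helper: retry initTransactions() up to maxAttempts times,
        // since TimeoutException is a retriable error.
        static void initWithRetries(final Producer<byte[], byte[]> producer, final int maxAttempts) {
            for (int attempt = 1; ; attempt++) {
                try {
                    producer.initTransactions();
                    return;
                } catch (final TimeoutException retriable) {
                    if (attempt >= maxAttempts) {
                        throw retriable; // give up and surface the timeout to the caller
                    }
                    // retriable timeout: loop and call initTransactions() again
                }
            }
        }
    }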

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 757a5fb..2e9ead8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -218,11 +218,20 @@ public class RecordCollectorImpl implements RecordCollector {
                 }
             });
         } catch (final TimeoutException e) {
-            log.error("Timeout exception caught when sending record to topic {}. " +
-                "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
-                "its internal buffer fills up. " +
-                "You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
-            throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic));
+            log.error(
+                "Timeout exception caught when sending record to topic {}. " +
+                    "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
+                    "its internal buffer fills up. " +
+                    "This can also happen if the broker is slow to respond, if the network connection to " +
+                    "the broker was interrupted, or if similar circumstances arise. " +
+                    "You can increase producer parameter `max.block.ms` to increase this timeout.",
+                topic,
+                e
+            );
+            throw new StreamsException(
+                String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic),
+                e
+            );
         } catch (final Exception uncaughtException) {
             if (uncaughtException instanceof KafkaException &&
                 uncaughtException.getCause() instanceof ProducerFencedException) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ca33756..65761e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
@@ -246,7 +247,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         // initialize transactions if eos is turned on, which will block if the previous transaction has not
         // completed yet; do not start the first transaction until the topology has been initialized later
         if (eosEnabled) {
-            this.producer.initTransactions();
+            initializeTransactions();
         }
     }
 
@@ -298,7 +299,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                 throw new IllegalStateException("Task producer should be null.");
             }
             producer = producerSupplier.get();
-            producer.initTransactions();
+            initializeTransactions();
             recordCollector.init(producer);
 
             if (stateMgr.checkpoint != null) {
@@ -872,4 +873,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     Producer<byte[], byte[]> getProducer() {
         return producer;
     }
+
+    private void initializeTransactions() {
+        try {
+            producer.initTransactions();
+        } catch (final TimeoutException retriable) {
+            log.error(
+                "Timeout exception caught when initializing transactions for task {}. " +
+                    "This might happen if the broker is slow to respond, if the network connection to " +
+                    "the broker was interrupted, or if similar circumstances arise. " +
+                    "You can increase producer parameter `max.block.ms` to increase this timeout.",
+                id,
+                retriable
+            );
+            throw new StreamsException(
+                format("%sFailed to initialize task %s due to timeout.", logPrefix, id),
+                retriable
+            );
+        }
+    }
 }
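
The advice in the new log messages ("increase producer parameter `max.block.ms`") can be followed from the application side. A minimal sketch, assuming a typical Streams setup (the application id and bootstrap server below are placeholders), of raising that producer timeout through StreamsConfig:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public final class MaxBlockMsConfigExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // producerPrefix() scopes the setting to the producers Streams creates,
            // widening the window before initTransactions()/send() hit the timeout.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000);
            // hand `props` to new KafkaStreams(topology, props) as usual
        }
    }
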
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 4efdc47..829106a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -47,11 +48,12 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
@@ -64,9 +66,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -74,6 +78,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -186,6 +191,134 @@ public class StreamTaskTest {
         }
     }
 
+    @Test
+    public void shouldHandleInitTransactionsTimeoutExceptionOnCreation() {
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        final ProcessorTopology topology = ProcessorTopology.withSources(
+            asList(source1, source2, processorStreamTime, processorSystemTime),
+            mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
+        );
+
+        source1.addChild(processorStreamTime);
+        source2.addChild(processorStreamTime);
+        source1.addChild(processorSystemTime);
+        source2.addChild(processorSystemTime);
+
+        try {
+            new StreamTask(
+                taskId00,
+                partitions,
+                topology,
+                consumer,
+                changelogReader,
+                createConfig(true),
+                streamsMetrics,
+                stateDirectory,
+                null,
+                time,
+                () -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) {
+                    @Override
+                    public void initTransactions() {
+                        throw new TimeoutException("test");
+                    }
+                },
+                null,
+                null
+            );
+            fail("Expected an exception");
+        } catch (final StreamsException expected) {
+            // make sure we log the explanation as an ERROR
+            assertTimeoutErrorLog(appender);
+
+            // make sure we report the correct message
+            assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout."));
+
+            // make sure we preserve the cause
+            assertEquals(expected.getCause().getClass(), TimeoutException.class);
+            assertThat(expected.getCause().getMessage(), is("test"));
+        }
+        LogCaptureAppender.unregister(appender);
+    }
+
+    @Test
+    public void shouldHandleInitTransactionsTimeoutExceptionOnResume() {
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        final ProcessorTopology topology = ProcessorTopology.withSources(
+            asList(source1, source2, processorStreamTime, processorSystemTime),
+            mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
+        );
+
+        source1.addChild(processorStreamTime);
+        source2.addChild(processorStreamTime);
+        source1.addChild(processorSystemTime);
+        source2.addChild(processorSystemTime);
+
+        final AtomicBoolean timeOut = new AtomicBoolean(false);
+
+        final StreamTask testTask = new StreamTask(
+            taskId00,
+            partitions,
+            topology,
+            consumer,
+            changelogReader,
+            createConfig(true),
+            streamsMetrics,
+            stateDirectory,
+            null,
+            time,
+            () -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) {
+                @Override
+                public void initTransactions() {
+                    if (timeOut.get()) {
+                        throw new TimeoutException("test");
+                    } else {
+                        super.initTransactions();
+                    }
+                }
+            },
+            null,
+            null
+        );
+        testTask.initializeTopology();
+        testTask.suspend();
+        timeOut.set(true);
+        try {
+            testTask.resume();
+            fail("Expected an exception");
+        } catch (final StreamsException expected) {
+            // make sure we log the explanation as an ERROR
+            assertTimeoutErrorLog(appender);
+
+            // make sure we report the correct message
+            assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout."));
+
+            // make sure we preserve the cause
+            assertEquals(expected.getCause().getClass(), TimeoutException.class);
+            assertThat(expected.getCause().getMessage(), is("test"));
+        }
+        LogCaptureAppender.unregister(appender);
+    }
+
+    private void assertTimeoutErrorLog(final LogCaptureAppender appender) {
+
+        final String expectedErrorLogMessage =
+            "task [0_0] Timeout exception caught when initializing transactions for task 0_0. " +
+                "This might happen if the broker is slow to respond, if the network " +
+                "connection to the broker was interrupted, or if similar circumstances arise. " +
+                "You can increase producer parameter `max.block.ms` to increase this timeout.";
+
+        final List<String> expectedError =
+            appender
+                .getEvents()
+                .stream()
+                .filter(event -> event.getMessage().equals(expectedErrorLogMessage))
+                .map(LogCaptureAppender.Event::getLevel)
+                .collect(Collectors.toList());
+        assertThat(expectedError, is(singletonList("ERROR")));
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testProcessOrder() {