Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/06 18:50:59 UTC

kafka git commit: KAFKA-6120: RecordCollector should not retry sending

Repository: kafka
Updated Branches:
  refs/heads/trunk d637ad0da -> 2b5a21395


KAFKA-6120: RecordCollector should not retry sending

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #4148 from mjsax/kafka-6120-recordCollector
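
With this change the RecordCollector no longer runs its own bounded retry loop (previously
3 attempts with a 100 ms backoff) when producer.send() throws a TimeoutException; retrying is
left entirely to the producer itself. Retry behaviour is therefore controlled through the
producer configs `retries` and `retry.backoff.ms`. Below is a minimal sketch of how those
settings could be raised in a Kafka Streams application; the application id, bootstrap servers,
and the numeric values are placeholders, not recommendations.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsRetryConfigSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            // Retrying failed sends is now the embedded producer's job, so raise its
            // own retry settings via the producer prefix; the values are examples only.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
            props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 500L);

            // Topology construction and KafkaStreams startup omitted; only the config matters here.
        }
    }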


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b5a2139
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b5a2139
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b5a2139

Branch: refs/heads/trunk
Commit: 2b5a21395cf8ce6e3e29a9a778bc20f727ec35fd
Parents: d637ad0
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Nov 6 10:50:57 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 6 10:50:57 2017 -0800

----------------------------------------------------------------------
 .../internals/RecordCollectorImpl.java          | 101 ++++++++++++-------
 .../internals/RecordCollectorTest.java          |  55 ++++------
 2 files changed, 86 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2b5a2139/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 4eec2d5..9566051 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -24,10 +24,10 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.slf4j.Logger;
@@ -37,15 +37,15 @@ import java.util.List;
 import java.util.Map;
 
 public class RecordCollectorImpl implements RecordCollector {
-    private static final int MAX_SEND_ATTEMPTS = 3;
-    private static final long SEND_RETRY_BACKOFF = 100L;
-
-
     private final Logger log;
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final String logPrefix;
 
+    private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
+        "No more records will be sent and no more offsets will be recorded for this task.";
+    private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s";
+    private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";
     private volatile KafkaException sendException;
 
     public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId, final LogContext logContext) {
@@ -93,43 +93,70 @@ public class RecordCollectorImpl implements RecordCollector {
         final ProducerRecord<byte[], byte[]> serializedRecord =
                 new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
 
-        // counting from 1 to make check further down more natural
-        // -> `if (attempt == MAX_SEND_ATTEMPTS)`
-        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; ++attempt) {
-            try {
-                producer.send(serializedRecord, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                        if (exception == null) {
-                            if (sendException != null) {
-                                return;
-                            }
-                            final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
-                            offsets.put(tp, metadata.offset());
-                        } else {
-                            if (sendException == null) {
-                                log.error("Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
-                                                "No more records will be sent and no more offsets will be recorded for this task.",
-                                        key, value, timestamp, topic, exception);
-                                if (exception instanceof ProducerFencedException) {
-                                    sendException = new ProducerFencedException(String.format("%sAbort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s",
-                                            logPrefix, key, value, timestamp, topic, exception.getMessage()));
-                                } else {
-                                    sendException = new StreamsException(String.format("%sAbort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.",
-                                            logPrefix, key, value, timestamp, topic, exception), exception);
+        try {
+            producer.send(serializedRecord, new Callback() {
+                @Override
+                public void onCompletion(final RecordMetadata metadata,
+                                         final Exception exception) {
+                    if (exception == null) {
+                        if (sendException != null) {
+                            return;
+                        }
+                        final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+                        offsets.put(tp, metadata.offset());
+                    } else {
+                        if (sendException == null) {
+                            if (exception instanceof ProducerFencedException) {
+                                log.warn(LOG_MESSAGE, key, value, timestamp, topic, exception);
+                                sendException = new ProducerFencedException(
+                                    String.format(EXCEPTION_MESSAGE,
+                                                  logPrefix,
+                                                  "producer got fenced",
+                                                  key,
+                                                  value,
+                                                  timestamp,
+                                                  topic,
+                                                  exception.getMessage()));
+                            } else {
+                                String errorLogMessage = LOG_MESSAGE;
+                                String errorMessage = EXCEPTION_MESSAGE;
+                                if (exception instanceof RetriableException) {
+                                    errorLogMessage += PARAMETER_HINT;
+                                    errorMessage += PARAMETER_HINT;
                                 }
+                                log.error(errorLogMessage, key, value, timestamp, topic, exception);
+                                sendException = new StreamsException(
+                                    String.format(errorMessage,
+                                                  logPrefix,
+                                                  "an error caught",
+                                                  key,
+                                                  value,
+                                                  timestamp,
+                                                  topic,
+                                                  exception.getMessage()),
+                                    exception);
                             }
                         }
                     }
-                });
-                return;
-            } catch (final TimeoutException e) {
-                if (attempt == MAX_SEND_ATTEMPTS) {
-                    throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt));
                 }
-                log.warn("Timeout exception caught when sending record to topic {}; retrying with {} attempt", topic, attempt);
-                Utils.sleep(SEND_RETRY_BACKOFF);
-            }
+            });
+        } catch (final TimeoutException e) {
+            log.error("Timeout exception caught when sending record to topic {}. " +
+                "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
+                "its internal buffer fills up. " +
+                "You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
+            throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic));
+        } catch (final Exception fatalException) {
+            throw new StreamsException(
+                String.format(EXCEPTION_MESSAGE,
+                              logPrefix,
+                              "an error caught",
+                              key,
+                              value,
+                              timestamp,
+                              topic,
+                              fatalException.getMessage()),
+                fatalException);
         }
     }
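
The TimeoutException branch above no longer retries either: a timeout from producer.send()
(typically the producer's buffer filling up because the cluster is unreachable) is surfaced
immediately as a StreamsException, and the log message points at `max.block.ms`. A minimal
sketch of raising that producer setting (and, if needed, the buffer size) via the same
producer prefix as in the earlier sketch; the values are examples, not recommendations.

    // Same imports as the sketch above (ProducerConfig, StreamsConfig, Properties).
    final Properties props = new Properties();
    // Let send() block longer before throwing TimeoutException when the producer's
    // buffer is full (the producer default for max.block.ms is 60000 ms).
    props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000L);
    // Optionally enlarge the producer's record buffer as well (default 32 MB).
    props.put(StreamsConfig.producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 64 * 1024 * 1024L);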
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b5a2139/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 7b2a41e..16400d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -22,10 +22,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -38,9 +38,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class RecordCollectorTest {
 
@@ -125,46 +125,23 @@ public class RecordCollectorTest {
     }
 
     @SuppressWarnings("unchecked")
-    @Test
-    public void shouldRetryWhenTimeoutExceptionOccursOnSend() {
-        final AtomicInteger attempt = new AtomicInteger(0);
-        final RecordCollectorImpl collector = new RecordCollectorImpl(
-                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
-                    @Override
-                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
-                        if (attempt.getAndIncrement() == 0) {
-                            throw new TimeoutException();
-                        }
-                        return super.send(record, callback);
-                    }
-                },
-                "test",
-                logContext);
-
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
-        assertEquals(Long.valueOf(0L), offset);
-    }
-
-    @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionAfterMaxAttempts() {
+    public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
                     public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
-                        throw new TimeoutException();
+                        throw new KafkaException();
                     }
                 },
                 "test",
                 logContext);
 
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-
     }
 
     @SuppressWarnings("unchecked")
-    @Test(expected = StreamsException.class)
+    @Test
     public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -177,11 +154,15 @@ public class RecordCollectorTest {
                 "test",
                 logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+
+        try {
+            collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+            fail("Should have thrown StreamsException");
+        } catch (final StreamsException expected) { /* ok */ }
     }
 
     @SuppressWarnings("unchecked")
-    @Test(expected = StreamsException.class)
+    @Test
     public void shouldThrowStreamsExceptionOnFlushIfASendFailed() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -194,11 +175,15 @@ public class RecordCollectorTest {
                 "test",
                 logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.flush();
+
+        try {
+            collector.flush();
+            fail("Should have thrown StreamsException");
+        } catch (final StreamsException expected) { /* ok */ }
     }
 
     @SuppressWarnings("unchecked")
-    @Test(expected = StreamsException.class)
+    @Test
     public void shouldThrowStreamsExceptionOnCloseIfASendFailed() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -211,7 +196,11 @@ public class RecordCollectorTest {
                 "test",
                 logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.close();
+
+        try {
+            collector.close();
+            fail("Should have thrown StreamsException");
+        } catch (final StreamsException expected) { /* ok */ }
     }
 
     @SuppressWarnings("unchecked")