Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/06 18:50:59 UTC
kafka git commit: KAFKA-6120: RecordCollector should not retry sending
Repository: kafka
Updated Branches:
refs/heads/trunk d637ad0da -> 2b5a21395
KAFKA-6120: RecordCollector should not retry sending
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #4148 from mjsax/kafka-6120-recordCollector
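This change removes the RecordCollector's internal bounded retry loop (previously 3 attempts with a 100 ms backoff) around producer.send() and leaves retrying to the producer itself. The first failed async send is recorded and rethrown as a StreamsException (or ProducerFencedException) on the next send(), flush(), or close(); for retriable errors the message now points at the producer's `retries` and `retry.backoff.ms` settings, and a TimeoutException thrown by send() itself points at `max.block.ms`.

For reference, a minimal sketch of raising those producer settings from a Streams application (the application id and bootstrap server are placeholders, not part of this commit):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // Retrying is now the producer's job, not the RecordCollector's:
    props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
    props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 100);
    // For a TimeoutException from send() itself (producer buffer full):
    props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000);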
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b5a2139
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b5a2139
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b5a2139
Branch: refs/heads/trunk
Commit: 2b5a21395cf8ce6e3e29a9a778bc20f727ec35fd
Parents: d637ad0
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Nov 6 10:50:57 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 6 10:50:57 2017 -0800
----------------------------------------------------------------------
.../internals/RecordCollectorImpl.java | 101 ++++++++++++-------
.../internals/RecordCollectorTest.java | 55 ++++------
2 files changed, 86 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b5a2139/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 4eec2d5..9566051 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -24,10 +24,10 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
@@ -37,15 +37,15 @@ import java.util.List;
import java.util.Map;
 public class RecordCollectorImpl implements RecordCollector {
-    private static final int MAX_SEND_ATTEMPTS = 3;
-    private static final long SEND_RETRY_BACKOFF = 100L;
-
-
     private final Logger log;
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final String logPrefix;
+    private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
+        "No more records will be sent and no more offsets will be recorded for this task.";
+    private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s";
+    private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";
     private volatile KafkaException sendException;
 
     public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId, final LogContext logContext) {
@@ -93,43 +93,70 @@ public class RecordCollectorImpl implements RecordCollector {
         final ProducerRecord<byte[], byte[]> serializedRecord =
                 new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
-        // counting from 1 to make check further down more natural
-        // -> `if (attempt == MAX_SEND_ATTEMPTS)`
-        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; ++attempt) {
-            try {
-                producer.send(serializedRecord, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                        if (exception == null) {
-                            if (sendException != null) {
-                                return;
-                            }
-                            final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
-                            offsets.put(tp, metadata.offset());
-                        } else {
-                            if (sendException == null) {
-                                log.error("Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
-                                    "No more records will be sent and no more offsets will be recorded for this task.",
-                                    key, value, timestamp, topic, exception);
-                                if (exception instanceof ProducerFencedException) {
-                                    sendException = new ProducerFencedException(String.format("%sAbort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s",
-                                        logPrefix, key, value, timestamp, topic, exception.getMessage()));
-                                } else {
-                                    sendException = new StreamsException(String.format("%sAbort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.",
-                                        logPrefix, key, value, timestamp, topic, exception), exception);
+        try {
+            producer.send(serializedRecord, new Callback() {
+                @Override
+                public void onCompletion(final RecordMetadata metadata,
+                                         final Exception exception) {
+                    if (exception == null) {
+                        if (sendException != null) {
+                            return;
+                        }
+                        final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+                        offsets.put(tp, metadata.offset());
+                    } else {
+                        if (sendException == null) {
+                            if (exception instanceof ProducerFencedException) {
+                                log.warn(LOG_MESSAGE, key, value, timestamp, topic, exception);
+                                sendException = new ProducerFencedException(
+                                    String.format(EXCEPTION_MESSAGE,
+                                                  logPrefix,
+                                                  "producer got fenced",
+                                                  key,
+                                                  value,
+                                                  timestamp,
+                                                  topic,
+                                                  exception.getMessage()));
+                            } else {
+                                String errorLogMessage = LOG_MESSAGE;
+                                String errorMessage = EXCEPTION_MESSAGE;
+                                if (exception instanceof RetriableException) {
+                                    errorLogMessage += PARAMETER_HINT;
+                                    errorMessage += PARAMETER_HINT;
                                 }
+                                log.error(errorLogMessage, key, value, timestamp, topic, exception);
+                                sendException = new StreamsException(
+                                    String.format(errorMessage,
+                                                  logPrefix,
+                                                  "an error caught",
+                                                  key,
+                                                  value,
+                                                  timestamp,
+                                                  topic,
+                                                  exception.getMessage()),
+                                    exception);
                             }
                         }
                     }
-                });
-                return;
-            } catch (final TimeoutException e) {
-                if (attempt == MAX_SEND_ATTEMPTS) {
-                    throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt));
                 }
-                log.warn("Timeout exception caught when sending record to topic {}; retrying with {} attempt", topic, attempt);
-                Utils.sleep(SEND_RETRY_BACKOFF);
-            }
+            });
+        } catch (final TimeoutException e) {
+            log.error("Timeout exception caught when sending record to topic {}. " +
+                "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
+                "its internal buffer fills up. " +
+                "You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
+            throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic));
+        } catch (final Exception fatalException) {
+            throw new StreamsException(
+                String.format(EXCEPTION_MESSAGE,
+                              logPrefix,
+                              "an error caught",
+                              key,
+                              value,
+                              timestamp,
+                              topic,
+                              fatalException.getMessage()),
+                fatalException);
         }
     }
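The net effect of the change above, sketched against the test fixtures used below (MockProducer, cluster, the serializers, and streamPartitioner come from RecordCollectorTest; nothing here is new API): the callback only records the first failure, and the recorded exception surfaces on the next interaction with the collector.

    // The producer fails every send asynchronously via its callback.
    final RecordCollector collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                callback.onCompletion(null, new KafkaException("simulated broker error"));
                return null;
            }
        },
        "test",
        logContext);

    // The failing send itself does not throw; it only records the exception.
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);

    // The recorded failure is rethrown, wrapped in a StreamsException, on the next call.
    try {
        collector.flush();
    } catch (final StreamsException expected) {
        // carries the original KafkaException as its cause
    }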
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b5a2139/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 7b2a41e..16400d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -22,10 +22,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
@@ -38,9 +38,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
public class RecordCollectorTest {
@@ -125,46 +125,23 @@ public class RecordCollectorTest {
     }
 
     @SuppressWarnings("unchecked")
-    @Test
-    public void shouldRetryWhenTimeoutExceptionOccursOnSend() {
-        final AtomicInteger attempt = new AtomicInteger(0);
-        final RecordCollectorImpl collector = new RecordCollectorImpl(
-            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
-                @Override
-                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
-                    if (attempt.getAndIncrement() == 0) {
-                        throw new TimeoutException();
-                    }
-                    return super.send(record, callback);
-                }
-            },
-            "test",
-            logContext);
-
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
-        assertEquals(Long.valueOf(0L), offset);
-    }
-
-    @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionAfterMaxAttempts() {
+    public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {
         final RecordCollector collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                 @Override
                 public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
-                    throw new TimeoutException();
+                    throw new KafkaException();
                 }
             },
             "test",
             logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-
     }
 
     @SuppressWarnings("unchecked")
-    @Test(expected = StreamsException.class)
+    @Test
     public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() {
         final RecordCollector collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -177,11 +154,15 @@ public class RecordCollectorTest {
             "test",
             logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+
+        try {
+            collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+            fail("Should have thrown StreamsException");
+        } catch (final StreamsException expected) { /* ok */ }
     }
 
     @SuppressWarnings("unchecked")
-    @Test(expected = StreamsException.class)
+    @Test
     public void shouldThrowStreamsExceptionOnFlushIfASendFailed() {
         final RecordCollector collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -194,11 +175,15 @@ public class RecordCollectorTest {
             "test",
             logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.flush();
+
+        try {
+            collector.flush();
+            fail("Should have thrown StreamsException");
+        } catch (final StreamsException expected) { /* ok */ }
     }
 
     @SuppressWarnings("unchecked")
-    @Test(expected = StreamsException.class)
+    @Test
     public void shouldThrowStreamsExceptionOnCloseIfASendFailed() {
         final RecordCollector collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -211,7 +196,11 @@ public class RecordCollectorTest {
             "test",
             logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.close();
+
+        try {
+            collector.close();
+            fail("Should have thrown StreamsException");
+        } catch (final StreamsException expected) { /* ok */ }
     }
 
     @SuppressWarnings("unchecked")
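One practical consequence, sketched here for context (this handler wiring is not part of the commit; topology and props stand for whatever the application already builds): without collector-level retries, a non-retriable send error eventually propagates as a StreamsException and kills the stream thread, where it can be observed via the uncaught exception handler.

    final KafkaStreams streams = new KafkaStreams(topology, props);
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(final Thread t, final Throwable e) {
            // A failed send surfaces here once the stream thread dies;
            // react e.g. by alerting or triggering a controlled shutdown.
            System.err.println("Stream thread " + t.getName() + " failed: " + e);
        }
    });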