You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/20 17:59:28 UTC
kafka git commit: KAFKA-4473: RecordCollector should handle retriable exceptions more strictly
Repository: kafka
Updated Branches:
refs/heads/trunk a5c15ba03 -> 0321bf5aa
KAFKA-4473: RecordCollector should handle retriable exceptions more strictly
The `RecordCollectorImpl` currently only logs an error when the producer callback receives a non-null exception, so the failed message is silently dropped. This results in message loss and violates at-least-once processing.
Rather than just logging an error in the callback, save the exception in a field. Subsequent calls to `send`, `flush`, or `close` check that field and throw a `StreamsException` wrapping the saved exception if it is non-null (`send` checks before doing any work; `flush` and `close` check after delegating to the producer). Also, once an exception has been recorded, the callback no longer updates the `offsets` map.
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2249 from dguy/kafka-4473
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0321bf5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0321bf5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0321bf5a
Branch: refs/heads/trunk
Commit: 0321bf5aa67b52d7fab82a326b15ca8e058faee6
Parents: a5c15ba
Author: Damian Guy <da...@gmail.com>
Authored: Tue Dec 20 09:59:25 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 20 09:59:25 2016 -0800
----------------------------------------------------------------------
.../internals/RecordCollectorImpl.java | 16 ++++++-
.../internals/RecordCollectorTest.java | 49 ++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0321bf5a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 0dbad5b..31596cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -44,6 +44,7 @@ public class RecordCollectorImpl implements RecordCollector {
private final Producer<byte[], byte[]> producer;
private final Map<TopicPartition, Long> offsets;
private final String logPrefix;
+ private volatile Exception sendException;
public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId) {
@@ -60,6 +61,7 @@ public class RecordCollectorImpl implements RecordCollector {
@Override
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
StreamPartitioner<K, V> partitioner) {
+ checkForException();
byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
Integer partition = record.partition();
@@ -79,10 +81,14 @@ public class RecordCollectorImpl implements RecordCollector {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
+ if (sendException != null) {
+ return;
+ }
TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
offsets.put(tp, metadata.offset());
} else {
- log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
+ sendException = exception;
+ log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
}
}
});
@@ -98,10 +104,17 @@ public class RecordCollectorImpl implements RecordCollector {
}
}
+ private void checkForException() {
+ if (sendException != null) {
+ throw new StreamsException(String.format("%s exception caught when producing", logPrefix), sendException);
+ }
+ }
+
@Override
public void flush() {
log.debug("{} Flushing producer", logPrefix);
this.producer.flush();
+ checkForException();
}
/**
@@ -110,6 +123,7 @@ public class RecordCollectorImpl implements RecordCollector {
@Override
public void close() {
producer.close();
+ checkForException();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/0321bf5a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 66397fb..03256eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -161,4 +161,53 @@ public class RecordCollectorTest {
collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
}
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = StreamsException.class)
+ public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
+ final RecordCollector collector = new RecordCollectorImpl(
+ new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+ callback.onCompletion(null, new Exception());
+ return null;
+ }
+ },
+ "test");
+ collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+ collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = StreamsException.class)
+ public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
+ final RecordCollector collector = new RecordCollectorImpl(
+ new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+ callback.onCompletion(null, new Exception());
+ return null;
+ }
+ },
+ "test");
+ collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+ collector.flush();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = StreamsException.class)
+ public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
+ final RecordCollector collector = new RecordCollectorImpl(
+ new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+ callback.onCompletion(null, new Exception());
+ return null;
+ }
+ },
+ "test");
+ collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+ collector.close();
+ }
+
}