You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/20 17:59:28 UTC

kafka git commit: KAFKA-4473: RecordCollector should handle retriable exceptions more strictly

Repository: kafka
Updated Branches:
  refs/heads/trunk a5c15ba03 -> 0321bf5aa


KAFKA-4473: RecordCollector should handle retriable exceptions more strictly

The `RecordCollectorImpl` currently drops messages on the floor if an exception is non-null in the producer callback. This will result in message loss and violates at-least-once processing.
Rather than just log an error in the callback, save the exception in a field. On subsequent calls to `send`, `flush`, `close`, first check for the existence of an exception and throw a `StreamsException` if it is non-null. Also, in the callback, if an exception has already occurred, the `offsets` map should not be updated.

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2249 from dguy/kafka-4473


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0321bf5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0321bf5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0321bf5a

Branch: refs/heads/trunk
Commit: 0321bf5aa67b52d7fab82a326b15ca8e058faee6
Parents: a5c15ba
Author: Damian Guy <da...@gmail.com>
Authored: Tue Dec 20 09:59:25 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 20 09:59:25 2016 -0800

----------------------------------------------------------------------
 .../internals/RecordCollectorImpl.java          | 16 ++++++-
 .../internals/RecordCollectorTest.java          | 49 ++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0321bf5a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 0dbad5b..31596cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -44,6 +44,7 @@ public class RecordCollectorImpl implements RecordCollector {
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final String logPrefix;
+    private volatile Exception sendException;
 
 
     public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId) {
@@ -60,6 +61,7 @@ public class RecordCollectorImpl implements RecordCollector {
     @Override
     public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
                             StreamPartitioner<K, V> partitioner) {
+        checkForException();
         byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
         byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
         Integer partition = record.partition();
@@ -79,10 +81,14 @@ public class RecordCollectorImpl implements RecordCollector {
                     @Override
                     public void onCompletion(RecordMetadata metadata, Exception exception) {
                         if (exception == null) {
+                            if (sendException != null) {
+                                return;
+                            }
                             TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
                             offsets.put(tp, metadata.offset());
                         } else {
-                            log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
+                            sendException = exception;
+                            log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
                         }
                     }
                 });
@@ -98,10 +104,17 @@ public class RecordCollectorImpl implements RecordCollector {
         }
     }
 
+    private void checkForException() {
+        if (sendException != null) {
+            throw new StreamsException(String.format("%s exception caught when producing", logPrefix), sendException);
+        }
+    }
+
     @Override
     public void flush() {
         log.debug("{} Flushing producer", logPrefix);
         this.producer.flush();
+        checkForException();
     }
 
     /**
@@ -110,6 +123,7 @@ public class RecordCollectorImpl implements RecordCollector {
     @Override
     public void close() {
         producer.close();
+        checkForException();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0321bf5a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 66397fb..03256eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -161,4 +161,53 @@ public class RecordCollectorTest {
         collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
 
     }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test");
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test");
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+        collector.flush();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test");
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+        collector.close();
+    }
+
 }