Posted to commits@kafka.apache.org by vv...@apache.org on 2019/12/21 18:31:19 UTC

[kafka] branch trunk updated: KAFKA-9310: Handle UnknownProducerId from RecordCollector.send (#7845)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdbf40d  KAFKA-9310: Handle UnknownProducerId from RecordCollector.send (#7845)
cdbf40d is described below

commit cdbf40d572c388cb8616a6ffe6b8b2361cee4522
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat Dec 21 12:30:36 2019 -0600

    KAFKA-9310: Handle UnknownProducerId from RecordCollector.send (#7845)
    
    Reviewers:  Matthias J. Sax <mj...@apache.org>
---
 .../processor/internals/RecordCollectorImpl.java   | 16 ++--
 .../processor/internals/RecordCollectorTest.java   | 90 +++++++++++++++++++---
 2 files changed, 91 insertions(+), 15 deletions(-)
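
Context note: the heart of this change is a new isRecoverable check in RecordCollectorImpl, which treats a KafkaException as recoverable when its cause is either a ProducerFencedException or (newly, per KAFKA-9310) an UnknownProducerIdException. A minimal standalone sketch of that classification rule follows; the demo class and main() are illustrative only, the real helper is in the diff below:

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.errors.UnknownProducerIdException;

    public class RecoverableCheckDemo {
        // Same rule as the isRecoverable helper added in the diff below.
        static boolean isRecoverable(final RuntimeException e) {
            return e instanceof KafkaException && (
                e.getCause() instanceof ProducerFencedException ||
                    e.getCause() instanceof UnknownProducerIdException);
        }

        public static void main(final String[] args) {
            // Wrapped fenced / unknown-producer-id causes are recoverable ...
            System.out.println(isRecoverable(new KafkaException(new ProducerFencedException("fenced"))));     // true
            System.out.println(isRecoverable(new KafkaException(new UnknownProducerIdException("expired")))); // true
            // ... any other cause is not.
            System.out.println(isRecoverable(new KafkaException(new RuntimeException("other cause"))));       // false
        }
    }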

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index b444433..2451afc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.Collections;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -35,8 +34,8 @@ import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -45,6 +44,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.slf4j.Logger;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -231,12 +231,10 @@ public class RecordCollectorImpl implements RecordCollector {
                 }
             });
         } catch (final RuntimeException uncaughtException) {
-            if (uncaughtException instanceof KafkaException &&
-                uncaughtException.getCause() instanceof ProducerFencedException) {
-                final KafkaException kafkaException = (KafkaException) uncaughtException;
+            if (isRecoverable(uncaughtException)) {
                 // producer.send() call may throw a KafkaException which wraps a FencedException,
                 // in this case we should throw its wrapped inner cause so that it can be captured and re-wrapped as TaskMigrationException
-                throw new RecoverableClientException("Caught a wrapped ProducerFencedException", kafkaException);
+                throw new RecoverableClientException("Caught a recoverable exception", uncaughtException);
             } else {
                 throw new StreamsException(
                     String.format(
@@ -252,6 +250,12 @@ public class RecordCollectorImpl implements RecordCollector {
         }
     }
 
+    public static boolean isRecoverable(final RuntimeException uncaughtException) {
+        return uncaughtException instanceof KafkaException && (
+            uncaughtException.getCause() instanceof ProducerFencedException ||
+                uncaughtException.getCause() instanceof UnknownProducerIdException);
+    }
+
     private void checkForException() {
         if (sendException != null) {
             throw sendException;
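
Context note: per the inline comment in the hunk above, RecoverableClientException is meant to be caught further up the call stack and re-wrapped as a task-migration error rather than treated as fatal. A hedged sketch of that catch-site split; handleAsTaskMigration() is a hypothetical placeholder, not API introduced by this commit:

    try {
        collector.send("topic1", key, value, null, null, keySerializer, valueSerializer, streamPartitioner);
    } catch (final RecoverableClientException recoverable) {
        // Producer was fenced or its id is unknown: another instance may own
        // the task now, so migrate/rebalance instead of failing the app.
        handleAsTaskMigration(recoverable); // hypothetical handler
    } catch (final StreamsException fatal) {
        // Everything else from the collector is treated as fatal.
        throw fatal;
    }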
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index e24ed00..3af2580 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
@@ -54,6 +55,7 @@ import java.util.concurrent.Future;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -180,8 +182,7 @@ public class RecordCollectorTest {
         assertThat(collector.offsets().get(topicPartition), equalTo(2L));
     }
 
-    @SuppressWarnings("unchecked")
-    @Test(expected = StreamsException.class)
+    @Test
     public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {
         final RecordCollector collector = new RecordCollectorImpl(
             "test",
@@ -189,34 +190,105 @@ public class RecordCollectorTest {
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("dropped-records")
         );
-        collector.init(new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
             @Override
-            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
                 throw new KafkaException();
             }
         });
 
-        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        final StreamsException thrown = assertThrows(StreamsException.class, () ->
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+        );
+        assertThat(thrown.getCause(), instanceOf(KafkaException.class));
     }
 
-    @SuppressWarnings("unchecked")
-    @Test(expected = RecoverableClientException.class)
+    @Test
+    public void shouldThrowRecoverableExceptionOnProducerFencedException() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("dropped-records")
+        );
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+            @Override
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+                throw new KafkaException(new ProducerFencedException("asdf"));
+            }
+        });
+
+        final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () ->
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+        );
+        assertThat(thrown.getCause(), instanceOf(KafkaException.class));
+        assertThat(thrown.getCause().getCause(), instanceOf(ProducerFencedException.class));
+    }
+
+    @Test
+    public void shouldThrowRecoverableExceptionOnUnknownProducerException() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("dropped-records")
+        );
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+            @Override
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+                throw new KafkaException(new UnknownProducerIdException("asdf"));
+            }
+        });
+
+        final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () ->
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+        );
+        assertThat(thrown.getCause(), instanceOf(KafkaException.class));
+        assertThat(thrown.getCause().getCause(), instanceOf(UnknownProducerIdException.class));
+    }
+
+    @Test
     public void shouldThrowRecoverableExceptionWhenProducerFencedInCallback() {
         final RecordCollector collector = new RecordCollectorImpl(
             "test",
             logContext,
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
-        collector.init(new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
             @Override
-            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
                 callback.onCompletion(null, new ProducerFencedException("asdf"));
                 return null;
             }
         });
 
         collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () ->
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+        );
+        assertThat(thrown.getCause(), instanceOf(ProducerFencedException.class));
+    }
+
+    @Test
+    public void shouldThrowRecoverableExceptionWhenProducerForgottenInCallback() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+            @Override
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+                callback.onCompletion(null, new UnknownProducerIdException("asdf"));
+                return null;
+            }
+        });
+
         collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () ->
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+        );
+        assertThat(thrown.getCause(), instanceOf(UnknownProducerIdException.class));
     }
 
     @SuppressWarnings("unchecked")
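
Context note: one subtlety in the callback-based tests above is that the failure injected via onCompletion() is not thrown by the send() that triggered it; it is stashed in sendException and surfaces on the next send(), which is why each of those tests calls send() twice. A simplified sketch of that deferred-error pattern, with the wrapping logic condensed from RecordCollectorImpl (details assumed):

    // checkForException() runs at the top of each send() and rethrows any
    // failure recorded by a previous record's callback.
    if (sendException != null) {
        throw sendException;
    }
    producer.send(record, (metadata, exception) -> {
        if (exception instanceof ProducerFencedException
                || exception instanceof UnknownProducerIdException) {
            // Recoverable: surfaces on the caller's next send() or flush.
            sendException = new RecoverableClientException("recoverable send error", exception);
        } else if (exception != null) {
            sendException = new StreamsException("fatal send error", exception);
        }
    });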