You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2019/12/21 18:31:19 UTC
[kafka] branch trunk updated: KAFKA-9310: Handle UnknownProducerId
from RecordCollector.send (#7845)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdbf40d KAFKA-9310: Handle UnknownProducerId from RecordCollector.send (#7845)
cdbf40d is described below
commit cdbf40d572c388cb8616a6ffe6b8b2361cee4522
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat Dec 21 12:30:36 2019 -0600
KAFKA-9310: Handle UnknownProducerId from RecordCollector.send (#7845)
Reviewers: Matthias J. Sax <mj...@apache.org>
---
.../processor/internals/RecordCollectorImpl.java | 16 ++--
.../processor/internals/RecordCollectorTest.java | 90 +++++++++++++++++++---
2 files changed, 91 insertions(+), 15 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index b444433..2451afc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.Collections;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -35,8 +34,8 @@ import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -45,6 +44,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -231,12 +231,10 @@ public class RecordCollectorImpl implements RecordCollector {
}
});
} catch (final RuntimeException uncaughtException) {
- if (uncaughtException instanceof KafkaException &&
- uncaughtException.getCause() instanceof ProducerFencedException) {
- final KafkaException kafkaException = (KafkaException) uncaughtException;
+ if (isRecoverable(uncaughtException)) {
// producer.send() call may throw a KafkaException which wraps a FencedException,
// in this case we should throw its wrapped inner cause so that it can be captured and re-wrapped as TaskMigrationException
- throw new RecoverableClientException("Caught a wrapped ProducerFencedException", kafkaException);
+ throw new RecoverableClientException("Caught a recoverable exception", uncaughtException);
} else {
throw new StreamsException(
String.format(
@@ -252,6 +250,12 @@ public class RecordCollectorImpl implements RecordCollector {
}
}
+ public static boolean isRecoverable(final RuntimeException uncaughtException) {
+ return uncaughtException instanceof KafkaException && (
+ uncaughtException.getCause() instanceof ProducerFencedException ||
+ uncaughtException.getCause() instanceof UnknownProducerIdException);
+ }
+
private void checkForException() {
if (sendException != null) {
throw sendException;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index e24ed00..3af2580 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -54,6 +55,7 @@ import java.util.concurrent.Future;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -180,8 +182,7 @@ public class RecordCollectorTest {
assertThat(collector.offsets().get(topicPartition), equalTo(2L));
}
- @SuppressWarnings("unchecked")
- @Test(expected = StreamsException.class)
+ @Test
public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {
final RecordCollector collector = new RecordCollectorImpl(
"test",
@@ -189,34 +190,105 @@ public class RecordCollectorTest {
new DefaultProductionExceptionHandler(),
new Metrics().sensor("dropped-records")
);
- collector.init(new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
- public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+ public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
throw new KafkaException();
}
});
- collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ final StreamsException thrown = assertThrows(StreamsException.class, () ->
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+ );
+ assertThat(thrown.getCause(), instanceOf(KafkaException.class));
}
- @SuppressWarnings("unchecked")
- @Test(expected = RecoverableClientException.class)
+ @Test
+ public void shouldThrowRecoverableExceptionOnProducerFencedException() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ "test",
+ logContext,
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("dropped-records")
+ );
+ collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+ throw new KafkaException(new ProducerFencedException("asdf"));
+ }
+ });
+
+ final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () ->
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+ );
+ assertThat(thrown.getCause(), instanceOf(KafkaException.class));
+ assertThat(thrown.getCause().getCause(), instanceOf(ProducerFencedException.class));
+ }
+
+ @Test
+ public void shouldThrowRecoverableExceptionOnUnknownProducerException() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ "test",
+ logContext,
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("dropped-records")
+ );
+ collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+ throw new KafkaException(new UnknownProducerIdException("asdf"));
+ }
+ });
+
+ final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () ->
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+ );
+ assertThat(thrown.getCause(), instanceOf(KafkaException.class));
+ assertThat(thrown.getCause().getCause(), instanceOf(UnknownProducerIdException.class));
+ }
+
+ @Test
public void shouldThrowRecoverableExceptionWhenProducerFencedInCallback() {
final RecordCollector collector = new RecordCollectorImpl(
"test",
logContext,
new DefaultProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.init(new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
- public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+ public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
callback.onCompletion(null, new ProducerFencedException("asdf"));
return null;
}
});
collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () ->
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+ );
+ assertThat(thrown.getCause(), instanceOf(ProducerFencedException.class));
+ }
+
+ @Test
+ public void shouldThrowRecoverableExceptionWhenProducerForgottenInCallback() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ "test",
+ logContext,
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
+ collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+ callback.onCompletion(null, new UnknownProducerIdException("asdf"));
+ return null;
+ }
+ });
+
collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () ->
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+ );
+ assertThat(thrown.getCause(), instanceOf(UnknownProducerIdException.class));
}
@SuppressWarnings("unchecked")