You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/31 05:59:04 UTC
kafka git commit: KAFKA-5211; Do not skip a corrupted record in consumer
Repository: kafka
Updated Branches:
refs/heads/trunk d41cf1b77 -> d08256390
KAFKA-5211; Do not skip a corrupted record in consumer
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #3114 from becketqin/KAFKA-5211
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0825639
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0825639
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0825639
Branch: refs/heads/trunk
Commit: d082563907103ea79eed681305df7093053f52ec
Parents: d41cf1b
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue May 30 22:09:53 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue May 30 22:41:21 2017 -0700
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 84 +++++-------
.../clients/consumer/internals/FetcherTest.java | 127 +++++++++++++------
2 files changed, 123 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0825639/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5287b4e..e3f2355 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -113,7 +113,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private final IsolationLevel isolationLevel;
private PartitionRecords nextInLineRecords = null;
- private ExceptionMetadata nextInLineExceptionMetadata = null;
public Fetcher(ConsumerNetworkClient client,
int minBytes,
@@ -154,7 +153,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private <T> ExtendedDeserializer<T> ensureExtended(Deserializer<T> deserializer) {
return deserializer instanceof ExtendedDeserializer ? (ExtendedDeserializer<T>) deserializer : new ExtendedDeserializer.Wrapper<>(deserializer);
}
-
+
/**
* Represents data about an offset returned by a broker.
*/
@@ -513,31 +512,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
* the defaultResetPolicy is NONE
*/
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
- if (nextInLineExceptionMetadata != null) {
- ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata;
- nextInLineExceptionMetadata = null;
- TopicPartition tp = exceptionMetadata.partition;
- if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset)
- throw exceptionMetadata.exception;
- }
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
int recordsRemaining = maxPollRecords;
- // Needed to construct ExceptionMetadata if any exception is found when processing completedFetch
- TopicPartition fetchedPartition = null;
- long fetchedOffset = -1;
try {
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isFetched) {
- CompletedFetch completedFetch = completedFetches.poll();
+ CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) break;
- fetchedPartition = completedFetch.partition;
- fetchedOffset = completedFetch.fetchedOffset;
nextInLineRecords = parseCompletedFetch(completedFetch);
+ completedFetches.poll();
} else {
- fetchedPartition = nextInLineRecords.partition;
- fetchedOffset = nextInLineRecords.nextFetchOffset;
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
TopicPartition partition = nextInLineRecords.partition;
if (!records.isEmpty()) {
@@ -560,8 +546,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
} catch (KafkaException e) {
if (fetched.isEmpty())
throw e;
- // To be thrown in the next call of this method
- nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffset, e);
}
return fetched;
}
@@ -952,10 +936,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private int recordsRead;
private int bytesRead;
private RecordBatch currentBatch;
+ private Record lastRecord;
private CloseableIterator<Record> records;
private long nextFetchOffset;
private boolean isFetched = false;
- private KafkaException nextInlineException;
+ private boolean hasExceptionInLastFetch;
private PartitionRecords(TopicPartition partition,
CompletedFetch completedFetch,
@@ -966,13 +951,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
this.nextFetchOffset = completedFetch.fetchedOffset;
this.abortedProducerIds = new HashSet<>();
this.abortedTransactions = abortedTransactions(completedFetch.partitionData);
- this.nextInlineException = null;
+ this.hasExceptionInLastFetch = false;
}
private void drain() {
if (!isFetched) {
maybeCloseRecordStream();
- nextInlineException = null;
+ hasExceptionInLastFetch = false;
this.isFetched = true;
this.completedFetch.metricAggregator.record(partition, bytesRead, recordsRead);
@@ -1013,6 +998,15 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
private Record nextFetchedRecord() {
+ if (hasExceptionInLastFetch) {
+ if (lastRecord == null) {
+ maybeEnsureValid(currentBatch);
+ } else {
+ maybeEnsureValid(lastRecord);
+ return lastRecord;
+ }
+ }
+
while (true) {
if (records == null || !records.hasNext()) {
maybeCloseRecordStream();
@@ -1021,6 +1015,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
drain();
return null;
}
+
+ lastRecord = null;
currentBatch = batches.next();
maybeEnsureValid(currentBatch);
@@ -1045,15 +1041,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
Record record = records.next();
- maybeEnsureValid(record);
-
+ lastRecord = record;
// skip any records out of range
if (record.offset() >= nextFetchOffset) {
- nextFetchOffset = record.offset() + 1;
+ // we only do validation when the message should not be skipped.
+ maybeEnsureValid(record);
// control records are not returned to the user
- if (!currentBatch.isControlBatch())
- return record;
+ if (!currentBatch.isControlBatch()) {
+ return record;
+ } else {
+ // Increment the next fetch offset when we skip a control batch.
+ nextFetchOffset = record.offset() + 1;
+ }
}
}
}
@@ -1061,11 +1061,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
if (isFetched)
return Collections.emptyList();
- if (nextInlineException != null) {
- KafkaException e = nextInlineException;
- nextInlineException = null;
- throw e;
- }
List<ConsumerRecord<K, V>> records = new ArrayList<>();
try {
@@ -1074,15 +1069,15 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
if (record == null)
break;
+ records.add(parseRecord(partition, currentBatch, record));
recordsRead++;
bytesRead += record.sizeInBytes();
- records.add(parseRecord(partition, currentBatch, record));
+ nextFetchOffset = record.offset() + 1;
}
} catch (KafkaException e) {
+ hasExceptionInLastFetch = true;
if (records.isEmpty())
throw e;
- // To be thrown in the next call of this method
- nextInlineException = e;
}
return records;
}
@@ -1132,18 +1127,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
}
- private static class ExceptionMetadata {
- private final TopicPartition partition;
- private final long fetchedOffset;
- private final KafkaException exception;
-
- private ExceptionMetadata(TopicPartition partition, long fetchedOffset, KafkaException exception) {
- this.partition = partition;
- this.fetchedOffset = fetchedOffset;
- this.exception = exception;
- }
- }
-
private static class CompletedFetch {
private final TopicPartition partition;
private final long fetchedOffset;
@@ -1232,7 +1215,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private final Sensor recordsFetchLag;
private Set<TopicPartition> assignedPartitions;
-
+
private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
this.metrics = metrics;
this.metricsRegistry = metricsRegistry;
@@ -1305,8 +1288,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
Sensor recordsLag = this.metrics.getSensor(name);
if (recordsLag == null) {
recordsLag = this.metrics.sensor(name);
- recordsLag.add(this.metrics.metricName(name,
- metricsRegistry.partitionRecordsLag.group(),
+ recordsLag.add(this.metrics.metricName(name,
+ metricsRegistry.partitionRecordsLag.group(),
metricsRegistry.partitionRecordsLag.description()), new Value());
recordsLag.add(this.metrics.metricName(name + "-max",
metricsRegistry.partitionRecordsLagMax.group(),
@@ -1327,7 +1310,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
public void close() {
if (nextInLineRecords != null)
nextInLineRecords.drain();
- nextInLineExceptionMetadata = null;
decompressionBufferSupplier.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0825639/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 720079c..fedec2a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -50,6 +50,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
@@ -261,8 +262,11 @@ public class FetcherTest {
int i = 0;
@Override
public byte[] deserialize(String topic, byte[] data) {
- if (i++ == 1)
+ if (i++ % 2 == 1) {
+ // Should be blocked on the value deserialization of the first record.
+ assertEquals(new String(data, StandardCharsets.UTF_8), "value-1");
throw new SerializationException();
+ }
return data;
}
};
@@ -276,12 +280,15 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
consumerClient.poll(0);
- try {
- fetcher.fetchedRecords();
- fail("fetchedRecords should have raised");
- } catch (SerializationException e) {
- // the position should not advance since no data has been returned
- assertEquals(1, subscriptions.position(tp1).longValue());
+ // The fetcher should block on Deserialization error
+ for (int i = 0; i < 2; i++) {
+ try {
+ fetcher.fetchedRecords();
+ fail("fetchedRecords should have raised");
+ } catch (SerializationException e) {
+ // the position should not advance since no data has been returned
+ assertEquals(1, subscriptions.position(tp1).longValue());
+ }
}
}
@@ -329,20 +336,69 @@ public class FetcherTest {
assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
assertEquals(1, subscriptions.position(tp1).longValue());
- // the second fetchedRecords() should throw exception due to the second invalid message
- try {
- fetcher.fetchedRecords();
- fail("fetchedRecords should have raised KafkaException");
- } catch (KafkaException e) {
- assertEquals(1, subscriptions.position(tp1).longValue());
+ // the fetchedRecords() should always throw exception due to the second invalid message
+ for (int i = 0; i < 2; i++) {
+ try {
+ fetcher.fetchedRecords();
+ fail("fetchedRecords should have raised KafkaException");
+ } catch (KafkaException e) {
+ assertEquals(1, subscriptions.position(tp1).longValue());
+ }
}
- // the third fetchedRecords() should return the third valid message
- assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
+ // Seek to skip the bad record and fetch again.
+ subscriptions.seek(tp1, 2);
+ // Should not throw exception after the seek.
+ fetcher.fetchedRecords();
+ assertEquals(1, fetcher.sendFetches());
+ client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+ consumerClient.poll(0);
+
+ List<ConsumerRecord<byte[], byte[]>> records = fetcher.fetchedRecords().get(tp1);
+ assertEquals(1, records.size());
+ assertEquals(2L, records.get(0).offset());
assertEquals(3, subscriptions.position(tp1).longValue());
}
@Test
+ public void testInvalidDefaultRecordBatch() {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(out,
+ DefaultRecordBatch.CURRENT_MAGIC_VALUE,
+ CompressionType.NONE,
+ TimestampType.CREATE_TIME,
+ 0L, 10L, 0L, (short) 0, 0, false, false, 0, 1024);
+ builder.append(10L, "key".getBytes(), "value".getBytes());
+ builder.close();
+ buffer.flip();
+
+ // Garble the CRC
+ buffer.position(17);
+ buffer.put("beef".getBytes());
+ buffer.position(0);
+
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
+
+ // normal fetch
+ assertEquals(1, fetcher.sendFetches());
+ client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+ consumerClient.poll(0);
+
+ // the fetchedRecords() should always throw exception due to the bad batch.
+ for (int i = 0; i < 2; i++) {
+ try {
+ fetcher.fetchedRecords();
+ fail("fetchedRecords should have raised KafkaException");
+ } catch (KafkaException e) {
+ assertEquals(0, subscriptions.position(tp1).longValue());
+ }
+ }
+ }
+
+ @Test
public void testParseInvalidRecordBatch() throws Exception {
MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
CompressionType.NONE, TimestampType.CREATE_TIME,
@@ -373,7 +429,7 @@ public class FetcherTest {
@Test
public void testHeaders() {
Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time));
-
+
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
builder.append(0L, "key".getBytes(), "value-1".getBytes());
@@ -397,14 +453,14 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp1);
-
+
assertEquals(3, records.size());
Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = records.iterator();
-
+
ConsumerRecord<byte[], byte[]> record = recordIterator.next();
assertNull(record.headers().lastHeader("headerKey"));
-
+
record = recordIterator.next();
assertEquals("headerValue", new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
assertEquals("headerKey", record.headers().lastHeader("headerKey").key());
@@ -704,19 +760,20 @@ public class FetcherTest {
consumerClient.poll(0);
assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
- try {
- fetcherNoAutoReset.fetchedRecords();
- fail("Should have thrown OffsetOutOfRangeException");
- } catch (OffsetOutOfRangeException e) {
- assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
- assertEquals(e.offsetOutOfRangePartitions().size(), 1);
+ for (int i = 0; i < 2; i++) {
+ try {
+ fetcherNoAutoReset.fetchedRecords();
+ fail("Should have thrown OffsetOutOfRangeException");
+ } catch (OffsetOutOfRangeException e) {
+ assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
+ assertEquals(e.offsetOutOfRangePartitions().size(), 1);
+ }
}
- assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
}
@Test
public void testFetchPositionAfterException() {
- // verify the advancement in the next fetch offset equals the number of fetched records when
+ // verify the advancement in the next fetch offset equals to the number of fetched records when
// some fetched partitions cause Exception. This ensures that consumer won't lose record upon exception
subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2));
subscriptionsNoAutoReset.seek(tp1, 1);
@@ -724,25 +781,21 @@ public class FetcherTest {
assertEquals(1, fetcherNoAutoReset.sendFetches());
- Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
- partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+ Map<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>();
partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100,
FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+ partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
consumerClient.poll(0);
List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
List<OffsetOutOfRangeException> exceptions = new ArrayList<>();
- try {
- for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
- fetchedRecords.addAll(records);
- } catch (OffsetOutOfRangeException e) {
- exceptions.add(e);
- }
+ for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+ fetchedRecords.addAll(records);
- assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2).longValue() - 1);
+ assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2) - 1);
try {
for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())