Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/31 05:59:04 UTC

kafka git commit: KAFKA-5211; Do not skip a corrupted record in consumer

Repository: kafka
Updated Branches:
  refs/heads/trunk d41cf1b77 -> d08256390


KAFKA-5211; Do not skip a corrupted record in consumer

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3114 from becketqin/KAFKA-5211
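
In practice: after this patch the consumer no longer advances its position past
a record that fails CRC validation or deserialization, so fetchedRecords() (and
hence poll()) raises the same KafkaException on every call until the
application deliberately seeks past the corrupted offset (this is exactly what
the updated FetcherTest asserts below). A minimal sketch of the resulting
application-side handling follows; the broker address, group id, topic name,
and class name are hypothetical, and stepping over the bad record is an
explicit choice the application makes, not something the client does silently:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    public class CorruptedRecordHandlingSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // hypothetical
            props.put("group.id", "corruption-demo");          // hypothetical
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            TopicPartition tp = new TopicPartition("demo-topic", 0); // hypothetical
            consumer.assign(Collections.singletonList(tp));

            while (true) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(100L);
                    for (ConsumerRecord<byte[], byte[]> record : records)
                        System.out.printf("offset=%d%n", record.offset());
                } catch (KafkaException e) {
                    // With this patch the position stays on the corrupted record,
                    // so the same exception recurs on each poll(). Skipping the
                    // record is now an explicit application decision.
                    long badOffset = consumer.position(tp);
                    consumer.seek(tp, badOffset + 1); // deliberately skip one record
                }
            }
        }
    }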


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0825639
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0825639
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0825639

Branch: refs/heads/trunk
Commit: d082563907103ea79eed681305df7093053f52ec
Parents: d41cf1b
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue May 30 22:09:53 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue May 30 22:41:21 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  84 +++++-------
 .../clients/consumer/internals/FetcherTest.java | 127 +++++++++++++------
 2 files changed, 123 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0825639/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5287b4e..e3f2355 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -113,7 +113,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private final IsolationLevel isolationLevel;
 
     private PartitionRecords nextInLineRecords = null;
-    private ExceptionMetadata nextInLineExceptionMetadata = null;
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
@@ -154,7 +153,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private <T> ExtendedDeserializer<T> ensureExtended(Deserializer<T> deserializer) {
         return deserializer instanceof ExtendedDeserializer ? (ExtendedDeserializer<T>) deserializer : new ExtendedDeserializer.Wrapper<>(deserializer);
     }
-    
+
     /**
      * Represents data about an offset returned by a broker.
      */
@@ -513,31 +512,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      *         the defaultResetPolicy is NONE
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
-        if (nextInLineExceptionMetadata != null) {
-            ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata;
-            nextInLineExceptionMetadata = null;
-            TopicPartition tp = exceptionMetadata.partition;
-            if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset)
-                throw exceptionMetadata.exception;
-        }
         Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
         int recordsRemaining = maxPollRecords;
-        // Needed to construct ExceptionMetadata if any exception is found when processing completedFetch
-        TopicPartition fetchedPartition = null;
-        long fetchedOffset = -1;
 
         try {
             while (recordsRemaining > 0) {
                 if (nextInLineRecords == null || nextInLineRecords.isFetched) {
-                    CompletedFetch completedFetch = completedFetches.poll();
+                    CompletedFetch completedFetch = completedFetches.peek();
                     if (completedFetch == null) break;
 
-                    fetchedPartition = completedFetch.partition;
-                    fetchedOffset = completedFetch.fetchedOffset;
                     nextInLineRecords = parseCompletedFetch(completedFetch);
+                    completedFetches.poll();
                 } else {
-                    fetchedPartition = nextInLineRecords.partition;
-                    fetchedOffset = nextInLineRecords.nextFetchOffset;
                     List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
                     TopicPartition partition = nextInLineRecords.partition;
                     if (!records.isEmpty()) {
@@ -560,8 +546,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         } catch (KafkaException e) {
             if (fetched.isEmpty())
                 throw e;
-            // To be thrown in the next call of this method
-            nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffset, e);
         }
         return fetched;
     }
@@ -952,10 +936,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         private int recordsRead;
         private int bytesRead;
         private RecordBatch currentBatch;
+        private Record lastRecord;
         private CloseableIterator<Record> records;
         private long nextFetchOffset;
         private boolean isFetched = false;
-        private KafkaException nextInlineException;
+        private boolean hasExceptionInLastFetch;
 
         private PartitionRecords(TopicPartition partition,
                                  CompletedFetch completedFetch,
@@ -966,13 +951,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             this.nextFetchOffset = completedFetch.fetchedOffset;
             this.abortedProducerIds = new HashSet<>();
             this.abortedTransactions = abortedTransactions(completedFetch.partitionData);
-            this.nextInlineException = null;
+            this.hasExceptionInLastFetch = false;
         }
 
         private void drain() {
             if (!isFetched) {
                 maybeCloseRecordStream();
-                nextInlineException = null;
+                hasExceptionInLastFetch = false;
                 this.isFetched = true;
                 this.completedFetch.metricAggregator.record(partition, bytesRead, recordsRead);
 
@@ -1013,6 +998,15 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         }
 
         private Record nextFetchedRecord() {
+            if (hasExceptionInLastFetch) {
+                if (lastRecord == null) {
+                    maybeEnsureValid(currentBatch);
+                } else {
+                    maybeEnsureValid(lastRecord);
+                    return lastRecord;
+                }
+            }
+
             while (true) {
                 if (records == null || !records.hasNext()) {
                     maybeCloseRecordStream();
@@ -1021,6 +1015,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                         drain();
                         return null;
                     }
+
+                    lastRecord = null;
                     currentBatch = batches.next();
                     maybeEnsureValid(currentBatch);
 
@@ -1045,15 +1041,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 }
 
                 Record record = records.next();
-                maybeEnsureValid(record);
-
+                lastRecord = record;
                 // skip any records out of range
                 if (record.offset() >= nextFetchOffset) {
-                    nextFetchOffset = record.offset() + 1;
+                    // we only do validation when the message should not be skipped.
+                    maybeEnsureValid(record);
 
                     // control records are not returned to the user
-                    if (!currentBatch.isControlBatch())
-                         return record;
+                    if (!currentBatch.isControlBatch()) {
+                        return record;
+                    } else {
+                        // Increment the next fetch offset when we skip a control batch.
+                        nextFetchOffset = record.offset() + 1;
+                    }
                 }
             }
         }
@@ -1061,11 +1061,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
             if (isFetched)
                 return Collections.emptyList();
-            if (nextInlineException != null) {
-                KafkaException e = nextInlineException;
-                nextInlineException = null;
-                throw e;
-            }
 
             List<ConsumerRecord<K, V>> records = new ArrayList<>();
             try {
@@ -1074,15 +1069,15 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     if (record == null)
                         break;
 
+                    records.add(parseRecord(partition, currentBatch, record));
                     recordsRead++;
                     bytesRead += record.sizeInBytes();
-                    records.add(parseRecord(partition, currentBatch, record));
+                    nextFetchOffset = record.offset() + 1;
                 }
             } catch (KafkaException e) {
+                hasExceptionInLastFetch = true;
                 if (records.isEmpty())
                     throw e;
-                // To be thrown in the next call of this method
-                nextInlineException = e;
             }
             return records;
         }
@@ -1132,18 +1127,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         }
     }
 
-    private static class ExceptionMetadata {
-        private final TopicPartition partition;
-        private final long fetchedOffset;
-        private final KafkaException exception;
-
-        private ExceptionMetadata(TopicPartition partition, long fetchedOffset, KafkaException exception) {
-            this.partition = partition;
-            this.fetchedOffset = fetchedOffset;
-            this.exception = exception;
-        }
-    }
-
     private static class CompletedFetch {
         private final TopicPartition partition;
         private final long fetchedOffset;
@@ -1232,7 +1215,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         private final Sensor recordsFetchLag;
 
         private Set<TopicPartition> assignedPartitions;
-        
+
         private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
             this.metrics = metrics;
             this.metricsRegistry = metricsRegistry;
@@ -1305,8 +1288,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             Sensor recordsLag = this.metrics.getSensor(name);
             if (recordsLag == null) {
                 recordsLag = this.metrics.sensor(name);
-                recordsLag.add(this.metrics.metricName(name, 
-                        metricsRegistry.partitionRecordsLag.group(), 
+                recordsLag.add(this.metrics.metricName(name,
+                        metricsRegistry.partitionRecordsLag.group(),
                         metricsRegistry.partitionRecordsLag.description()), new Value());
                 recordsLag.add(this.metrics.metricName(name + "-max",
                         metricsRegistry.partitionRecordsLagMax.group(),
@@ -1327,7 +1310,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     public void close() {
         if (nextInLineRecords != null)
             nextInLineRecords.drain();
-        nextInLineExceptionMetadata = null;
         decompressionBufferSupplier.close();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0825639/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 720079c..fedec2a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -50,6 +50,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.LegacyRecord;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -261,8 +262,11 @@ public class FetcherTest {
             int i = 0;
             @Override
             public byte[] deserialize(String topic, byte[] data) {
-                if (i++ == 1)
+                if (i++ % 2 == 1) {
+                    // Should be blocked on the value deserialization of the first record.
+                    assertEquals(new String(data, StandardCharsets.UTF_8), "value-1");
                     throw new SerializationException();
+                }
                 return data;
             }
         };
@@ -276,12 +280,15 @@ public class FetcherTest {
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
-        try {
-            fetcher.fetchedRecords();
-            fail("fetchedRecords should have raised");
-        } catch (SerializationException e) {
-            // the position should not advance since no data has been returned
-            assertEquals(1, subscriptions.position(tp1).longValue());
+        // The fetcher should block on Deserialization error
+        for (int i = 0; i < 2; i++) {
+            try {
+                fetcher.fetchedRecords();
+                fail("fetchedRecords should have raised");
+            } catch (SerializationException e) {
+                // the position should not advance since no data has been returned
+                assertEquals(1, subscriptions.position(tp1).longValue());
+            }
         }
     }
 
@@ -329,20 +336,69 @@ public class FetcherTest {
         assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
         assertEquals(1, subscriptions.position(tp1).longValue());
 
-        // the second fetchedRecords() should throw exception due to the second invalid message
-        try {
-            fetcher.fetchedRecords();
-            fail("fetchedRecords should have raised KafkaException");
-        } catch (KafkaException e) {
-            assertEquals(1, subscriptions.position(tp1).longValue());
+        // the fetchedRecords() should always throw exception due to the second invalid message
+        for (int i = 0; i < 2; i++) {
+            try {
+                fetcher.fetchedRecords();
+                fail("fetchedRecords should have raised KafkaException");
+            } catch (KafkaException e) {
+                assertEquals(1, subscriptions.position(tp1).longValue());
+            }
         }
 
-        // the third fetchedRecords() should return the third valid message
-        assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
+        // Seek to skip the bad record and fetch again.
+        subscriptions.seek(tp1, 2);
+        // Should not throw exception after the seek.
+        fetcher.fetchedRecords();
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        consumerClient.poll(0);
+
+        List<ConsumerRecord<byte[], byte[]>> records = fetcher.fetchedRecords().get(tp1);
+        assertEquals(1, records.size());
+        assertEquals(2L, records.get(0).offset());
         assertEquals(3, subscriptions.position(tp1).longValue());
     }
 
     @Test
+    public void testInvalidDefaultRecordBatch() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(out,
+                                                                DefaultRecordBatch.CURRENT_MAGIC_VALUE,
+                                                                CompressionType.NONE,
+                                                                TimestampType.CREATE_TIME,
+                                                                0L, 10L, 0L, (short) 0, 0, false, false, 0, 1024);
+        builder.append(10L, "key".getBytes(), "value".getBytes());
+        builder.close();
+        buffer.flip();
+
+        // Garble the CRC
+        buffer.position(17);
+        buffer.put("beef".getBytes());
+        buffer.position(0);
+
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        consumerClient.poll(0);
+
+        // the fetchedRecords() should always throw exception due to the bad batch.
+        for (int i = 0; i < 2; i++) {
+            try {
+                fetcher.fetchedRecords();
+                fail("fetchedRecords should have raised KafkaException");
+            } catch (KafkaException e) {
+                assertEquals(0, subscriptions.position(tp1).longValue());
+            }
+        }
+    }
+
+    @Test
     public void testParseInvalidRecordBatch() throws Exception {
         MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                 CompressionType.NONE, TimestampType.CREATE_TIME,
@@ -373,7 +429,7 @@ public class FetcherTest {
     @Test
     public void testHeaders() {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time));
-        
+
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
         builder.append(0L, "key".getBytes(), "value-1".getBytes());
 
@@ -397,14 +453,14 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp1);
-        
+
         assertEquals(3, records.size());
 
         Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = records.iterator();
-        
+
         ConsumerRecord<byte[], byte[]> record = recordIterator.next();
         assertNull(record.headers().lastHeader("headerKey"));
-        
+
         record = recordIterator.next();
         assertEquals("headerValue", new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
         assertEquals("headerKey", record.headers().lastHeader("headerKey").key());
@@ -704,19 +760,20 @@ public class FetcherTest {
         consumerClient.poll(0);
 
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
-        try {
-            fetcherNoAutoReset.fetchedRecords();
-            fail("Should have thrown OffsetOutOfRangeException");
-        } catch (OffsetOutOfRangeException e) {
-            assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
-            assertEquals(e.offsetOutOfRangePartitions().size(), 1);
+        for (int i = 0; i < 2; i++) {
+            try {
+                fetcherNoAutoReset.fetchedRecords();
+                fail("Should have thrown OffsetOutOfRangeException");
+            } catch (OffsetOutOfRangeException e) {
+                assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
+                assertEquals(e.offsetOutOfRangePartitions().size(), 1);
+            }
         }
-        assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
     }
 
     @Test
     public void testFetchPositionAfterException() {
-        // verify the advancement in the next fetch offset equals the number of fetched records when
+        // verify the advancement in the next fetch offset equals to the number of fetched records when
         // some fetched partitions cause Exception. This ensures that consumer won't lose record upon exception
         subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2));
         subscriptionsNoAutoReset.seek(tp1, 1);
@@ -724,25 +781,21 @@ public class FetcherTest {
 
         assertEquals(1, fetcherNoAutoReset.sendFetches());
 
-        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
-        partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
-            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>();
         partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100,
             FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
         consumerClient.poll(0);
 
         List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
         List<OffsetOutOfRangeException> exceptions = new ArrayList<>();
 
-        try {
-            for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
-                fetchedRecords.addAll(records);
-        } catch (OffsetOutOfRangeException e) {
-            exceptions.add(e);
-        }
+        for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+            fetchedRecords.addAll(records);
 
-        assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2).longValue() - 1);
+        assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2) - 1);
 
         try {
             for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())