Posted to commits@kafka.apache.org by ij...@apache.org on 2016/07/28 10:18:42 UTC

kafka git commit: KAFKA-3977; Defer fetch parsing for space efficiency and to ensure exceptions are raised to the user [Forced Update!]

Repository: kafka
Updated Branches:
  refs/heads/trunk a750c5672 -> ff557f02a (forced update)


KAFKA-3977; Defer fetch parsing for space efficiency and to ensure exceptions are raised to the user

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ewen Cheslack-Postava <me...@ewencp.org>, Ismael Juma <is...@juma.me.uk>

Closes #1656 from hachikuji/KAFKA-3977


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff557f02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff557f02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff557f02

Branch: refs/heads/trunk
Commit: ff557f02ac628edbe220ea69888d39de834527d3
Parents: d5c821c
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Jul 28 11:15:45 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Jul 28 11:18:25 2016 +0100

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 325 ++++++++++---------
 .../apache/kafka/common/record/Compressor.java  |   4 +-
 .../common/record/InvalidRecordException.java   |   4 +-
 .../clients/consumer/internals/FetcherTest.java | 153 +++++++--
 4 files changed, 310 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
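
For orientation, here is a minimal, hypothetical sketch of the buffering pattern this
commit introduces. The names echo the diff below (completedFetches, nextInLineRecords,
CompletedFetch, PartitionRecords) but this is not the real Fetcher API: raw fetch
responses are queued unparsed by the network callback, and decompression/deserialization
only happens when the application drains records, so the buffered data stays compact and
parse-time failures propagate out of fetchedRecords()/poll() instead of being swallowed.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Simplified sketch only; see the actual Fetcher changes below.
    class DeferredParsingSketch {
        static class CompletedFetch { /* unparsed partition data from a fetch response */ }
        static class PartitionRecords { final List<String> records = new ArrayList<>(); }

        private final List<CompletedFetch> completedFetches = new ArrayList<>();
        private PartitionRecords nextInLineRecords = null;

        // network-thread side: just buffer the raw response, no parsing yet
        void onFetchResponse(CompletedFetch fetch) {
            completedFetches.add(fetch);
        }

        // application side: parse lazily, respecting a max.poll.records-style budget
        List<String> drain(int maxRecords) {
            List<String> drained = new ArrayList<>();
            Iterator<CompletedFetch> it = completedFetches.iterator();
            while (drained.size() < maxRecords) {
                if (nextInLineRecords == null || nextInLineRecords.records.isEmpty()) {
                    if (!it.hasNext())
                        break;
                    nextInLineRecords = parse(it.next()); // may throw; nothing consumed yet
                    it.remove();
                } else {
                    drained.addAll(take(nextInLineRecords, maxRecords - drained.size()));
                }
            }
            return drained;
        }

        private PartitionRecords parse(CompletedFetch fetch) {
            // placeholder: the real code decompresses, CRC-checks and deserializes here
            return new PartitionRecords();
        }

        private List<String> take(PartitionRecords part, int n) {
            List<String> out = new ArrayList<>();
            Iterator<String> recordIterator = part.records.iterator();
            while (recordIterator.hasNext() && out.size() < n) {
                out.add(recordIterator.next());
                recordIterator.remove();
            }
            return out;
        }
    }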


http://git-wip-us.apache.org/repos/asf/kafka/blob/ff557f02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ddfb584..c811a03 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -38,8 +39,10 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
@@ -59,7 +62,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -83,13 +85,11 @@ public class Fetcher<K, V> {
     private final Metadata metadata;
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
-    private final List<PartitionRecords<K, V>> records;
+    private final List<CompletedFetch> completedFetches;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
 
-    private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
-    private final Set<String> unauthorizedTopics;
-    private final Map<TopicPartition, Long> recordTooLargePartitions;
+    private PartitionRecords<K, V> nextInLineRecords = null;
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
@@ -105,7 +105,6 @@ public class Fetcher<K, V> {
                    String metricGrpPrefix,
                    Time time,
                    long retryBackoffMs) {
-
         this.time = time;
         this.client = client;
         this.metadata = metadata;
@@ -115,31 +114,37 @@ public class Fetcher<K, V> {
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
         this.checkCrcs = checkCrcs;
-
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
-
-        this.records = new LinkedList<>();
-        this.offsetOutOfRangePartitions = new HashMap<>();
-        this.unauthorizedTopics = new HashSet<>();
-        this.recordTooLargePartitions = new HashMap<>();
-
+        this.completedFetches = new ArrayList<>();
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
     }
 
     /**
-     * Set-up a fetch request for any node that we have assigned partitions for which doesn't have one.
-     *
+     * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
+     * an in-flight fetch or pending fetch data.
      */
     public void sendFetches() {
         for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
-            final FetchRequest fetch = fetchEntry.getValue();
-            client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
+            final FetchRequest request = fetchEntry.getValue();
+            client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
-                        public void onSuccess(ClientResponse response) {
-                            handleFetchResponse(response, fetch);
+                        public void onSuccess(ClientResponse resp) {
+                            FetchResponse response = new FetchResponse(resp.responseBody());
+                            Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
+                            FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
+
+                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+                                TopicPartition partition = entry.getKey();
+                                long fetchOffset = request.fetchData().get(partition).offset;
+                                FetchResponse.PartitionData fetchData = entry.getValue();
+                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
+                            }
+
+                            sensors.fetchLatency.record(resp.requestLatencyMs());
+                            sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                         }
 
                         @Override
@@ -152,7 +157,7 @@ public class Fetcher<K, V> {
 
     /**
      * Update the fetch positions for the provided partitions.
-     * @param partitions
+     * @param partitions the partitions to update positions for
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available
      */
     public void updateFetchPositions(Set<TopicPartition> partitions) {
@@ -324,62 +329,6 @@ public class Fetcher<K, V> {
     }
 
     /**
-     * If any partition from previous fetchResponse contains OffsetOutOfRange error and
-     * the defaultResetPolicy is NONE, throw OffsetOutOfRangeException
-     *
-     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse
-     */
-    private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException {
-        Map<TopicPartition, Long> currentOutOfRangePartitions = new HashMap<>();
-
-        // filter offsetOutOfRangePartitions to retain only the fetchable partitions
-        for (Map.Entry<TopicPartition, Long> entry: this.offsetOutOfRangePartitions.entrySet()) {
-            if (!subscriptions.isFetchable(entry.getKey())) {
-                log.debug("Ignoring fetched records for {} since it is no longer fetchable", entry.getKey());
-                continue;
-            }
-            Long position = subscriptions.position(entry.getKey());
-            // ignore partition if the current position != the offset in fetchResponse, e.g. after seek()
-            if (position != null && entry.getValue().equals(position))
-                currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());
-        }
-        this.offsetOutOfRangePartitions.clear();
-        if (!currentOutOfRangePartitions.isEmpty())
-            throw new OffsetOutOfRangeException(currentOutOfRangePartitions);
-    }
-
-    /**
-     * If any topic from previous fetchResponse contains an Authorization error, raise an exception
-     * @throws TopicAuthorizationException
-     */
-    private void throwIfUnauthorizedTopics() throws TopicAuthorizationException {
-        if (!unauthorizedTopics.isEmpty()) {
-            Set<String> topics = new HashSet<>(unauthorizedTopics);
-            unauthorizedTopics.clear();
-            throw new TopicAuthorizationException(topics);
-        }
-    }
-
-    /**
-     * If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException
-     *
-     * @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned
-     */
-    private void throwIfRecordTooLarge() throws RecordTooLargeException {
-        Map<TopicPartition, Long> copiedRecordTooLargePartitions = new HashMap<>(this.recordTooLargePartitions);
-        this.recordTooLargePartitions.clear();
-
-        if (!copiedRecordTooLargePartitions.isEmpty())
-            throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
-                + copiedRecordTooLargePartitions
-                + " whose size is larger than the fetch size "
-                + this.fetchSize
-                + " and hence cannot be ever returned."
-                + " Increase the fetch size, or decrease the maximum message size the broker will allow.",
-                copiedRecordTooLargePartitions);
-    }
-
-    /**
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
      * NOTE: returning empty records guarantees the consumed position are NOT updated.
@@ -393,60 +342,68 @@ public class Fetcher<K, V> {
             return Collections.emptyMap();
         } else {
             Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
-            throwIfOffsetOutOfRange();
-            throwIfUnauthorizedTopics();
-            throwIfRecordTooLarge();
-
-            int maxRecords = maxPollRecords;
-            Iterator<PartitionRecords<K, V>> iterator = records.iterator();
-            while (iterator.hasNext() && maxRecords > 0) {
-                PartitionRecords<K, V> part = iterator.next();
-                maxRecords -= append(drained, part, maxRecords);
-                if (part.isConsumed())
-                    iterator.remove();
+            int recordsRemaining = maxPollRecords;
+            Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();
+
+            while (recordsRemaining > 0) {
+                if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+                    if (!completedFetchesIterator.hasNext())
+                        break;
+
+                    CompletedFetch completion = completedFetchesIterator.next();
+                    completedFetchesIterator.remove();
+                    nextInLineRecords = parseFetchedData(completion);
+                } else {
+                    recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
+                }
             }
+
             return drained;
         }
     }
 
     private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
-                       PartitionRecords<K, V> part,
+                       PartitionRecords<K, V> partitionRecords,
                        int maxRecords) {
-        if (!subscriptions.isAssigned(part.partition)) {
+        if (partitionRecords.isEmpty())
+            return 0;
+
+        if (!subscriptions.isAssigned(partitionRecords.partition)) {
             // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
-            log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
+            log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
         } else {
             // note that the consumed position should always be available as long as the partition is still assigned
-            long position = subscriptions.position(part.partition);
-            if (!subscriptions.isFetchable(part.partition)) {
+            long position = subscriptions.position(partitionRecords.partition);
+            if (!subscriptions.isFetchable(partitionRecords.partition)) {
                 // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
-                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
-            } else if (part.fetchOffset == position) {
-                List<ConsumerRecord<K, V>> partRecords = part.take(maxRecords);
+                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
+            } else if (partitionRecords.fetchOffset == position) {
+                // we are ensured to have at least one record since we already checked for emptiness
+                List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
                 long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
 
                 log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
-                        "position to {}", position, part.partition, nextOffset);
+                        "position to {}", position, partitionRecords.partition, nextOffset);
 
-                List<ConsumerRecord<K, V>> records = drained.get(part.partition);
+                List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
                 if (records == null) {
                     records = partRecords;
-                    drained.put(part.partition, records);
+                    drained.put(partitionRecords.partition, records);
                 } else {
                     records.addAll(partRecords);
                 }
 
-                subscriptions.position(part.partition, nextOffset);
+                subscriptions.position(partitionRecords.partition, nextOffset);
                 return partRecords.size();
             } else {
                 // these records aren't next in line based on the last consumed position, ignore them
                 // they must be from an obsolete request
                 log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
-                        part.partition, part.fetchOffset, position);
+                        partitionRecords.partition, partitionRecords.fetchOffset, position);
             }
         }
 
-        part.discard();
+        partitionRecords.discard();
         return 0;
     }
 
@@ -513,10 +470,10 @@ public class Fetcher<K, V> {
 
     private Set<TopicPartition> fetchablePartitions() {
         Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
-        if (records.isEmpty())
-            return fetchable;
-        for (PartitionRecords<K, V> partitionRecords : records)
-            fetchable.remove(partitionRecords.partition);
+        if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
+            fetchable.remove(nextInLineRecords.partition);
+        for (CompletedFetch completedFetch : completedFetches)
+            fetchable.remove(completedFetch.partition);
         return fetchable;
     }
 
@@ -559,30 +516,29 @@ public class Fetcher<K, V> {
     /**
      * The callback for fetch completion
      */
-    private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
-        int totalBytes = 0;
-        int totalCount = 0;
-        FetchResponse response = new FetchResponse(resp.responseBody());
-        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
-            TopicPartition tp = entry.getKey();
-            FetchResponse.PartitionData partition = entry.getValue();
+    private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
+        TopicPartition tp = completedFetch.partition;
+        FetchResponse.PartitionData partition = completedFetch.partitionData;
+        long fetchOffset = completedFetch.fetchedOffset;
+        int bytes = 0;
+        int recordsCount = 0;
+        PartitionRecords<K, V> parsedRecords = null;
+
+        try {
             if (!subscriptions.isFetchable(tp)) {
                 // this can happen when a rebalance happened or a partition consumption paused
                 // while fetch is still in-flight
                 log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
             } else if (partition.errorCode == Errors.NONE.code()) {
-                long fetchOffset = request.fetchData().get(tp).offset;
-
                 // we are interested in this fetch only if the beginning offset matches the
                 // current consumed position
                 Long position = subscriptions.position(tp);
                 if (position == null || position != fetchOffset) {
-                    log.debug("Discarding fetch response for partition {} since its offset {} does not match " +
+                    log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                             "the expected offset {}", tp, fetchOffset, position);
-                    continue;
+                    return null;
                 }
 
-                int bytes = 0;
                 ByteBuffer buffer = partition.recordSet;
                 MemoryRecords records = MemoryRecords.readableRecords(buffer);
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
@@ -597,79 +553,95 @@ public class Fetcher<K, V> {
                     }
                 }
 
+                recordsCount = parsed.size();
+                this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);
+
                 if (!parsed.isEmpty()) {
                     log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
+                    parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
-                    this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
                     this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
                 } else if (buffer.limit() > 0 && !skippedRecords) {
                     // we did not read a single message from a non-empty buffer
                     // because that message's size is larger than fetch size, in this case
                     // record this exception
-                    this.recordTooLargePartitions.put(tp, fetchOffset);
+                    Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
+                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
+                            + recordTooLargePartitions
+                            + " whose size is larger than the fetch size "
+                            + this.fetchSize
+                            + " and hence cannot be ever returned."
+                            + " Increase the fetch size on the client (using max.partition.fetch.bytes),"
+                            + " or decrease the maximum message size the broker will allow (using message.max.bytes).",
+                            recordTooLargePartitions);
                 }
-
-                this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
-                totalBytes += bytes;
-                totalCount += parsed.size();
             } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+                    || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                 this.metadata.requestUpdate();
             } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
-                long fetchOffset = request.fetchData().get(tp).offset;
-                if (subscriptions.hasDefaultOffsetResetPolicy())
+                if (fetchOffset != subscriptions.position(tp)) {
+                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
+                            "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+                } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
+                    log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
                     subscriptions.needOffsetReset(tp);
-                else
-                    this.offsetOutOfRangePartitions.put(tp, fetchOffset);
-                log.info("Fetch offset {} is out of range, resetting offset", fetchOffset);
+                } else {
+                    throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
+                }
             } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
                 log.warn("Not authorized to read from topic {}.", tp.topic());
-                unauthorizedTopics.add(tp.topic());
+                throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
             } else if (partition.errorCode == Errors.UNKNOWN.code()) {
                 log.warn("Unknown error fetching data for topic-partition {}", tp);
             } else {
                 throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
             }
+        } finally {
+            completedFetch.metricAggregator.record(tp, bytes, recordsCount);
         }
-        this.sensors.bytesFetched.record(totalBytes);
-        this.sensors.recordsFetched.record(totalCount);
-        this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
-        this.sensors.fetchLatency.record(resp.requestLatencyMs());
+
+        return parsedRecords;
     }
 
     /**
      * Parse the record entry, deserializing the key / value fields if necessary
      */
     private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
+        Record record = logEntry.record();
+
+        if (this.checkCrcs && !record.isValid())
+            throw new InvalidRecordException("Record for partition " + partition + " at offset "
+                    + logEntry.offset() + " is corrupt (stored crc = " + record.checksum()
+                    + ", computed crc = "
+                    + record.computeChecksum()
+                    + ")");
+
         try {
-            if (this.checkCrcs)
-                logEntry.record().ensureValid();
             long offset = logEntry.offset();
-            long timestamp = logEntry.record().timestamp();
-            TimestampType timestampType = logEntry.record().timestampType();
-            ByteBuffer keyBytes = logEntry.record().key();
+            long timestamp = record.timestamp();
+            TimestampType timestampType = record.timestampType();
+            ByteBuffer keyBytes = record.key();
             byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
             K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
-            ByteBuffer valueBytes = logEntry.record().value();
+            ByteBuffer valueBytes = record.value();
             byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
             V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray);
 
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                                        timestamp, timestampType, logEntry.record().checksum(),
+                                        timestamp, timestampType, record.checksum(),
                                         keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                         valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                         key, value);
-        } catch (KafkaException e) {
-            throw e;
         } catch (RuntimeException e) {
-            throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e);
+            throw new SerializationException("Error deserializing key/value for partition " + partition +
+                    " at offset " + logEntry.offset(), e);
         }
     }
 
     private static class PartitionRecords<K, V> {
-        public long fetchOffset;
-        public TopicPartition partition;
-        public List<ConsumerRecord<K, V>> records;
+        private long fetchOffset;
+        private TopicPartition partition;
+        private List<ConsumerRecord<K, V>> records;
 
         public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
             this.fetchOffset = fetchOffset;
@@ -677,7 +649,7 @@ public class Fetcher<K, V> {
             this.records = records;
         }
 
-        private boolean isConsumed() {
+        private boolean isEmpty() {
             return records == null || records.isEmpty();
         }
 
@@ -687,7 +659,7 @@ public class Fetcher<K, V> {
 
         private List<ConsumerRecord<K, V>> take(int n) {
             if (records == null)
-                return Collections.emptyList();
+                return new ArrayList<>();
 
             if (n >= records.size()) {
                 List<ConsumerRecord<K, V>> res = this.records;
@@ -709,7 +681,59 @@ public class Fetcher<K, V> {
         }
     }
 
-    private class FetchManagerMetrics {
+    private static class CompletedFetch {
+        private final TopicPartition partition;
+        private final long fetchedOffset;
+        private final FetchResponse.PartitionData partitionData;
+        private final FetchResponseMetricAggregator metricAggregator;
+
+        public CompletedFetch(TopicPartition partition,
+                              long fetchedOffset,
+                              FetchResponse.PartitionData partitionData,
+                              FetchResponseMetricAggregator metricAggregator) {
+            this.partition = partition;
+            this.fetchedOffset = fetchedOffset;
+            this.partitionData = partitionData;
+            this.metricAggregator = metricAggregator;
+        }
+    }
+
+    /**
+     * Since we parse the message data for each partition from each fetch response lazily, fetch-level
+     * metrics need to be aggregated as the messages from each partition are parsed. This class is used
+     * to facilitate this incremental aggregation.
+     */
+    private static class FetchResponseMetricAggregator {
+        private final FetchManagerMetrics sensors;
+        private final Set<TopicPartition> unrecordedPartitions;
+
+        private int totalBytes;
+        private int totalRecords;
+
+        public FetchResponseMetricAggregator(FetchManagerMetrics sensors,
+                                             Set<TopicPartition> partitions) {
+            this.sensors = sensors;
+            this.unrecordedPartitions = partitions;
+        }
+
+        /**
+         * After each partition is parsed, we update the current metric totals with the total bytes
+         * and number of records parsed. After all partitions have reported, we write the metric.
+         */
+        public void record(TopicPartition partition, int bytes, int records) {
+            unrecordedPartitions.remove(partition);
+            totalBytes += bytes;
+            totalRecords += records;
+
+            if (unrecordedPartitions.isEmpty()) {
+                // once all expected partitions from the fetch have reported in, record the metrics
+                sensors.bytesFetched.record(totalBytes);
+                sensors.recordsFetched.record(totalRecords);
+            }
+        }
+    }
+
+    private static class FetchManagerMetrics {
         public final Metrics metrics;
         public final String metricGrpName;
 
@@ -719,7 +743,6 @@ public class Fetcher<K, V> {
         public final Sensor recordsFetchLag;
         public final Sensor fetchThrottleTimeSensor;
 
-
         public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
             this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
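
From the application's point of view, the practical effect of the Fetcher changes above is
that problems discovered while parsing fetched data (an out-of-range offset with no reset
policy, an oversized message, an unauthorized topic, a CRC mismatch, a deserialization
failure) are now raised from the consumer's poll path instead of being buffered silently.
The sketch below is illustrative only and is not part of the patch; the topic name, group
id and bootstrap address are made up.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.errors.SerializationException;

    public class FetchErrorHandlingExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("group.id", "example-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "none"); // no default reset policy

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic"));
                while (true) {
                    try {
                        ConsumerRecords<String, String> records = consumer.poll(1000);
                        for (ConsumerRecord<String, String> record : records)
                            System.out.println(record.offset() + ": " + record.value());
                    } catch (OffsetOutOfRangeException e) {
                        // with no reset policy, decide explicitly where to resume
                        for (TopicPartition tp : e.partitions())
                            consumer.seek(tp, 0L);
                    } catch (RecordTooLargeException | SerializationException e) {
                        // the position has not advanced past the bad data; log and decide how to proceed
                        System.err.println("Failed to parse fetched data: " + e);
                        break;
                    }
                }
            }
        }
    }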

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff557f02/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index e23a52e..a806975 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -242,7 +242,7 @@ public class Compressor {
 
     // the following two functions also need to be public since they are used in MemoryRecords.iteration
 
-    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
+    public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
         try {
             switch (type) {
                 case NONE:
@@ -271,7 +271,7 @@ public class Compressor {
         }
     }
 
-    static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
+    public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
         try {
             switch (type) {
                 case NONE:

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff557f02/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
index 5815b21..a1009ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.common.record;
 
-public class InvalidRecordException extends RuntimeException {
+import org.apache.kafka.common.KafkaException;
+
+public class InvalidRecordException extends KafkaException {
 
     private static final long serialVersionUID = 1;
 

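The one-line parent change above matters together with the new Fetcher.parseRecord: the CRC
check now runs before the deserialization try/catch, and with InvalidRecordException
extending KafkaException a corrupt record reaches the application as a specific corruption
error rather than being wrapped as a generic deserialization failure. A tiny, purely
illustrative check of the new hierarchy (not part of the patch):

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.record.InvalidRecordException;

    public class InvalidRecordExceptionHierarchyCheck {
        public static void main(String[] args) {
            RuntimeException e = new InvalidRecordException("record is corrupt (crc mismatch)");
            // prints "true" after this change (previously InvalidRecordException was a plain
            // RuntimeException), so callers handling KafkaException also see corruption errors
            System.out.println(e instanceof KafkaException);
        }
    }
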
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff557f02/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 8fad30f..2fbd43e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.KafkaMetric;
@@ -38,6 +39,8 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Compressor;
+import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -47,6 +50,7 @@ import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -128,7 +132,7 @@ public class FetcherTest {
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(3, records.size());
-        assertEquals(4L, (long) subscriptions.position(tp)); // this is the next fetching position
+        assertEquals(4L, subscriptions.position(tp).longValue()); // this is the next fetching position
         long offset = 1;
         for (ConsumerRecord<byte[], byte[]> record : records) {
             assertEquals(offset, record.offset());
@@ -148,8 +152,82 @@ public class FetcherTest {
     }
 
     @Test
+    public void testFetchedRecordsRaisesOnSerializationErrors() {
+        // raise an exception from somewhere in the middle of the fetch response
+        // so that we can verify that our position does not advance after raising
+        ByteArrayDeserializer deserializer = new ByteArrayDeserializer() {
+            int i = 0;
+            @Override
+            public byte[] deserialize(String topic, byte[] data) {
+                if (i++ == 1)
+                    throw new SerializationException();
+                return data;
+            }
+        };
+
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);
+
+        subscriptions.assignFromUser(Collections.singleton(tp));
+        subscriptions.seek(tp, 1);
+
+        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+
+        fetcher.sendFetches();
+        consumerClient.poll(0);
+        try {
+            fetcher.fetchedRecords();
+            fail("fetchedRecords should have raised");
+        } catch (SerializationException e) {
+            // the position should not advance since no data has been returned
+            assertEquals(1, subscriptions.position(tp).longValue());
+        }
+    }
+
+    @Test
+    public void testParseInvalidRecord() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        Compressor compressor = new Compressor(buffer, CompressionType.NONE);
+
+        byte[] key = "foo".getBytes();
+        byte[] value = "baz".getBytes();
+        long offset = 0;
+        long timestamp = 500L;
+
+        int size = Record.recordSize(key, value);
+        long crc = Record.computeChecksum(timestamp, key, value, CompressionType.NONE, 0, -1);
+
+        // write one valid record
+        compressor.putLong(offset);
+        compressor.putInt(size);
+        Record.write(compressor, crc, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1);
+
+        // and one invalid record (note the crc)
+        compressor.putLong(offset);
+        compressor.putInt(size);
+        Record.write(compressor, crc + 1, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1);
+
+        compressor.close();
+        buffer.flip();
+
+        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.seek(tp, 0);
+
+        // normal fetch
+        fetcher.sendFetches();
+        client.prepareResponse(fetchResponse(buffer, Errors.NONE.code(), 100L, 0));
+        consumerClient.poll(0);
+        try {
+            fetcher.fetchedRecords();
+            fail("fetchedRecords should have raised");
+        } catch (InvalidRecordException e) {
+            // the position should not advance since no data has been returned
+            assertEquals(0, subscriptions.position(tp).longValue());
+        }
+    }
+
+    @Test
     public void testFetchMaxPollRecords() {
-        Fetcher<byte[], byte[]> fetcher = createFetcher(2, subscriptions, new Metrics(time));
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
 
         List<ConsumerRecord<byte[], byte[]>> records;
         subscriptions.assignFromUser(Arrays.asList(tp));
@@ -162,7 +240,7 @@ public class FetcherTest {
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(2, records.size());
-        assertEquals(3L, (long) subscriptions.position(tp));
+        assertEquals(3L, subscriptions.position(tp).longValue());
         assertEquals(1, records.get(0).offset());
         assertEquals(2, records.get(1).offset());
 
@@ -170,14 +248,14 @@ public class FetcherTest {
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(1, records.size());
-        assertEquals(4L, (long) subscriptions.position(tp));
+        assertEquals(4L, subscriptions.position(tp).longValue());
         assertEquals(3, records.get(0).offset());
 
         fetcher.sendFetches();
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(2, records.size());
-        assertEquals(6L, (long) subscriptions.position(tp));
+        assertEquals(6L, subscriptions.position(tp).longValue());
         assertEquals(4, records.get(0).offset());
         assertEquals(5, records.get(1).offset());
     }
@@ -203,7 +281,7 @@ public class FetcherTest {
         consumerClient.poll(0);
         consumerRecords = fetcher.fetchedRecords().get(tp);
         assertEquals(3, consumerRecords.size());
-        assertEquals(31L, (long) subscriptions.position(tp)); // this is the next fetching position
+        assertEquals(31L, subscriptions.position(tp).longValue()); // this is the next fetching position
 
         assertEquals(15L, consumerRecords.get(0).offset());
         assertEquals(20L, consumerRecords.get(1).offset());
@@ -318,12 +396,28 @@ public class FetcherTest {
         fetcher.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
-        assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
+        assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(null, subscriptions.position(tp));
     }
 
     @Test
+    public void testStaleOutOfRangeError() {
+        // verify that an out of range error which arrives after a seek
+        // does not cause us to reset our position or throw an exception
+        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.seek(tp, 0);
+
+        fetcher.sendFetches();
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        subscriptions.seek(tp, 1);
+        consumerClient.poll(0);
+        assertEquals(0, fetcher.fetchedRecords().size());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertEquals(1, subscriptions.position(tp).longValue());
+    }
+
+    @Test
     public void testFetchedRecordsAfterSeek() {
         subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
         subscriptionsNoAutoReset.seek(tp, 0);
@@ -368,7 +462,7 @@ public class FetcherTest {
         // disconnects should have no affect on subscription state
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(0, (long) subscriptions.position(tp));
+        assertEquals(0, subscriptions.position(tp).longValue());
     }
 
     @Test
@@ -380,7 +474,7 @@ public class FetcherTest {
 
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.position(tp));
+        assertEquals(5, subscriptions.position(tp).longValue());
     }
 
     @Test
@@ -393,7 +487,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.position(tp));
+        assertEquals(5, subscriptions.position(tp).longValue());
     }
 
     @Test
@@ -406,7 +500,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.position(tp));
+        assertEquals(5, subscriptions.position(tp).longValue());
     }
 
     @Test
@@ -419,7 +513,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.position(tp));
+        assertEquals(5, subscriptions.position(tp).longValue());
     }
 
     @Test
@@ -437,7 +531,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.position(tp));
+        assertEquals(5, subscriptions.position(tp).longValue());
     }
 
     @Test
@@ -575,17 +669,36 @@ public class FetcherTest {
         return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
     }
 
-    private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords,
-                                                  SubscriptionState subscriptions,
-                                                  Metrics metrics) {
+    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions,
+                                                  Metrics metrics,
+                                                  int maxPollRecords) {
+        return createFetcher(subscriptions, metrics, new ByteArrayDeserializer(), new ByteArrayDeserializer(), maxPollRecords);
+    }
+
+    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
+        return createFetcher(subscriptions, metrics, Integer.MAX_VALUE);
+    }
+
+    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions,
+                                               Metrics metrics,
+                                               Deserializer<K> keyDeserializer,
+                                               Deserializer<V> valueDeserializer) {
+        return createFetcher(subscriptions, metrics, keyDeserializer, valueDeserializer, Integer.MAX_VALUE);
+    }
+
+    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions,
+                                               Metrics metrics,
+                                               Deserializer<K> keyDeserializer,
+                                               Deserializer<V> valueDeserializer,
+                                               int maxPollRecords) {
         return new Fetcher<>(consumerClient,
                 minBytes,
                 maxWaitMs,
                 fetchSize,
                 maxPollRecords,
                 true, // check crc
-                new ByteArrayDeserializer(),
-                new ByteArrayDeserializer(),
+                keyDeserializer,
+                valueDeserializer,
                 metadata,
                 subscriptions,
                 metrics,
@@ -594,8 +707,4 @@ public class FetcherTest {
                 retryBackoffMs);
     }
 
-
-    private  Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
-        return createFetcher(Integer.MAX_VALUE, subscriptions, metrics);
-    }
 }