You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/29 01:20:04 UTC

kafka git commit: MINOR: Support streaming decompression of fetched records for new format

Repository: kafka
Updated Branches:
  refs/heads/trunk edb372dca -> a0b8e435c


MINOR: Support streaming decompression of fetched records for new format

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #2738 from hachikuji/streaming-compressed-iterator


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a0b8e435
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a0b8e435
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a0b8e435

Branch: refs/heads/trunk
Commit: a0b8e435c9419a9402d08408260bea0c1d95cff0
Parents: edb372d
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Mar 28 17:47:10 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Mar 28 17:47:10 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 222 ++++++++++---------
 .../kafka/common/record/DefaultRecordBatch.java | 100 +++++----
 .../clients/consumer/internals/FetcherTest.java |   6 +-
 3 files changed, 175 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a0b8e435/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c2456cc..ad63d25 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -66,6 +66,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -97,7 +98,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
 
-    private PartitionRecords<K, V> nextInLineRecords = null;
+    private PartitionRecords nextInLineRecords = null;
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
@@ -464,11 +465,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
      *         the defaultResetPolicy is NONE
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
-        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
         int recordsRemaining = maxPollRecords;
 
         while (recordsRemaining > 0) {
-            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
+            if (nextInLineRecords == null || nextInLineRecords.isFetched) {
                 CompletedFetch completedFetch = completedFetches.poll();
                 if (completedFetch == null)
                     break;
@@ -476,11 +477,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 nextInLineRecords = parseCompletedFetch(completedFetch);
             } else {
                 TopicPartition partition = nextInLineRecords.partition;
-                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
+                List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
                 if (!records.isEmpty()) {
-                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
+                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                     if (currentRecords == null) {
-                        drained.put(partition, records);
+                        fetched.put(partition, records);
                     } else {
                         // this case shouldn't usually happen because we only send one fetch at a time per partition,
                         // but it might conceivably happen in some rare cases (such as partition leader changes).
@@ -488,35 +489,35 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                         List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                         newRecords.addAll(currentRecords);
                         newRecords.addAll(records);
-                        drained.put(partition, newRecords);
+                        fetched.put(partition, newRecords);
                     }
                     recordsRemaining -= records.size();
                 }
             }
         }
 
-        return drained;
+        return fetched;
     }
 
-    private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
+    private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
         if (!subscriptions.isAssigned(partitionRecords.partition)) {
             // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
-            log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
+            log.debug("Not returning fetched records for partition {} since it is no longer assigned",
+                    partitionRecords.partition);
         } else {
             // note that the consumed position should always be available as long as the partition is still assigned
             long position = subscriptions.position(partitionRecords.partition);
             if (!subscriptions.isFetchable(partitionRecords.partition)) {
                 // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
-                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
-            } else if (partitionRecords.fetchOffset == position) {
-                List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
-                if (!partRecords.isEmpty()) {
-                    long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
-                    log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
-                            "position to {}", position, partitionRecords.partition, nextOffset);
-
-                    subscriptions.position(partitionRecords.partition, nextOffset);
-                }
+                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
+                        partitionRecords.partition);
+            } else if (partitionRecords.nextFetchOffset == position) {
+                List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
+
+                long nextOffset = partitionRecords.nextFetchOffset;
+                log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
+                        "position to {}", position, partitionRecords.partition, nextOffset);
+                subscriptions.position(partitionRecords.partition, nextOffset);
 
                 Long partitionLag = subscriptions.partitionLag(partitionRecords.partition);
                 if (partitionLag != null)
@@ -527,7 +528,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 // these records aren't next in line based on the last consumed position, ignore them
                 // they must be from an obsolete request
                 log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
-                        partitionRecords.partition, partitionRecords.fetchOffset, position);
+                        partitionRecords.partition, partitionRecords.nextFetchOffset, position);
             }
         }
 
@@ -691,7 +692,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
     private List<TopicPartition> fetchablePartitions() {
         Set<TopicPartition> exclude = new HashSet<>();
         List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
-        if (nextInLineRecords != null && !nextInLineRecords.isDrained()) {
+        if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
             exclude.add(nextInLineRecords.partition);
         }
         for (CompletedFetch completedFetch : completedFetches) {
@@ -743,13 +744,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
     /**
      * The callback for fetch completion
      */
-    private PartitionRecords<K, V> parseCompletedFetch(CompletedFetch completedFetch) {
+    private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
         TopicPartition tp = completedFetch.partition;
         FetchResponse.PartitionData partition = completedFetch.partitionData;
         long fetchOffset = completedFetch.fetchedOffset;
-        int bytes = 0;
-        int recordsCount = 0;
-        PartitionRecords<K, V> parsedRecords = null;
+        PartitionRecords parsedRecords = null;
         Errors error = partition.error;
 
         try {
@@ -767,36 +766,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                     return null;
                 }
 
-                List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
-                boolean skippedRecords = false;
-                for (RecordBatch batch : partition.records.batches()) {
-                    if (this.checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-                        try {
-                            batch.ensureValid();
-                        } catch (InvalidRecordException e) {
-                            throw new KafkaException("Record batch for partition " + partition + " at offset " +
-                                    batch.baseOffset() + " is invalid, cause: " + e.getMessage());
-                        }
-                    }
-
-                    for (Record record : batch) {
-                        // control records should not be returned to the user. also skip anything out of range
-                        if (record.isControlRecord() || record.offset() < position) {
-                            skippedRecords = true;
-                        } else {
-                            parsed.add(parseRecord(tp, batch, record));
-                            bytes += record.sizeInBytes();
-                        }
-                    }
-                }
-
-                recordsCount = parsed.size();
-
-                log.trace("Adding {} fetched record(s) for partition {} with offset {} to buffered record list",
-                        parsed.size(), tp, position);
-                parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
+                log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
+                        partition.records.sizeInBytes(), tp, position);
+                Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
+                parsedRecords = new PartitionRecords(tp, completedFetch, batches);
 
-                if (parsed.isEmpty() && !skippedRecords && partition.records.sizeInBytes() > 0) {
+                if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
                     if (completedFetch.responseVersion < 3) {
                         // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
                         Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
@@ -815,7 +790,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 }
 
                 if (partition.highWatermark >= 0) {
-                    log.trace("Received {} records in fetch response for partition {} with offset {}", parsed.size(), tp, position);
+                    log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
                     subscriptions.updateHighWatermark(tp, partition.highWatermark);
                 }
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
@@ -844,29 +819,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
             }
         } finally {
-            completedFetch.metricAggregator.record(tp, bytes, recordsCount);
+            if (error != Errors.NONE) {
+                completedFetch.metricAggregator.record(tp, 0, 0);
+                // we move the partition to the end if there was an error. This way, it's more likely that partitions for
+                // the same topic can remain together (allowing for more efficient serialization).
+                subscriptions.movePartitionToEnd(tp);
+            }
         }
 
-        // we move the partition to the end if we received some bytes or if there was an error. This way, it's more
-        // likely that partitions for the same topic can remain together (allowing for more efficient serialization).
-        if (bytes > 0 || error != Errors.NONE)
-            subscriptions.movePartitionToEnd(tp);
-
         return parsedRecords;
     }
 
-    private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
-                                             RecordBatch batch,
-                                             Record record) {
-        if (this.checkCrcs) {
-            try {
-                record.ensureValid();
-            } catch (InvalidRecordException e) {
-                throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
-                        + " is invalid, cause: " + e.getMessage());
-            }
-        }
-
+    private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {
         try {
             long offset = record.offset();
             long timestamp = record.timestamp();
@@ -894,42 +858,102 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
         sensors.updatePartitionLagSensors(assignment);
     }
 
-    private static class PartitionRecords<K, V> {
-        private long fetchOffset;
-        private TopicPartition partition;
-        private List<ConsumerRecord<K, V>> records;
-        private int position = 0;
-
-        private PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
-            this.fetchOffset = fetchOffset;
+    private class PartitionRecords {
+        private final TopicPartition partition;
+        private final CompletedFetch completedFetch;
+        private final Iterator<? extends RecordBatch> batches;
+
+        private int recordsRead;
+        private int bytesRead;
+        private RecordBatch currentBatch;
+        private Iterator<Record> records;
+        private long nextFetchOffset;
+        private boolean isFetched = false;
+
+        private PartitionRecords(TopicPartition partition,
+                                 CompletedFetch completedFetch,
+                                 Iterator<? extends RecordBatch> batches) {
             this.partition = partition;
-            this.records = records;
+            this.completedFetch = completedFetch;
+            this.batches = batches;
+            this.nextFetchOffset = completedFetch.fetchedOffset;
         }
 
-        private boolean isDrained() {
-            return records == null;
+        private void drain() {
+            if (!isFetched) {
+                this.isFetched = true;
+                this.completedFetch.metricAggregator.record(partition, bytesRead, recordsRead);
+
+                // we move the partition to the end if we received some bytes. This way, it's more likely that partitions
+                // for the same topic can remain together (allowing for more efficient serialization).
+                if (bytesRead > 0)
+                    subscriptions.movePartitionToEnd(partition);
+            }
         }
 
-        private void drain() {
-            this.records = null;
+        private void maybeEnsureValid(RecordBatch batch) {
+            if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
+                try {
+                    batch.ensureValid();
+                } catch (InvalidRecordException e) {
+                    throw new KafkaException("Record batch for partition " + partition + " at offset " +
+                            batch.baseOffset() + " is invalid, cause: " + e.getMessage());
+                }
+            }
         }
 
-        private List<ConsumerRecord<K, V>> drainRecords(int n) {
-            if (isDrained() || position >= records.size()) {
-                drain();
-                return Collections.emptyList();
+        private void maybeEnsureValid(Record record) {
+            if (checkCrcs) {
+                try {
+                    record.ensureValid();
+                } catch (InvalidRecordException e) {
+                    throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
+                            + " is invalid, cause: " + e.getMessage());
+                }
+            }
+        }
+
+        private Record nextFetchedRecord() {
+            while (true) {
+                if (records == null || !records.hasNext()) {
+                    if (!batches.hasNext()) {
+                        drain();
+                        return null;
+                    }
+                    currentBatch = batches.next();
+                    maybeEnsureValid(currentBatch);
+                    records = currentBatch.iterator();
+                }
+
+                Record record = records.next();
+                maybeEnsureValid(record);
+
+                // skip any records out of range
+                if (record.offset() >= nextFetchOffset) {
+                    nextFetchOffset = record.offset() + 1;
+
+                    // control records are not returned to the user
+                    if (!record.isControlRecord())
+                         return record;
+                }
             }
+        }
 
-            // using a sublist avoids a potentially expensive list copy (depending on the size of the records
-            // and the maximum we can return from poll). The cost is that we cannot mutate the returned sublist.
-            int limit = Math.min(records.size(), position + n);
-            List<ConsumerRecord<K, V>> res = Collections.unmodifiableList(records.subList(position, limit));
+        private List<ConsumerRecord<K, V>> fetchRecords(int n) {
+            if (isFetched)
+                return Collections.emptyList();
 
-            position = limit;
-            if (position < records.size())
-                fetchOffset = records.get(position).offset();
+            List<ConsumerRecord<K, V>> records = new ArrayList<>();
+            for (int i = 0; i < n; i++) {
+                Record record = nextFetchedRecord();
+                if (record == null)
+                    break;
 
-            return res;
+                recordsRead++;
+                bytesRead += record.sizeInBytes();
+                records.add(parseRecord(partition, currentBatch, record));
+            }
+            return records;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a0b8e435/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 1de568e..8f50a2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -20,14 +20,11 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Crc32;
-import org.apache.kafka.common.utils.Utils;
 
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 
@@ -205,62 +202,29 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     private Iterator<Record> compressedIterator() {
         ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
+        final DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
                 new ByteBufferInputStream(buffer), magic()));
 
-        // TODO: An improvement for the consumer would be to only decompress the records
-        // we need to fill max.poll.records and leave the rest compressed.
-        int numRecords = count();
-        if (numRecords < 0)
-            throw new InvalidRecordException("Found invalid record count " + numRecords + " in magic v" +
-                    magic() + " batch");
-
-        List<Record> records = new ArrayList<>(numRecords);
-        try {
-            Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
-            long baseOffset = baseOffset();
-            long baseTimestamp = baseTimestamp();
-            int baseSequence = baseSequence();
-
-            for (int i = 0; i < numRecords; i++)
-                records.add(DefaultRecord.readFrom(stream, baseOffset, baseTimestamp, baseSequence, logAppendTime));
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        } finally {
-            Utils.closeQuietly(stream, "records iterator stream");
-        }
-
-        return records.iterator();
+        return new RecordIterator() {
+            @Override
+            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
+                try {
+                    return DefaultRecord.readFrom(stream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+                } catch (IOException e) {
+                    throw new KafkaException("Failed to decompress record stream", e);
+                }
+            }
+        };
     }
 
     private Iterator<Record> uncompressedIterator() {
         final ByteBuffer buffer = this.buffer.duplicate();
-        final Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
-        final long baseOffset = baseOffset();
-        final long baseTimestamp = baseTimestamp();
-        final int baseSequence = baseSequence();
-
         buffer.position(RECORDS_OFFSET);
-        final int totalRecords = count();
-
-        return new Iterator<Record>() {
-            int readRecords = 0;
-
-            @Override
-            public boolean hasNext() {
-                return readRecords < totalRecords;
-            }
-
+        return new RecordIterator() {
             @Override
-            public Record next() {
-                readRecords++;
+            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
                 return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
             }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
         };
     }
 
@@ -432,4 +396,42 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
     }
 
+    private abstract class RecordIterator implements Iterator<Record> {
+        private final Long logAppendTime;
+        private final long baseOffset;
+        private final long baseTimestamp;
+        private final int baseSequence;
+        private final int numRecords;
+        private int readRecords = 0;
+
+        public RecordIterator() {
+            this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
+            this.baseOffset = baseOffset();
+            this.baseTimestamp = baseTimestamp();
+            this.baseSequence = baseSequence();
+            int numRecords = count();
+            if (numRecords < 0)
+                throw new InvalidRecordException("Found invalid record count " + numRecords + " in magic v" +
+                        magic() + " batch");
+            this.numRecords = numRecords;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return readRecords < numRecords;
+        }
+
+        @Override
+        public Record next() {
+            readRecords++;
+            return readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
+        }
+
+        protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a0b8e435/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 224f83c..b03b461 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -191,11 +191,7 @@ public class FetcherTest {
 
         List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp);
         assertEquals(2, records.size());
-
-        // TODO: currently the offset does not advance beyond the control record until a record
-        // with a larger offset is fetched. In the worst case, we may fetch the control record
-        // again after a rebalance, but that should be fine since we just discard it anyway
-        assertEquals(3L, subscriptions.position(tp).longValue());
+        assertEquals(4L, subscriptions.position(tp).longValue());
         for (ConsumerRecord<byte[], byte[]> record : records)
             assertArrayEquals("key".getBytes(), record.key());
     }