You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not included in this text.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/29 01:20:04 UTC
kafka git commit: MINOR: Support streaming decompression of fetched records for new format
Repository: kafka
Updated Branches:
refs/heads/trunk edb372dca -> a0b8e435c
MINOR: Support streaming decompression of fetched records for new format
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #2738 from hachikuji/streaming-compressed-iterator
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a0b8e435
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a0b8e435
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a0b8e435
Branch: refs/heads/trunk
Commit: a0b8e435c9419a9402d08408260bea0c1d95cff0
Parents: edb372d
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Mar 28 17:47:10 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Mar 28 17:47:10 2017 -0700
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 222 ++++++++++---------
.../kafka/common/record/DefaultRecordBatch.java | 100 +++++----
.../clients/consumer/internals/FetcherTest.java | 6 +-
3 files changed, 175 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a0b8e435/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c2456cc..ad63d25 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -66,6 +66,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
@@ -97,7 +98,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
- private PartitionRecords<K, V> nextInLineRecords = null;
+ private PartitionRecords nextInLineRecords = null;
public Fetcher(ConsumerNetworkClient client,
int minBytes,
@@ -464,11 +465,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
* the defaultResetPolicy is NONE
*/
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
- Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
+ Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
int recordsRemaining = maxPollRecords;
while (recordsRemaining > 0) {
- if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
+ if (nextInLineRecords == null || nextInLineRecords.isFetched) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null)
break;
@@ -476,11 +477,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
nextInLineRecords = parseCompletedFetch(completedFetch);
} else {
TopicPartition partition = nextInLineRecords.partition;
- List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
+ List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
if (!records.isEmpty()) {
- List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
+ List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
- drained.put(partition, records);
+ fetched.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
@@ -488,35 +489,35 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
- drained.put(partition, newRecords);
+ fetched.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
- return drained;
+ return fetched;
}
- private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
+ private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
if (!subscriptions.isAssigned(partitionRecords.partition)) {
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
- log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
+ log.debug("Not returning fetched records for partition {} since it is no longer assigned",
+ partitionRecords.partition);
} else {
// note that the consumed position should always be available as long as the partition is still assigned
long position = subscriptions.position(partitionRecords.partition);
if (!subscriptions.isFetchable(partitionRecords.partition)) {
// this can happen when a partition is paused before fetched records are returned to the consumer's poll call
- log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
- } else if (partitionRecords.fetchOffset == position) {
- List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
- if (!partRecords.isEmpty()) {
- long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
- log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
- "position to {}", position, partitionRecords.partition, nextOffset);
-
- subscriptions.position(partitionRecords.partition, nextOffset);
- }
+ log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
+ partitionRecords.partition);
+ } else if (partitionRecords.nextFetchOffset == position) {
+ List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
+
+ long nextOffset = partitionRecords.nextFetchOffset;
+ log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
+ "position to {}", position, partitionRecords.partition, nextOffset);
+ subscriptions.position(partitionRecords.partition, nextOffset);
Long partitionLag = subscriptions.partitionLag(partitionRecords.partition);
if (partitionLag != null)
@@ -527,7 +528,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
- partitionRecords.partition, partitionRecords.fetchOffset, position);
+ partitionRecords.partition, partitionRecords.nextFetchOffset, position);
}
}
@@ -691,7 +692,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
private List<TopicPartition> fetchablePartitions() {
Set<TopicPartition> exclude = new HashSet<>();
List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
- if (nextInLineRecords != null && !nextInLineRecords.isDrained()) {
+ if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
exclude.add(nextInLineRecords.partition);
}
for (CompletedFetch completedFetch : completedFetches) {
@@ -743,13 +744,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
/**
* The callback for fetch completion
*/
- private PartitionRecords<K, V> parseCompletedFetch(CompletedFetch completedFetch) {
+ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
TopicPartition tp = completedFetch.partition;
FetchResponse.PartitionData partition = completedFetch.partitionData;
long fetchOffset = completedFetch.fetchedOffset;
- int bytes = 0;
- int recordsCount = 0;
- PartitionRecords<K, V> parsedRecords = null;
+ PartitionRecords parsedRecords = null;
Errors error = partition.error;
try {
@@ -767,36 +766,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
return null;
}
- List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
- boolean skippedRecords = false;
- for (RecordBatch batch : partition.records.batches()) {
- if (this.checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
- try {
- batch.ensureValid();
- } catch (InvalidRecordException e) {
- throw new KafkaException("Record batch for partition " + partition + " at offset " +
- batch.baseOffset() + " is invalid, cause: " + e.getMessage());
- }
- }
-
- for (Record record : batch) {
- // control records should not be returned to the user. also skip anything out of range
- if (record.isControlRecord() || record.offset() < position) {
- skippedRecords = true;
- } else {
- parsed.add(parseRecord(tp, batch, record));
- bytes += record.sizeInBytes();
- }
- }
- }
-
- recordsCount = parsed.size();
-
- log.trace("Adding {} fetched record(s) for partition {} with offset {} to buffered record list",
- parsed.size(), tp, position);
- parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
+ log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
+ partition.records.sizeInBytes(), tp, position);
+ Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
+ parsedRecords = new PartitionRecords(tp, completedFetch, batches);
- if (parsed.isEmpty() && !skippedRecords && partition.records.sizeInBytes() > 0) {
+ if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
if (completedFetch.responseVersion < 3) {
// Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
@@ -815,7 +790,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
}
if (partition.highWatermark >= 0) {
- log.trace("Received {} records in fetch response for partition {} with offset {}", parsed.size(), tp, position);
+ log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
subscriptions.updateHighWatermark(tp, partition.highWatermark);
}
} else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
@@ -844,29 +819,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
}
} finally {
- completedFetch.metricAggregator.record(tp, bytes, recordsCount);
+ if (error != Errors.NONE) {
+ completedFetch.metricAggregator.record(tp, 0, 0);
+ // we move the partition to the end if there was an error. This way, it's more likely that partitions for
+ // the same topic can remain together (allowing for more efficient serialization).
+ subscriptions.movePartitionToEnd(tp);
+ }
}
- // we move the partition to the end if we received some bytes or if there was an error. This way, it's more
- // likely that partitions for the same topic can remain together (allowing for more efficient serialization).
- if (bytes > 0 || error != Errors.NONE)
- subscriptions.movePartitionToEnd(tp);
-
return parsedRecords;
}
- private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
- RecordBatch batch,
- Record record) {
- if (this.checkCrcs) {
- try {
- record.ensureValid();
- } catch (InvalidRecordException e) {
- throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
- + " is invalid, cause: " + e.getMessage());
- }
- }
-
+ private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {
try {
long offset = record.offset();
long timestamp = record.timestamp();
@@ -894,42 +858,102 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
sensors.updatePartitionLagSensors(assignment);
}
- private static class PartitionRecords<K, V> {
- private long fetchOffset;
- private TopicPartition partition;
- private List<ConsumerRecord<K, V>> records;
- private int position = 0;
-
- private PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
- this.fetchOffset = fetchOffset;
+ private class PartitionRecords {
+ private final TopicPartition partition;
+ private final CompletedFetch completedFetch;
+ private final Iterator<? extends RecordBatch> batches;
+
+ private int recordsRead;
+ private int bytesRead;
+ private RecordBatch currentBatch;
+ private Iterator<Record> records;
+ private long nextFetchOffset;
+ private boolean isFetched = false;
+
+ private PartitionRecords(TopicPartition partition,
+ CompletedFetch completedFetch,
+ Iterator<? extends RecordBatch> batches) {
this.partition = partition;
- this.records = records;
+ this.completedFetch = completedFetch;
+ this.batches = batches;
+ this.nextFetchOffset = completedFetch.fetchedOffset;
}
- private boolean isDrained() {
- return records == null;
+ private void drain() {
+ if (!isFetched) {
+ this.isFetched = true;
+ this.completedFetch.metricAggregator.record(partition, bytesRead, recordsRead);
+
+ // we move the partition to the end if we received some bytes. This way, it's more likely that partitions
+ // for the same topic can remain together (allowing for more efficient serialization).
+ if (bytesRead > 0)
+ subscriptions.movePartitionToEnd(partition);
+ }
}
- private void drain() {
- this.records = null;
+ private void maybeEnsureValid(RecordBatch batch) {
+ if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
+ try {
+ batch.ensureValid();
+ } catch (InvalidRecordException e) {
+ throw new KafkaException("Record batch for partition " + partition + " at offset " +
+ batch.baseOffset() + " is invalid, cause: " + e.getMessage());
+ }
+ }
}
- private List<ConsumerRecord<K, V>> drainRecords(int n) {
- if (isDrained() || position >= records.size()) {
- drain();
- return Collections.emptyList();
+ private void maybeEnsureValid(Record record) {
+ if (checkCrcs) {
+ try {
+ record.ensureValid();
+ } catch (InvalidRecordException e) {
+ throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
+ + " is invalid, cause: " + e.getMessage());
+ }
+ }
+ }
+
+ private Record nextFetchedRecord() {
+ while (true) {
+ if (records == null || !records.hasNext()) {
+ if (!batches.hasNext()) {
+ drain();
+ return null;
+ }
+ currentBatch = batches.next();
+ maybeEnsureValid(currentBatch);
+ records = currentBatch.iterator();
+ }
+
+ Record record = records.next();
+ maybeEnsureValid(record);
+
+ // skip any records out of range
+ if (record.offset() >= nextFetchOffset) {
+ nextFetchOffset = record.offset() + 1;
+
+ // control records are not returned to the user
+ if (!record.isControlRecord())
+ return record;
+ }
}
+ }
- // using a sublist avoids a potentially expensive list copy (depending on the size of the records
- // and the maximum we can return from poll). The cost is that we cannot mutate the returned sublist.
- int limit = Math.min(records.size(), position + n);
- List<ConsumerRecord<K, V>> res = Collections.unmodifiableList(records.subList(position, limit));
+ private List<ConsumerRecord<K, V>> fetchRecords(int n) {
+ if (isFetched)
+ return Collections.emptyList();
- position = limit;
- if (position < records.size())
- fetchOffset = records.get(position).offset();
+ List<ConsumerRecord<K, V>> records = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ Record record = nextFetchedRecord();
+ if (record == null)
+ break;
- return res;
+ recordsRead++;
+ bytesRead += record.sizeInBytes();
+ records.add(parseRecord(partition, currentBatch, record));
+ }
+ return records;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a0b8e435/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 1de568e..8f50a2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -20,14 +20,11 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Crc32;
-import org.apache.kafka.common.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
@@ -205,62 +202,29 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private Iterator<Record> compressedIterator() {
ByteBuffer buffer = this.buffer.duplicate();
buffer.position(RECORDS_OFFSET);
- DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
+ final DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
new ByteBufferInputStream(buffer), magic()));
- // TODO: An improvement for the consumer would be to only decompress the records
- // we need to fill max.poll.records and leave the rest compressed.
- int numRecords = count();
- if (numRecords < 0)
- throw new InvalidRecordException("Found invalid record count " + numRecords + " in magic v" +
- magic() + " batch");
-
- List<Record> records = new ArrayList<>(numRecords);
- try {
- Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
- long baseOffset = baseOffset();
- long baseTimestamp = baseTimestamp();
- int baseSequence = baseSequence();
-
- for (int i = 0; i < numRecords; i++)
- records.add(DefaultRecord.readFrom(stream, baseOffset, baseTimestamp, baseSequence, logAppendTime));
- } catch (IOException e) {
- throw new KafkaException(e);
- } finally {
- Utils.closeQuietly(stream, "records iterator stream");
- }
-
- return records.iterator();
+ return new RecordIterator() {
+ @Override
+ protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
+ try {
+ return DefaultRecord.readFrom(stream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+ } catch (IOException e) {
+ throw new KafkaException("Failed to decompress record stream", e);
+ }
+ }
+ };
}
private Iterator<Record> uncompressedIterator() {
final ByteBuffer buffer = this.buffer.duplicate();
- final Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
- final long baseOffset = baseOffset();
- final long baseTimestamp = baseTimestamp();
- final int baseSequence = baseSequence();
-
buffer.position(RECORDS_OFFSET);
- final int totalRecords = count();
-
- return new Iterator<Record>() {
- int readRecords = 0;
-
- @Override
- public boolean hasNext() {
- return readRecords < totalRecords;
- }
-
+ return new RecordIterator() {
@Override
- public Record next() {
- readRecords++;
+ protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
}
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
};
}
@@ -432,4 +396,42 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
}
+ private abstract class RecordIterator implements Iterator<Record> {
+ private final Long logAppendTime;
+ private final long baseOffset;
+ private final long baseTimestamp;
+ private final int baseSequence;
+ private final int numRecords;
+ private int readRecords = 0;
+
+ public RecordIterator() {
+ this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
+ this.baseOffset = baseOffset();
+ this.baseTimestamp = baseTimestamp();
+ this.baseSequence = baseSequence();
+ int numRecords = count();
+ if (numRecords < 0)
+ throw new InvalidRecordException("Found invalid record count " + numRecords + " in magic v" +
+ magic() + " batch");
+ this.numRecords = numRecords;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return readRecords < numRecords;
+ }
+
+ @Override
+ public Record next() {
+ readRecords++;
+ return readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
+ }
+
+ protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a0b8e435/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 224f83c..b03b461 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -191,11 +191,7 @@ public class FetcherTest {
List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp);
assertEquals(2, records.size());
-
- // TODO: currently the offset does not advance beyond the control record until a record
- // with a larger offset is fetched. In the worst case, we may fetch the control record
- // again after a rebalance, but that should be fine since we just discard it anyway
- assertEquals(3L, subscriptions.position(tp).longValue());
+ assertEquals(4L, subscriptions.position(tp).longValue());
for (ConsumerRecord<byte[], byte[]> record : records)
assertArrayEquals("key".getBytes(), record.key());
}