Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/16 18:56:46 UTC
[1/2] kafka git commit: MINOR: Replace deepIterator/shallowIterator with deepEntries/shallowEntries
Repository: kafka
Updated Branches:
refs/heads/trunk e55205b81 -> b58b6a1be
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 4467394..e6260a9 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -165,7 +165,7 @@ class FetchRequestTest extends BaseRequestTest {
}
private def logEntries(partitionData: FetchResponse.PartitionData): Seq[LogEntry] = {
- partitionData.records.deepIterator.asScala.toIndexedSeq
+ partitionData.records.deepEntries.asScala.toIndexedSeq
}
private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], fetchResponse: FetchResponse,
@@ -183,7 +183,7 @@ class FetchRequestTest extends BaseRequestTest {
val records = partitionData.records
responseBufferSize += records.sizeInBytes
- val entries = records.shallowIterator.asScala.toIndexedSeq
+ val entries = records.shallowEntries.asScala.toIndexedSeq
assertTrue(entries.size < numMessagesPerPartition)
val entriesSize = entries.map(_.sizeInBytes).sum
responseSize += entriesSize
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a643f63..50c55b8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -64,10 +64,10 @@ class ReplicaManagerQuotasTest {
readPartitionInfo = fetchInfo,
quota = quota)
assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
- fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
+ fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
assertEquals("But we shouldn't get the second", 0,
- fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
+ fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
}
@Test
@@ -89,9 +89,9 @@ class ReplicaManagerQuotasTest {
readPartitionInfo = fetchInfo,
quota = quota)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
- fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
+ fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
- fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
+ fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
}
@Test
@@ -113,9 +113,9 @@ class ReplicaManagerQuotasTest {
readPartitionInfo = fetchInfo,
quota = quota)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
- fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
+ fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
- fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
+ fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
}
@Test
@@ -137,10 +137,10 @@ class ReplicaManagerQuotasTest {
readPartitionInfo = fetchInfo,
quota = quota)
assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
- fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
+ fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
assertEquals("But we should get the second too since it's throttled but in sync", 1,
- fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
+ fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
}
def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: Record = this.record, bothReplicasInSync: Boolean = false) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 421de32..50a4cd6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -247,7 +247,7 @@ class ReplicaManagerTest {
assertTrue(fetchCallbackFired)
assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
- assertTrue("Should return some data", fetchedRecords.shallowIterator.hasNext)
+ assertTrue("Should return some data", fetchedRecords.shallowEntries.iterator.hasNext)
fetchCallbackFired = false
// Fetch a message above the high watermark as a consumer
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 2f73a94..ff72657 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -154,7 +154,7 @@ class SimpleFetchTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
- quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowIterator.next().record)
+ quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowEntries.iterator.next().record)
assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
replicaManager.readFromLocalLog(
replicaId = Request.OrdinaryConsumerId,
@@ -163,7 +163,7 @@ class SimpleFetchTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
- quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowIterator().next().record)
+ quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowEntries().iterator.next().record)
assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())
[2/2] kafka git commit: MINOR: Replace deepIterator/shallowIterator with deepEntries/shallowEntries
Posted by jg...@apache.org.
MINOR: Replace deepIterator/shallowIterator with deepEntries/shallowEntries
The latter return `Iterable` instead of `Iterator` so that the enhanced for-each loop can be used in Java.
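For illustration only (not part of this commit), here is a minimal sketch of the calling pattern the new API enables; the helper class and method names below are hypothetical:

    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.Records;

    // Hypothetical helper, shown only to illustrate the API change.
    class EnhancedForEachSketch {
        // Before this commit, callers used records.deepIterator() and an explicit
        // hasNext()/next() loop. With deepEntries() returning an Iterable, the
        // enhanced for loop can be used directly:
        static int totalEntrySize(Records records) {
            int bytes = 0;
            for (LogEntry entry : records.deepEntries())
                bytes += entry.sizeInBytes();
            return bytes;
        }
    }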
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #2261 from ijuma/deepEntries-shallowEntries
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b58b6a1b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b58b6a1b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b58b6a1b
Branch: refs/heads/trunk
Commit: b58b6a1bef0ecdc2107a415e222af099fcd9bce3
Parents: e55205b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Dec 16 10:41:27 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Dec 16 10:41:27 2016 -0800
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 5 +-
.../kafka/common/record/AbstractRecords.java | 47 ++++++++++--------
.../apache/kafka/common/record/FileRecords.java | 52 +++++++++++++-------
.../kafka/common/record/MemoryRecords.java | 45 +++++++++++------
.../org/apache/kafka/common/record/Records.java | 9 ++--
.../internals/RecordAccumulatorTest.java | 9 ++--
.../record/ByteBufferLogInputStreamTest.java | 10 ++--
.../kafka/common/record/FileRecordsTest.java | 22 ++-------
.../common/record/MemoryRecordsBuilderTest.java | 21 ++------
.../kafka/common/record/MemoryRecordsTest.java | 8 +--
.../java/org/apache/kafka/test/TestUtils.java | 9 +++-
.../coordinator/GroupMetadataManager.scala | 2 +-
core/src/main/scala/kafka/log/Log.scala | 6 +--
core/src/main/scala/kafka/log/LogCleaner.scala | 4 +-
core/src/main/scala/kafka/log/LogSegment.scala | 9 ++--
.../src/main/scala/kafka/log/LogValidator.scala | 10 ++--
.../kafka/message/ByteBufferMessageSet.scala | 6 +--
.../main/scala/kafka/message/MessageSet.scala | 2 +-
.../kafka/server/AbstractFetcherThread.scala | 2 +-
.../scala/kafka/tools/DumpLogSegments.scala | 7 ++-
.../kafka/tools/ReplicaVerificationTool.scala | 12 ++---
.../api/GroupCoordinatorIntegrationTest.scala | 4 +-
.../scala/kafka/tools/TestLogCleaning.scala | 2 +-
.../test/scala/other/kafka/StressTestLog.scala | 2 +-
.../message/BaseMessageSetTestCases.scala | 8 +--
.../unit/kafka/log/BrokerCompressionTest.scala | 2 +-
.../kafka/log/LogCleanerIntegrationTest.scala | 2 +-
.../log/LogCleanerLagIntegrationTest.scala | 2 +-
.../scala/unit/kafka/log/LogCleanerTest.scala | 16 +++---
.../scala/unit/kafka/log/LogSegmentTest.scala | 24 ++++-----
.../src/test/scala/unit/kafka/log/LogTest.scala | 35 ++++++-------
.../scala/unit/kafka/log/LogValidatorTest.scala | 38 +++++++-------
.../kafka/message/BaseMessageSetTestCases.scala | 5 +-
.../server/AbstractFetcherThreadTest.scala | 2 +-
.../unit/kafka/server/FetchRequestTest.scala | 4 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 16 +++---
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../unit/kafka/server/SimpleFetchTest.scala | 4 +-
38 files changed, 241 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 3b9d49c..526b0a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,7 +61,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
@@ -687,9 +686,7 @@ public class Fetcher<K, V> {
}
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
- Iterator<LogEntry> deepIterator = partition.records.deepIterator();
- while (deepIterator.hasNext()) {
- LogEntry logEntry = deepIterator.next();
+ for (LogEntry logEntry : partition.records.deepEntries()) {
// Skip the messages earlier than current position.
if (logEntry.offset() >= position) {
parsed.add(parseRecord(tp, logEntry));
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 3794dc6..3a96d88 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -16,19 +16,37 @@
**/
package org.apache.kafka.common.record;
-import org.apache.kafka.common.utils.AbstractIterator;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public abstract class AbstractRecords implements Records {
+ private final Iterable<Record> records = new Iterable<Record>() {
+ @Override
+ public Iterator<Record> iterator() {
+ return new Iterator<Record>() {
+ private final Iterator<? extends LogEntry> deepEntries = deepEntries().iterator();
+ @Override
+ public boolean hasNext() {
+ return deepEntries.hasNext();
+ }
+ @Override
+ public Record next() {
+ return deepEntries.next().record();
+ }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Removal not supported");
+ }
+ };
+ }
+ };
+
@Override
public boolean hasMatchingShallowMagic(byte magic) {
- Iterator<? extends LogEntry> iterator = shallowIterator();
- while (iterator.hasNext())
- if (iterator.next().magic() != magic)
+ for (LogEntry entry : shallowEntries())
+ if (entry.magic() != magic)
return false;
return true;
}
@@ -39,11 +57,8 @@ public abstract class AbstractRecords implements Records {
@Override
public Records toMessageFormat(byte toMagic) {
List<LogEntry> converted = new ArrayList<>();
- Iterator<LogEntry> deepIterator = deepIterator();
- while (deepIterator.hasNext()) {
- LogEntry entry = deepIterator.next();
+ for (LogEntry entry : deepEntries())
converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic)));
- }
if (converted.isEmpty()) {
// This indicates that the message is too large, which indicates that the buffer is not large
@@ -60,7 +75,7 @@ public abstract class AbstractRecords implements Records {
// cause some timestamp information to be lost (e.g. if the timestamp type was changed) since
// we are essentially merging multiple message sets. However, currently this method is only
// used for down-conversion, so we've ignored the problem.
- CompressionType compressionType = shallowIterator().next().record().compressionType();
+ CompressionType compressionType = shallowEntries().iterator().next().record().compressionType();
return MemoryRecords.withLogEntries(compressionType, converted);
}
}
@@ -77,16 +92,8 @@ public abstract class AbstractRecords implements Records {
* Get an iterator over the deep records.
* @return An iterator over the records
*/
- public Iterator<Record> records() {
- return new AbstractIterator<Record>() {
- private final Iterator<? extends LogEntry> deepEntries = deepIterator();
- @Override
- protected Record makeNext() {
- if (deepEntries.hasNext())
- return deepEntries.next().record();
- return allDone();
- }
- };
+ public Iterable<Record> records() {
+ return records;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index faf61e9..52f3103 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -44,6 +44,15 @@ public class FileRecords extends AbstractRecords implements Closeable {
private volatile File file;
private final AtomicInteger size;
+ private final Iterable<FileChannelLogEntry> shallowEntries;
+
+ private final Iterable<LogEntry> deepEntries = new Iterable<LogEntry>() {
+ @Override
+ public Iterator<LogEntry> iterator() {
+ return deepIterator();
+ }
+ };
+
public FileRecords(File file,
FileChannel channel,
int start,
@@ -58,6 +67,8 @@ public class FileRecords extends AbstractRecords implements Closeable {
// set the initial size of the buffer
resize();
+
+ shallowEntries = shallowEntriesFrom(start);
}
public void resize() throws IOException {
@@ -246,9 +257,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
* @param startingPosition The starting position in the file to begin searching from.
*/
public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
- Iterator<FileChannelLogEntry> iterator = shallowIteratorFrom(Integer.MAX_VALUE, startingPosition);
- while (iterator.hasNext()) {
- FileChannelLogEntry entry = iterator.next();
+ for (FileChannelLogEntry entry : shallowEntriesFrom(startingPosition)) {
long offset = entry.offset();
if (offset >= targetOffset)
return new LogEntryPosition(offset, entry.position(), entry.sizeInBytes());
@@ -264,9 +273,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
* @return The timestamp and offset of the message found. None, if no message is found.
*/
public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
- Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
- while (shallowIterator.hasNext()) {
- LogEntry shallowEntry = shallowIterator.next();
+ for (LogEntry shallowEntry : shallowEntriesFrom(startingPosition)) {
Record shallowRecord = shallowEntry.record();
if (shallowRecord.timestamp() >= targetTimestamp) {
// We found a message
@@ -292,9 +299,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
long maxTimestamp = Record.NO_TIMESTAMP;
long offsetOfMaxTimestamp = -1L;
- Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
- while (shallowIterator.hasNext()) {
- LogEntry shallowEntry = shallowIterator.next();
+ for (LogEntry shallowEntry : shallowEntriesFrom(startingPosition)) {
long timestamp = shallowEntry.record().timestamp();
if (timestamp > maxTimestamp) {
maxTimestamp = timestamp;
@@ -311,8 +316,8 @@ public class FileRecords extends AbstractRecords implements Closeable {
* @return An iterator over the shallow entries
*/
@Override
- public Iterator<FileChannelLogEntry> shallowIterator() {
- return shallowIteratorFrom(start);
+ public Iterable<FileChannelLogEntry> shallowEntries() {
+ return shallowEntries;
}
/**
@@ -320,15 +325,24 @@ public class FileRecords extends AbstractRecords implements Closeable {
* @param maxRecordSize The maximum allowable size of individual records (including compressed record sets)
* @return An iterator over the shallow entries
*/
- public Iterator<FileChannelLogEntry> shallowIterator(int maxRecordSize) {
- return shallowIteratorFrom(maxRecordSize, start);
+ public Iterable<FileChannelLogEntry> shallowEntries(int maxRecordSize) {
+ return shallowEntries(maxRecordSize, start);
}
- private Iterator<FileChannelLogEntry> shallowIteratorFrom(int start) {
- return shallowIteratorFrom(Integer.MAX_VALUE, start);
+ private Iterable<FileChannelLogEntry> shallowEntriesFrom(int start) {
+ return shallowEntries(Integer.MAX_VALUE, start);
}
- private Iterator<FileChannelLogEntry> shallowIteratorFrom(int maxRecordSize, int start) {
+ private Iterable<FileChannelLogEntry> shallowEntries(final int maxRecordSize, final int start) {
+ return new Iterable<FileChannelLogEntry>() {
+ @Override
+ public Iterator<FileChannelLogEntry> iterator() {
+ return shallowIterator(maxRecordSize, start);
+ }
+ };
+ }
+
+ private Iterator<FileChannelLogEntry> shallowIterator(int maxRecordSize, int start) {
final int end;
if (isSlice)
end = this.end;
@@ -339,7 +353,11 @@ public class FileRecords extends AbstractRecords implements Closeable {
}
@Override
- public Iterator<LogEntry> deepIterator() {
+ public Iterable<LogEntry> deepEntries() {
+ return deepEntries;
+ }
+
+ private Iterator<LogEntry> deepIterator() {
final int end;
if (isSlice)
end = this.end;
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 0301762..1485486 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -31,8 +31,17 @@ public class MemoryRecords extends AbstractRecords {
public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
- // the underlying buffer used for read; while the records are still writable it is null
- private ByteBuffer buffer;
+ private final ByteBuffer buffer;
+
+ private final Iterable<ByteBufferLogEntry> shallowEntries = new Iterable<ByteBufferLogEntry>() {
+ @Override
+ public Iterator<ByteBufferLogEntry> iterator() {
+ return shallowIterator();
+ }
+ };
+
+ private final Iterable<LogEntry> deepEntries = deepEntries(false);
+
private int validBytes = -1;
// Construct a writable memory records
@@ -79,9 +88,8 @@ public class MemoryRecords extends AbstractRecords {
return validBytes;
int bytes = 0;
- Iterator<ByteBufferLogEntry> iterator = shallowIterator();
- while (iterator.hasNext())
- bytes += iterator.next().sizeInBytes();
+ for (LogEntry entry : shallowEntries())
+ bytes += entry.sizeInBytes();
this.validBytes = bytes;
return bytes;
@@ -102,9 +110,7 @@ public class MemoryRecords extends AbstractRecords {
int messagesRetained = 0;
int bytesRetained = 0;
- Iterator<ByteBufferLogEntry> shallowIterator = shallowIterator();
- while (shallowIterator.hasNext()) {
- ByteBufferLogEntry shallowEntry = shallowIterator.next();
+ for (ByteBufferLogEntry shallowEntry : shallowEntries()) {
bytesRead += shallowEntry.sizeInBytes();
// We use the absolute offset to decide whether to retain the message or not (this is handled by the
@@ -174,27 +180,36 @@ public class MemoryRecords extends AbstractRecords {
}
@Override
- public Iterator<ByteBufferLogEntry> shallowIterator() {
+ public Iterable<ByteBufferLogEntry> shallowEntries() {
+ return shallowEntries;
+ }
+
+ private Iterator<ByteBufferLogEntry> shallowIterator() {
return RecordsIterator.shallowIterator(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
}
@Override
- public Iterator<LogEntry> deepIterator() {
- return deepIterator(false);
+ public Iterable<LogEntry> deepEntries() {
+ return deepEntries;
}
- public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic) {
- return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
+ public Iterable<LogEntry> deepEntries(final boolean ensureMatchingMagic) {
+ return new Iterable<LogEntry>() {
+ @Override
+ public Iterator<LogEntry> iterator() {
+ return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
+ }
+ };
}
- public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
+ private Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
return new RecordsIterator(new ByteBufferLogInputStream(buffer.duplicate(), maxMessageSize), false,
ensureMatchingMagic, maxMessageSize);
}
@Override
public String toString() {
- Iterator<LogEntry> iter = deepIterator();
+ Iterator<LogEntry> iter = deepEntries().iterator();
StringBuilder builder = new StringBuilder();
builder.append('[');
while (iter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 823d2b7..f0dbf9e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -18,14 +18,13 @@ package org.apache.kafka.common.record;
import java.io.IOException;
import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;
/**
* Interface for accessing the records contained in a log. The log itself is represented as a sequence of log entries.
* Each log entry consists of an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}.
* If the entry is not compressed, then each entry will have only the shallow record contained inside it. If it is
* compressed, the entry contains "deep" records, which are packed into the value field of the shallow record. To iterate
- * over the shallow records, use {@link #shallowIterator()}; for the deep records, use {@link #deepIterator()}. Note
+ * over the shallow records, use {@link #shallowEntries()}; for the deep records, use {@link #deepEntries()}. Note
* that the deep iterator handles both compressed and non-compressed entries: if the entry is not compressed, the
* shallow record is returned; otherwise, the shallow record is decompressed and the deep entries are returned.
* See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
@@ -61,16 +60,16 @@ public interface Records {
* record data (see {@link FileLogInputStream.FileChannelLogEntry#magic()}.
* @return An iterator over the shallow entries of the log
*/
- Iterator<? extends LogEntry> shallowIterator();
+ Iterable<? extends LogEntry> shallowEntries();
/**
* Get the deep log entries (i.e. descend into compressed message sets). For the deep records,
* there are fewer options for optimization since the data must be decompressed before it can be
* returned. Hence there is little advantage in allowing subclasses to return a more specific type
- * as we do for {@link #shallowIterator()}.
+ * as we do for {@link #shallowEntries()}.
* @return An iterator over the deep entries of the log
*/
- Iterator<LogEntry> deepIterator();
+ Iterable<LogEntry> deepEntries();
/**
* Check whether all shallow entries in this buffer have a certain magic value.
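As an aside (not part of this commit), a minimal sketch of the shallow/deep distinction described in the javadoc above, using only builder calls that appear elsewhere in this patch; a compressed batch yields one shallow wrapper entry but one deep entry per inner record:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.TimestampType;

    public class EntriesSketch {
        public static void main(String[] args) {
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
                Record.MAGIC_VALUE_V1, CompressionType.GZIP, TimestampType.CREATE_TIME, 0L);
            builder.append(0L, 10L, "k1".getBytes(), "v1".getBytes());
            builder.append(1L, 11L, "k2".getBytes(), "v2".getBytes());
            MemoryRecords records = builder.build();

            // Shallow view: the single compressed wrapper entry.
            for (LogEntry entry : records.shallowEntries())
                System.out.println("shallow offset = " + entry.offset());

            // Deep view: the wrapper is decompressed into its inner entries.
            for (LogEntry entry : records.deepEntries())
                System.out.println("deep offset = " + entry.offset());
        }
    }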
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 4f25bdf..04e1411 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -101,7 +101,7 @@ public class RecordAccumulatorTest {
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
- Iterator<LogEntry> iter = batch.records().deepIterator();
+ Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
for (int i = 0; i < appends; i++) {
LogEntry entry = iter.next();
assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -130,7 +130,7 @@ public class RecordAccumulatorTest {
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
- Iterator<LogEntry> iter = batch.records().deepIterator();
+ Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
LogEntry entry = iter.next();
assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
@@ -182,11 +182,8 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
if (batches != null) {
for (RecordBatch batch : batches) {
- Iterator<LogEntry> deepEntries = batch.records().deepIterator();
- while (deepEntries.hasNext()) {
- deepEntries.next();
+ for (LogEntry entry : batch.records().deepEntries())
read++;
- }
accum.deallocate(batch);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
index 62e8a05..c8621cd 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
@@ -37,7 +37,7 @@ public class ByteBufferLogInputStreamTest {
ByteBuffer recordsBuffer = builder.build().buffer();
recordsBuffer.limit(recordsBuffer.limit() - 5);
- Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = MemoryRecords.readableRecords(recordsBuffer).shallowIterator();
+ Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = MemoryRecords.readableRecords(recordsBuffer).shallowEntries().iterator();
assertTrue(iterator.hasNext());
ByteBufferLogInputStream.ByteBufferLogEntry first = iterator.next();
assertEquals(0L, first.offset());
@@ -50,7 +50,7 @@ public class ByteBufferLogInputStreamTest {
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
- Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+ Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
assertTrue(iterator.hasNext());
ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
@@ -67,7 +67,7 @@ public class ByteBufferLogInputStreamTest {
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
- Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+ Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
assertTrue(iterator.hasNext());
ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
@@ -81,7 +81,7 @@ public class ByteBufferLogInputStreamTest {
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
- Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+ Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
assertTrue(iterator.hasNext());
ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
@@ -98,7 +98,7 @@ public class ByteBufferLogInputStreamTest {
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
- Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+ Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
assertTrue(iterator.hasNext());
ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 7e2c256..dcd3bef 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -26,10 +26,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import static org.apache.kafka.test.TestUtils.tempFile;
@@ -85,7 +83,7 @@ public class FileRecordsTest {
fileRecords.channel().write(buffer);
// appending those bytes should not change the contents
- TestUtils.checkEquals(Arrays.asList(records).iterator(), fileRecords.records());
+ TestUtils.checkEquals(Arrays.asList(records), fileRecords.records());
}
/**
@@ -94,7 +92,7 @@ public class FileRecordsTest {
@Test
public void testIterationDoesntChangePosition() throws IOException {
long position = fileRecords.channel().position();
- TestUtils.checkEquals(Arrays.asList(records).iterator(), fileRecords.records());
+ TestUtils.checkEquals(Arrays.asList(records), fileRecords.records());
assertEquals(position, fileRecords.channel().position());
}
@@ -104,7 +102,7 @@ public class FileRecordsTest {
@Test
public void testRead() throws IOException {
FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
- TestUtils.checkEquals(fileRecords.shallowIterator(), read.shallowIterator());
+ TestUtils.checkEquals(fileRecords.shallowEntries(), read.shallowEntries());
List<LogEntry> items = shallowEntries(read);
LogEntry second = items.get(1);
@@ -383,21 +381,11 @@ public class FileRecordsTest {
}
private static List<LogEntry> shallowEntries(Records buffer) {
- List<LogEntry> entries = new ArrayList<>();
- Iterator<? extends LogEntry> iterator = buffer.shallowIterator();
- while (iterator.hasNext())
- entries.add(iterator.next());
- return entries;
+ return TestUtils.toList(buffer.shallowEntries().iterator());
}
private static List<LogEntry> deepEntries(Records buffer) {
- List<LogEntry> entries = new ArrayList<>();
- Iterator<? extends LogEntry> iterator = buffer.shallowIterator();
- while (iterator.hasNext()) {
- for (LogEntry deepEntry : iterator.next())
- entries.add(deepEntry);
- }
- return entries;
+ return TestUtils.toList(buffer.deepEntries().iterator());
}
private FileRecords createFileRecords(Record ... records) throws IOException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 40fa212..a52976b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -121,9 +120,7 @@ public class MemoryRecordsBuilderTest {
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
- Iterator<Record> iterator = records.records();
- while (iterator.hasNext()) {
- Record record = iterator.next();
+ for (Record record : records.records()) {
assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
assertEquals(logAppendTime, record.timestamp());
}
@@ -148,9 +145,7 @@ public class MemoryRecordsBuilderTest {
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
- Iterator<Record> iterator = records.records();
- while (iterator.hasNext()) {
- Record record = iterator.next();
+ for (Record record : records.records()) {
assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
assertEquals(logAppendTime, record.timestamp());
}
@@ -177,11 +172,9 @@ public class MemoryRecordsBuilderTest {
else
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
- Iterator<Record> iterator = records.records();
int i = 0;
long[] expectedTimestamps = new long[] {0L, 2L, 1L};
- while (iterator.hasNext()) {
- Record record = iterator.next();
+ for (Record record : records.records()) {
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
assertEquals(expectedTimestamps[i++], record.timestamp());
}
@@ -206,10 +199,8 @@ public class MemoryRecordsBuilderTest {
assertEquals(2L, info.maxTimestamp);
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
- Iterator<Record> iterator = records.records();
long i = 0L;
- while (iterator.hasNext()) {
- Record record = iterator.next();
+ for (Record record : records.records()) {
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
assertEquals(i++, record.timestamp());
}
@@ -233,9 +224,7 @@ public class MemoryRecordsBuilderTest {
assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp);
assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
- Iterator<Record> iterator = records.records();
- while (iterator.hasNext()) {
- Record record = iterator.next();
+ for (Record record : records.records()) {
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
assertEquals(Record.NO_TIMESTAMP, record.timestamp());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index ef0fbeb..f2741ee 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -69,7 +69,7 @@ public class MemoryRecordsTest {
for (int iteration = 0; iteration < 2; iteration++) {
for (MemoryRecords recs : asList(recs1, recs2)) {
- Iterator<LogEntry> iter = recs.deepIterator();
+ Iterator<LogEntry> iter = recs.deepEntries().iterator();
for (int i = 0; i < list.size(); i++) {
assertTrue(iter.hasNext());
LogEntry entry = iter.next();
@@ -135,7 +135,7 @@ public class MemoryRecordsTest {
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
- List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowIterator());
+ List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator());
List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L);
assertEquals(expectedOffsets.size(), shallowEntries.size());
@@ -148,7 +148,7 @@ public class MemoryRecordsTest {
shallowEntry.record().timestampType());
}
- List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepIterator());
+ List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries().iterator());
assertEquals(4, deepEntries.size());
LogEntry first = deepEntries.get(0);
@@ -197,7 +197,7 @@ public class MemoryRecordsTest {
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
- List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowIterator());
+ List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator());
assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size());
for (LogEntry shallowEntry : shallowEntries) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 4e80b61..428b5a0 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -325,7 +325,14 @@ public class TestUtils {
assertFalse("Iterators have uneven length--second has more", s2.hasNext());
}
- public static <T> List<T> toList(Iterator<T> iterator) {
+ /**
+ * Checks the two iterables for equality by first converting both to a list.
+ */
+ public static <T> void checkEquals(Iterable<T> it1, Iterable<T> it2) {
+ assertEquals(toList(it1.iterator()), toList(it2.iterator()));
+ }
+
+ public static <T> List<T> toList(Iterator<? extends T> iterator) {
List<T> res = new ArrayList<>();
while (iterator.hasNext())
res.add(iterator.next());
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index a97b527..0eb52bb 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -398,7 +398,7 @@ class GroupMetadataManager(val brokerId: Int,
val fileRecords = log.read(currOffset, config.loadBufferSize, minOneMessage = true).records.asInstanceOf[FileRecords]
fileRecords.readInto(buffer, 0)
- MemoryRecords.readableRecords(buffer).deepIterator.asScala.foreach { entry =>
+ MemoryRecords.readableRecords(buffer).deepEntries.asScala.foreach { entry =>
val record = entry.record
require(record.hasKey, "Offset entry key should not be null")
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7a54b77..6cd7953 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -386,7 +386,7 @@ class Log(@volatile var dir: File,
// re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
// format conversion)
if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
- for (logEntry <- validRecords.shallowIterator.asScala) {
+ for (logEntry <- validRecords.shallowEntries.asScala) {
if (logEntry.sizeInBytes > config.maxMessageSize) {
// we record the original message set size instead of the trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
@@ -401,7 +401,7 @@ class Log(@volatile var dir: File,
} else {
// we are taking the offsets we are given
if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
- throw new IllegalArgumentException("Out of order offsets found in " + records.deepIterator.asScala.map(_.offset))
+ throw new IllegalArgumentException("Out of order offsets found in " + records.deepEntries.asScala.map(_.offset))
}
// check messages set size may be exceed config.segmentSize
@@ -465,7 +465,7 @@ class Log(@volatile var dir: File,
var monotonic = true
var maxTimestamp = Record.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
- for (entry <- records.shallowIterator.asScala) {
+ for (entry <- records.shallowEntries.asScala) {
// update the first offset if on the first message
if(firstOffset < 0)
firstOffset = entry.offset
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7676c88..7abd1d8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -477,7 +477,7 @@ private[log] class Cleaner(val id: Int,
if (writeBuffer.position > 0) {
writeBuffer.flip()
val retained = MemoryRecords.readableRecords(writeBuffer)
- dest.append(firstOffset = retained.deepIterator().next().offset,
+ dest.append(firstOffset = retained.deepEntries.iterator.next().offset,
largestOffset = result.maxOffset,
largestTimestamp = result.maxTimestamp,
shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
@@ -632,7 +632,7 @@ private[log] class Cleaner(val id: Int,
throttler.maybeThrottle(records.sizeInBytes)
val startPosition = position
- for (entry <- records.deepIterator.asScala) {
+ for (entry <- records.deepEntries.asScala) {
val message = entry.record
if (message.hasKey && entry.offset >= start) {
if (map.size < maxDesiredMapSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index c08a0bc..eddb47a 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -219,10 +219,9 @@ class LogSegment(val log: FileRecords,
timeIndex.resize(timeIndex.maxIndexSize)
var validBytes = 0
var lastIndexEntry = 0
- val iter = log.shallowIterator(maxMessageSize)
maxTimestampSoFar = Record.NO_TIMESTAMP
try {
- for (entry <- iter.asScala) {
+ for (entry <- log.shallowEntries(maxMessageSize).asScala) {
val record = entry.record
record.ensureValid()
@@ -313,7 +312,7 @@ class LogSegment(val log: FileRecords,
if (ms == null) {
baseOffset
} else {
- ms.records.shallowIterator.asScala.toSeq.lastOption match {
+ ms.records.shallowEntries.asScala.toSeq.lastOption match {
case None => baseOffset
case Some(last) => last.nextOffset
}
@@ -374,9 +373,9 @@ class LogSegment(val log: FileRecords,
def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
// Load the timestamp of the first message into memory
if (rollingBasedTimestamp.isEmpty) {
- val iter = log.shallowIterator
+ val iter = log.shallowEntries.iterator()
if (iter.hasNext)
- rollingBasedTimestamp = Some(iter.next.record.timestamp)
+ rollingBasedTimestamp = Some(iter.next().record.timestamp)
}
rollingBasedTimestamp match {
case Some(t) if t >= 0 => messageTimestamp - t
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index d9f27e4..1713942 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -78,7 +78,7 @@ private[kafka] object LogValidator {
val expectedInnerOffset = new LongRef(0)
val validatedRecords = new mutable.ArrayBuffer[Record]
- records.deepIterator(true).asScala.foreach { logEntry =>
+ records.deepEntries(true).asScala.foreach { logEntry =>
val record = logEntry.record
validateKey(record, compactedTopic)
@@ -121,7 +121,7 @@ private[kafka] object LogValidator {
validatedRecords.foreach(_.ensureValid)
// we can update the wrapper message only and write the compressed payload as is
- val entry = records.shallowIterator.next()
+ val entry = records.shallowEntries.iterator.next()
val offset = offsetCounter.addAndGet(validatedRecords.size) - 1
entry.setOffset(offset)
if (messageTimestampType == TimestampType.CREATE_TIME)
@@ -144,7 +144,7 @@ private[kafka] object LogValidator {
timestampType: TimestampType,
messageTimestampDiffMaxMs: Long,
toMagicValue: Byte): ValidationAndOffsetAssignResult = {
- val sizeInBytesAfterConversion = records.shallowIterator.asScala.map { logEntry =>
+ val sizeInBytesAfterConversion = records.shallowEntries.asScala.map { logEntry =>
logEntry.record.convertedSize(toMagicValue)
}.sum
@@ -152,7 +152,7 @@ private[kafka] object LogValidator {
val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
offsetCounter.value, now)
- records.shallowIterator.asScala.foreach { logEntry =>
+ records.shallowEntries.asScala.foreach { logEntry =>
val record = logEntry.record
validateKey(record, compactedTopic)
validateTimestamp(record, now, timestampType, messageTimestampDiffMaxMs)
@@ -179,7 +179,7 @@ private[kafka] object LogValidator {
var offsetOfMaxTimestamp = -1L
val firstOffset = offsetCounter.value
- for (entry <- records.shallowIterator.asScala) {
+ for (entry <- records.shallowEntries.asScala) {
val record = entry.record
validateKey(record, compactedTopic)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 2c8fef6..198a4c3 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -167,10 +167,10 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
/** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
val entries = if (isShallow)
- asRecords.shallowIterator
+ asRecords.shallowEntries
else
- asRecords.deepIterator
- entries.asScala.map(MessageAndOffset.fromLogEntry)
+ asRecords.deepEntries
+ entries.iterator.asScala.map(MessageAndOffset.fromLogEntry)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index a44a362..2fe54cd 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -93,7 +93,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
override def toString: String = {
val builder = new StringBuilder()
builder.append(getClass.getSimpleName + "(")
- val iter = this.asRecords.shallowIterator()
+ val iter = this.asRecords.shallowEntries.iterator
var i = 0
while(iter.hasNext && i < 100) {
val message = iter.next
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index f959ce2..1dbd373 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -145,7 +145,7 @@ abstract class AbstractFetcherThread(name: String,
case Errors.NONE =>
try {
val records = partitionData.toRecords
- val newOffset = records.shallowIterator.asScala.toSeq.lastOption.map(_.nextOffset).getOrElse(
+ val newOffset = records.shallowEntries.asScala.toSeq.lastOption.map(_.nextOffset).getOrElse(
currentPartitionFetchState.offset)
fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index ceff78c..8315e64 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -146,7 +146,7 @@ object DumpLogSegments {
for(i <- 0 until index.entries) {
val entry = index.entry(i)
val slice = fileRecords.read(entry.position, maxMessageSize)
- val logEntry = getIterator(slice.shallowIterator.next, isDeepIteration = true).next()
+ val logEntry = getIterator(slice.shallowEntries.iterator.next(), isDeepIteration = true).next()
if (logEntry.offset != entry.offset + index.baseOffset) {
var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
misMatchesSeq ::=(entry.offset + index.baseOffset, logEntry.offset)
@@ -184,7 +184,7 @@ object DumpLogSegments {
val entry = timeIndex.entry(i)
val position = index.lookup(entry.offset + timeIndex.baseOffset).position
val partialFileRecords = fileRecords.read(position, Int.MaxValue)
- val shallowEntries = partialFileRecords.shallowIterator.asScala
+ val shallowEntries = partialFileRecords.shallowEntries.asScala
var maxTimestamp = Record.NO_TIMESTAMP
// We first find the message by offset then check if the timestamp is correct.
val maybeLogEntry = shallowEntries.find(_.offset >= entry.offset + timeIndex.baseOffset)
@@ -311,8 +311,7 @@ object DumpLogSegments {
val messageSet = FileRecords.open(file, false)
var validBytes = 0L
var lastOffset = -1l
- val shallowIterator = messageSet.shallowIterator(maxMessageSize).asScala
- for (shallowLogEntry <- shallowIterator) { // this only does shallow iteration
+ for (shallowLogEntry <- messageSet.shallowEntries(maxMessageSize).asScala) {
val itr = getIterator(shallowLogEntry, isDeepIteration)
for (deepLogEntry <- itr) {
val record = deepLogEntry.record()
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index c483021..98e0414 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -275,9 +275,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
"fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
+ expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
- val logEntryIteratorMap = fetchResponsePerReplica.map {
- case(replicaId, fetchResponse) =>
- replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowIterator.asScala
+ val logEntriesMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
+ replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowEntries.asScala
}
val maxHw = fetchResponsePerReplica.values.map(_.hw).max
@@ -285,10 +284,11 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
var isMessageInAllReplicas = true
while (isMessageInAllReplicas) {
var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
- for ( (replicaId, logEntries) <- logEntryIteratorMap) {
+ for ((replicaId, logEntries) <- logEntriesMap) {
try {
- if (logEntries.hasNext) {
- val logEntry = logEntries.next()
+ val logEntriesIterator = logEntries.iterator
+ if (logEntriesIterator.hasNext) {
+ val logEntry = logEntriesIterator.next()
// only verify up to the high watermark
if (logEntry.offset >= fetchResponsePerReplica.get(replicaId).hw)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 51e987a..ac310a9 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -50,12 +50,12 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
def getGroupMetadataLogOpt: Option[Log] =
logManager.getLog(TopicAndPartition(Topic.GroupMetadataTopicName, 0))
- TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.shallowIterator.asScala.nonEmpty)),
+ TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.shallowEntries.asScala.nonEmpty)),
"Commit message not appended in time")
val logSegments = getGroupMetadataLogOpt.get.logSegments
val incorrectCompressionCodecs = logSegments
- .flatMap(_.log.shallowIterator.asScala.map(_.record.compressionType.id))
+ .flatMap(_.log.shallowEntries.asScala.map(_.record.compressionType.id))
.filter(_ != offsetsTopicCompressionCodec.codec)
assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index ecf7408..d837885 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -140,7 +140,7 @@ object TestLogCleaning {
require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
for (file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
val fileRecords = FileRecords.open(new File(dir, file))
- for (entry <- fileRecords.shallowIterator.asScala) {
+ for (entry <- fileRecords.shallowEntries.asScala) {
val key = TestUtils.readString(entry.record.key)
val content =
if(entry.record.hasNullValue)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 3381fb7..a67c166 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -98,7 +98,7 @@ object StressTestLog {
try {
log.read(offset, 1024, Some(offset+1)).records match {
case read: FileRecords if read.sizeInBytes > 0 => {
- val first = read.shallowIterator.next()
+ val first = read.shallowEntries.iterator.next()
require(first.offset == offset, "We should either read nothing or the message we asked for.")
require(first.sizeInBytes == read.sizeInBytes, "Expected %d but got %d.".format(first.sizeInBytes, read.sizeInBytes))
offset += 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index 4a1be11..3327a65 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -20,20 +20,20 @@ package kafka.javaapi.message
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
-import kafka.utils.TestUtils
-import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, CompressionCodec, Message}
+import kafka.message.{CompressionCodec, DefaultCompressionCodec, Message, NoCompressionCodec}
+import org.apache.kafka.test.TestUtils
+
import scala.collection.JavaConverters._
trait BaseMessageSetTestCases extends JUnitSuite {
val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
- def toMessageIterator(messageSet: MessageSet): Iterator[Message] = messageSet.asScala.map(m => m.message).iterator
@Test
def testWrittenEqualsRead {
val messageSet = createMessageSet(messages)
- TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet))
+ assertEquals(messages.toSeq, messageSet.asScala.map(m => m.message))
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 296dc15..6a165ed 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -59,7 +59,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),
Record.create("hello".getBytes), Record.create("there".getBytes)))
- def readMessage(offset: Int) = log.read(offset, 4096).records.shallowIterator.next().record
+ def readMessage(offset: Int) = log.read(offset, 4096).records.shallowEntries.iterator.next().record
if (!brokerCompression.equals("producer")) {
val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 65c2d05..43c41f3 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -250,7 +250,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
import JavaConverters._
- for (segment <- log.logSegments; deepLogEntry <- segment.log.deepIterator.asScala) yield {
+ for (segment <- log.logSegments; deepLogEntry <- segment.log.deepEntries.asScala) yield {
val key = TestUtils.readString(deepLogEntry.record.key).toInt
val value = TestUtils.readString(deepLogEntry.record.value)
(key, value, deepLogEntry.offset)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index abab3bf..1231e98 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -106,7 +106,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
private def readFromLog(log: Log): Iterable[(Int, Int)] = {
import JavaConverters._
- for (segment <- log.logSegments; logEntry <- segment.log.deepIterator.asScala) yield {
+ for (segment <- log.logSegments; logEntry <- segment.log.deepEntries.asScala) yield {
val key = TestUtils.readString(logEntry.record.key).toInt
val value = TestUtils.readString(logEntry.record.value).toInt
key -> value
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ae8e401..f43e92b 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -229,7 +229,7 @@ class LogCleanerTest extends JUnitSuite {
// the last (active) segment has just one message
- def distinctValuesBySegment = log.logSegments.map(s => s.log.shallowIterator.asScala.map(m => TestUtils.readString(m.record.value)).toSet.size).toSeq
+ def distinctValuesBySegment = log.logSegments.map(s => s.log.shallowEntries.asScala.map(m => TestUtils.readString(m.record.value)).toSet.size).toSeq
val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.",
@@ -324,14 +324,14 @@ class LogCleanerTest extends JUnitSuite {
/* extract all the keys from a log */
def keysInLog(log: Log): Iterable[Int] =
- log.logSegments.flatMap(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => TestUtils.readString(m.record.key).toInt))
+ log.logSegments.flatMap(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => TestUtils.readString(m.record.key).toInt))
/* extract all the offsets from a log */
def offsetsInLog(log: Log): Iterable[Long] =
- log.logSegments.flatMap(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => m.offset))
+ log.logSegments.flatMap(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => m.offset))
def unkeyedMessageCountInLog(log: Log) =
- log.logSegments.map(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).count(m => !m.record.hasKey)).sum
+ log.logSegments.map(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).count(m => !m.record.hasKey)).sum
def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = {
throw new LogCleaningAbortedException()
@@ -679,7 +679,7 @@ class LogCleanerTest extends JUnitSuite {
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
- for (segment <- log.logSegments; shallowLogEntry <- segment.log.shallowIterator.asScala; deepLogEntry <- shallowLogEntry.asScala) {
+ for (segment <- log.logSegments; shallowLogEntry <- segment.log.shallowEntries.asScala; deepLogEntry <- shallowLogEntry.asScala) {
assertEquals(shallowLogEntry.record.magic, deepLogEntry.record.magic)
val value = TestUtils.readString(deepLogEntry.record.value).toLong
assertEquals(deepLogEntry.offset, value)
@@ -701,7 +701,7 @@ class LogCleanerTest extends JUnitSuite {
val corruptedMessage = invalidCleanedMessage(offset, set)
val records = MemoryRecords.readableRecords(corruptedMessage.buffer)
- for (logEntry <- records.deepIterator.asScala) {
+ for (logEntry <- records.deepEntries.asScala) {
val offset = logEntry.offset
val value = TestUtils.readString(logEntry.record.value).toLong
assertEquals(offset, value)
@@ -727,14 +727,14 @@ class LogCleanerTest extends JUnitSuite {
timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000))
log.roll()
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 1, log.activeSegment.baseOffset))
- assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowIterator().next().offset())
+ assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowEntries.iterator().next().offset())
// Append a message and roll out another log segment.
log.append(TestUtils.singletonRecords(value = "1".getBytes,
key = "1".getBytes,
timestamp = time.milliseconds()))
log.roll()
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
- assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowIterator().next().offset())
+ assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowEntries.iterator().next().offset())
}
private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
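The LogCleanerTest change above also shows that a shallow entry is itself iterable over its deep (inner) records, which turns the nested shallow-then-deep walk into a plain for-comprehension. A small sketch of that nesting, under the same assumptions as the earlier sketches:

import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}

import scala.collection.JavaConverters._

object NestedEntriesSketch {
  def main(args: Array[String]): Unit = {
    // A compressed set: one shallow wrapper entry containing two inner records.
    val records = MemoryRecords.withRecords(CompressionType.GZIP,
      Record.create("a".getBytes), Record.create("b".getBytes))

    // Each shallow entry can be iterated again to reach its deep entries.
    for (shallow <- records.shallowEntries.asScala; deep <- shallow.asScala)
      println(s"wrapper offset=${shallow.offset} inner offset=${deep.offset}")
  }
}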
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 1c747ec..1197b02 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -77,7 +77,7 @@ class LogSegmentTest {
val ms = records(50, "hello", "there", "little", "bee")
seg.append(50, 53, Record.NO_TIMESTAMP, -1L, ms)
val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records
- assertEquals(ms.deepIterator.asScala.toList, read.deepIterator.asScala.toList)
+ assertEquals(ms.deepEntries.asScala.toList, read.deepEntries.asScala.toList)
}
/**
@@ -91,8 +91,8 @@ class LogSegmentTest {
val ms = records(baseOffset, "hello", "there", "beautiful")
seg.append(baseOffset, 52, Record.NO_TIMESTAMP, -1L, ms)
def validate(offset: Long) =
- assertEquals(ms.deepIterator.asScala.filter(_.offset == offset).toList,
- seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.deepIterator.asScala.toList)
+ assertEquals(ms.deepEntries.asScala.filter(_.offset == offset).toList,
+ seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.deepEntries.asScala.toList)
validate(50)
validate(51)
validate(52)
@@ -122,7 +122,7 @@ class LogSegmentTest {
val ms2 = records(60, "alpha", "beta")
seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
+ assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
}
/**
@@ -140,12 +140,12 @@ class LogSegmentTest {
seg.append(offset + 1, offset + 1, Record.NO_TIMESTAMP, -1L, ms2)
// check that we can read back both messages
val read = seg.read(offset, None, 10000)
- assertEquals(List(ms1.deepIterator.next(), ms2.deepIterator.next()), read.records.deepIterator.asScala.toList)
+ assertEquals(List(ms1.deepEntries.iterator.next(), ms2.deepEntries.iterator.next()), read.records.deepEntries.asScala.toList)
// now truncate off the last message
seg.truncateTo(offset + 1)
val read2 = seg.read(offset, None, 10000)
- assertEquals(1, read2.records.deepIterator.asScala.size)
- assertEquals(ms1.deepIterator.next(), read2.records.deepIterator.next())
+ assertEquals(1, read2.records.deepEntries.asScala.size)
+ assertEquals(ms1.deepEntries.iterator.next(), read2.records.deepEntries.iterator.next())
offset += 1
}
}
@@ -246,7 +246,7 @@ class LogSegmentTest {
TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
seg.recover(64*1024)
for(i <- 0 until 100)
- assertEquals(i, seg.read(i, Some(i + 1), 1024).records.deepIterator.next().offset)
+ assertEquals(i, seg.read(i, Some(i + 1), 1024).records.deepEntries.iterator.next().offset)
}
/**
@@ -285,7 +285,7 @@ class LogSegmentTest {
val position = recordPosition.position + TestUtils.random.nextInt(15)
TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
seg.recover(64*1024)
- assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.shallowIterator.asScala.map(_.offset).toList)
+ assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.shallowEntries.asScala.map(_.offset).toList)
seg.delete()
}
}
@@ -307,7 +307,7 @@ class LogSegmentTest {
val ms2 = records(60, "alpha", "beta")
seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
+ assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
}
/* create a segment with pre allocate and clearly shut down*/
@@ -321,7 +321,7 @@ class LogSegmentTest {
val ms2 = records(60, "alpha", "beta")
seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
+ assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
val oldSize = seg.log.sizeInBytes()
val oldPosition = seg.log.channel.position
val oldFileSize = seg.log.file.length
@@ -334,7 +334,7 @@ class LogSegmentTest {
segments += segReopen
val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.deepIterator.asScala.toList, readAgain.records.deepIterator.asScala.toList)
+ assertEquals(ms2.deepEntries.asScala.toList, readAgain.records.deepEntries.asScala.toList)
val size = segReopen.log.sizeInBytes()
val position = segReopen.log.channel.position
val fileSize = segReopen.log.file.length
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 49381a4..ff596bd 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -196,11 +196,11 @@ class LogTest extends JUnitSuite {
log.append(MemoryRecords.withRecords(records(i)))
for(i <- records.indices) {
- val read = log.read(i, 100, Some(i+1)).records.shallowIterator.next()
+ val read = log.read(i, 100, Some(i+1)).records.shallowEntries.iterator.next()
assertEquals("Offset read should match order appended.", i, read.offset)
assertEquals("Message should match appended.", records(i), read.record)
}
- assertEquals("Reading beyond the last message returns nothing.", 0, log.read(records.length, 100, None).records.shallowIterator.asScala.size)
+ assertEquals("Reading beyond the last message returns nothing.", 0, log.read(records.length, 100, None).records.shallowEntries.asScala.size)
}
/**
@@ -220,7 +220,7 @@ class LogTest extends JUnitSuite {
log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
for(i <- 50 until messageIds.max) {
val idx = messageIds.indexWhere(_ >= i)
- val read = log.read(i, 100, None).records.shallowIterator.next()
+ val read = log.read(i, 100, None).records.shallowEntries.iterator.next()
assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
assertEquals("Message should match appended.", records(idx), read.record)
}
@@ -245,7 +245,8 @@ class LogTest extends JUnitSuite {
// now manually truncate off all but one message from the first segment to create a gap in the messages
log.logSegments.head.truncateTo(1)
- assertEquals("A read should now return the last message in the log", log.logEndOffset - 1, log.read(1, 200, None).records.shallowIterator.next().offset)
+ assertEquals("A read should now return the last message in the log", log.logEndOffset - 1,
+ log.read(1, 200, None).records.shallowEntries.iterator.next().offset)
}
@Test
@@ -266,13 +267,13 @@ class LogTest extends JUnitSuite {
log.read(i, 1, minOneMessage = true),
log.read(i, 100, minOneMessage = true),
log.read(i, 100, Some(10000), minOneMessage = true)
- ).map(_.records.shallowIterator.next())
+ ).map(_.records.shallowEntries.iterator.next())
reads.foreach { read =>
assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
assertEquals("Message should match appended.", records(idx), read.record)
}
- assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.shallowIterator.asScala.toIndexedSeq)
+ assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.shallowEntries.asScala.toIndexedSeq)
}
}
@@ -357,15 +358,15 @@ class LogTest extends JUnitSuite {
/* do successive reads to ensure all our messages are there */
var offset = 0L
for(i <- 0 until numMessages) {
- val messages = log.read(offset, 1024*1024).records.shallowIterator
- val head = messages.next()
+ val messages = log.read(offset, 1024*1024).records.shallowEntries
+ val head = messages.iterator.next()
assertEquals("Offsets not equal", offset, head.offset)
- assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowIterator.next().record,
- head.record.convert(messageSets(i).shallowIterator.next().record.magic))
+ assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowEntries.iterator.next().record,
+ head.record.convert(messageSets(i).shallowEntries.iterator.next().record.magic))
offset = head.offset + 1
}
val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records
- assertEquals("Should be no more messages", 0, lastRead.shallowIterator.asScala.size)
+ assertEquals("Should be no more messages", 0, lastRead.shallowEntries.asScala.size)
// check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure
TestUtils.retry(1000L){
@@ -387,7 +388,7 @@ class LogTest extends JUnitSuite {
log.append(MemoryRecords.withRecords(CompressionType.GZIP, Record.create("hello".getBytes), Record.create("there".getBytes)))
log.append(MemoryRecords.withRecords(CompressionType.GZIP, Record.create("alpha".getBytes), Record.create("beta".getBytes)))
- def read(offset: Int) = log.read(offset, 4096).records.deepIterator
+ def read(offset: Int) = log.read(offset, 4096).records.deepEntries.iterator
/* we should always get the first message in the compressed set when reading any offset in the set */
assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
@@ -624,7 +625,7 @@ class LogTest extends JUnitSuite {
assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
for(i <- 0 until numMessages) {
- assertEquals(i, log.read(i, 100, None).records.shallowIterator.next().offset)
+ assertEquals(i, log.read(i, 100, None).records.shallowEntries.iterator.next().offset)
if (i == 0)
assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
else
@@ -700,7 +701,7 @@ class LogTest extends JUnitSuite {
log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages) {
- assertEquals(i, log.read(i, 100, None).records.shallowIterator.next().offset)
+ assertEquals(i, log.read(i, 100, None).records.shallowEntries.iterator.next().offset)
if (i == 0)
assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
else
@@ -959,7 +960,7 @@ class LogTest extends JUnitSuite {
time.scheduler,
time)
log.append(MemoryRecords.withRecords(Record.create(null)))
- val head = log.read(0, 4096, None).records.shallowIterator().next()
+ val head = log.read(0, 4096, None).records.shallowEntries().iterator.next()
assertEquals(0, head.offset)
assertTrue("Message payload should be null.", head.record.hasNullValue)
}
@@ -998,7 +999,7 @@ class LogTest extends JUnitSuite {
val numMessages = 50 + TestUtils.random.nextInt(50)
for (_ <- 0 until numMessages)
log.append(set)
- val messages = log.logSegments.flatMap(_.log.deepIterator.asScala.toList)
+ val messages = log.logSegments.flatMap(_.log.deepEntries.asScala.toList)
log.close()
// corrupt index and log by appending random bytes
@@ -1009,7 +1010,7 @@ class LogTest extends JUnitSuite {
log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
assertEquals(numMessages, log.logEndOffset)
assertEquals("Messages in the log after recovery should be the same.", messages,
- log.logSegments.flatMap(_.log.deepIterator.asScala.toList))
+ log.logSegments.flatMap(_.log.deepEntries.asScala.toList))
Utils.delete(logDir)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 72c5b16..b201feb 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -44,8 +44,8 @@ class LogValidatorTest extends JUnitSuite {
messageTimestampType = TimestampType.LOG_APPEND_TIME,
messageTimestampDiffMaxMs = 1000L)
val validatedRecords = validatedResults.validatedRecords
- assertEquals("number of messages should not change", records.deepIterator.asScala.size, validatedRecords.deepIterator.asScala.size)
- validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+ assertEquals("number of messages should not change", records.deepEntries.asScala.size, validatedRecords.deepEntries.asScala.size)
+ validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
@@ -67,12 +67,12 @@ class LogValidatorTest extends JUnitSuite {
messageTimestampDiffMaxMs = 1000L)
val validatedRecords = validatedResults.validatedRecords
- assertEquals("number of messages should not change", records.deepIterator.asScala.size, validatedRecords.deepIterator.asScala.size)
- validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
- assertTrue("MessageSet should still valid", validatedRecords.shallowIterator.next().record.isValid)
+ assertEquals("number of messages should not change", records.deepEntries.asScala.size, validatedRecords.deepEntries.asScala.size)
+ validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+ assertTrue("MessageSet should still valid", validatedRecords.shallowEntries.iterator.next().record.isValid)
assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
- assertEquals(s"The offset of max timestamp should be ${records.deepIterator.asScala.size - 1}",
- records.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+ assertEquals(s"The offset of max timestamp should be ${records.deepEntries.asScala.size - 1}",
+ records.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged)
}
@@ -93,13 +93,13 @@ class LogValidatorTest extends JUnitSuite {
messageTimestampDiffMaxMs = 1000L)
val validatedRecords = validatedResults.validatedRecords
- assertEquals("number of messages should not change", records.deepIterator.asScala.size,
- validatedRecords.deepIterator.asScala.size)
- validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
- assertTrue("MessageSet should still valid", validatedRecords.shallowIterator.next().record.isValid)
+ assertEquals("number of messages should not change", records.deepEntries.asScala.size,
+ validatedRecords.deepEntries.asScala.size)
+ validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+ assertTrue("MessageSet should still valid", validatedRecords.shallowEntries.iterator.next().record.isValid)
assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
- assertEquals(s"The offset of max timestamp should be ${records.deepIterator.asScala.size - 1}",
- records.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+ assertEquals(s"The offset of max timestamp should be ${records.deepEntries.asScala.size - 1}",
+ records.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
}
@@ -124,7 +124,7 @@ class LogValidatorTest extends JUnitSuite {
val validatedRecords = validatingResults.validatedRecords
var i = 0
- for (logEntry <- validatedRecords.deepIterator.asScala) {
+ for (logEntry <- validatedRecords.deepEntries.asScala) {
logEntry.record.ensureValid()
assertEquals(logEntry.record.timestamp, timestampSeq(i))
assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME)
@@ -157,15 +157,15 @@ class LogValidatorTest extends JUnitSuite {
val validatedRecords = validatedResults.validatedRecords
var i = 0
- for (logEntry <- validatedRecords.deepIterator.asScala) {
+ for (logEntry <- validatedRecords.deepEntries.asScala) {
logEntry.record.ensureValid()
assertEquals(logEntry.record.timestamp, timestampSeq(i))
assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME)
i += 1
}
assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatedResults.maxTimestamp)
- assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepIterator.asScala.size - 1}",
- validatedRecords.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+ assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepEntries.asScala.size - 1}",
+ validatedRecords.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
}
@@ -358,9 +358,9 @@ class LogValidatorTest extends JUnitSuite {
/* check that offsets are assigned consecutively from the given base offset */
private def checkOffsets(records: MemoryRecords, baseOffset: Long) {
- assertTrue("Message set should not be empty", records.deepIterator.asScala.nonEmpty)
+ assertTrue("Message set should not be empty", records.deepEntries.asScala.nonEmpty)
var offset = baseOffset
- for (entry <- records.deepIterator.asScala) {
+ for (entry <- records.deepEntries.asScala) {
assertEquals("Unexpected offset in message set iterator", offset, entry.offset)
offset += 1
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index bd3ed68..ef2b0af 100644
--- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -27,6 +27,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
trait BaseMessageSetTestCases extends JUnitSuite {
@@ -61,7 +62,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
@Test
def testWrittenEqualsRead() {
val messageSet = createMessageSet(messages)
- checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+ assertEquals(messages.toVector, messageSet.toVector.map(m => m.message))
}
@Test
@@ -123,7 +124,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
fileRecords.resize() // resize since we wrote to the channel directly
assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
- checkEquals(set.asRecords.deepIterator, fileRecords.deepIterator())
+ assertEquals(set.asRecords.deepEntries.asScala.toVector, fileRecords.deepEntries.asScala.toVector)
} finally fileRecords.close()
}
}
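One more consequence visible in the two BaseMessageSetTestCases changes: because deepEntries is an Iterable rather than a consumed-once iterator, tests can materialise it into a Scala collection (repeatedly, if needed) and compare by value with assertEquals instead of the old checkEquals walk over two iterators. A minimal sketch, under the same assumptions as above:

import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}

import scala.collection.JavaConverters._

object CompareEntriesSketch {
  def main(args: Array[String]): Unit = {
    val records = MemoryRecords.withRecords(CompressionType.NONE,
      Record.create("x".getBytes), Record.create("y".getBytes))

    // The iterable view can be traversed more than once; each traversal is
    // materialised into a Vector so equality is checked by value.
    val first = records.deepEntries.asScala.toVector
    val second = records.deepEntries.asScala.toVector
    assert(first == second, s"expected $first but got $second")
  }
}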
http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 5c9f035..e0dfe16 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -171,7 +171,7 @@ class AbstractFetcherThreadTest {
// Now check message's crc
val records = partitionData.toRecords
- for (entry <- records.shallowIterator.asScala) {
+ for (entry <- records.shallowEntries.asScala) {
entry.record.ensureValid()
logEndOffset = entry.nextOffset
}