You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/16 18:56:46 UTC

[1/2] kafka git commit: MINOR: Replace deepIterator/shallowIterator with deepEntries/shallowEntries

Repository: kafka
Updated Branches:
  refs/heads/trunk e55205b81 -> b58b6a1be


http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 4467394..e6260a9 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -165,7 +165,7 @@ class FetchRequestTest extends BaseRequestTest {
   }
 
   private def logEntries(partitionData: FetchResponse.PartitionData): Seq[LogEntry] = {
-    partitionData.records.deepIterator.asScala.toIndexedSeq
+    partitionData.records.deepEntries.asScala.toIndexedSeq
   }
 
   private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], fetchResponse: FetchResponse,
@@ -183,7 +183,7 @@ class FetchRequestTest extends BaseRequestTest {
       val records = partitionData.records
       responseBufferSize += records.sizeInBytes
 
-      val entries = records.shallowIterator.asScala.toIndexedSeq
+      val entries = records.shallowEntries.asScala.toIndexedSeq
       assertTrue(entries.size < numMessagesPerPartition)
       val entriesSize = entries.map(_.sizeInBytes).sum
       responseSize += entriesSize

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a643f63..50c55b8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -64,10 +64,10 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
 
     assertEquals("But we shouldn't get the second", 0,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
   }
 
   @Test
@@ -89,9 +89,9 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
   }
 
   @Test
@@ -113,9 +113,9 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
   }
 
   @Test
@@ -137,10 +137,10 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
 
     assertEquals("But we should get the second too since it's throttled but in sync", 1,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
   }
 
   def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: Record = this.record, bothReplicasInSync: Boolean = false) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 421de32..50a4cd6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -247,7 +247,7 @@ class ReplicaManagerTest {
       
       assertTrue(fetchCallbackFired)
       assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
-      assertTrue("Should return some data", fetchedRecords.shallowIterator.hasNext)
+      assertTrue("Should return some data", fetchedRecords.shallowEntries.iterator.hasNext)
       fetchCallbackFired = false
       
       // Fetch a message above the high watermark as a consumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 2f73a94..ff72657 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -154,7 +154,7 @@ class SimpleFetchTest {
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
         readPartitionInfo = fetchInfo,
-        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowIterator.next().record)
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowEntries.iterator.next().record)
     assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
       replicaManager.readFromLocalLog(
         replicaId = Request.OrdinaryConsumerId,
@@ -163,7 +163,7 @@ class SimpleFetchTest {
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
         readPartitionInfo = fetchInfo,
-        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowIterator().next().record)
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowEntries().iterator.next().record)
 
     assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
     assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())


[2/2] kafka git commit: MINOR: Replace deepIterator/shallowIterator with deepEntries/shallowEntries

Posted by jg...@apache.org.
MINOR: Replace deepIterator/shallowIterator with deepEntries/shallowEntries

The latter return `Iterable` instead of `Iterator` so that Java's enhanced `for` loop
(for-each) can be used to traverse the entries.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2261 from ijuma/deepEntries-shallowEntries


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b58b6a1b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b58b6a1b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b58b6a1b

Branch: refs/heads/trunk
Commit: b58b6a1bef0ecdc2107a415e222af099fcd9bce3
Parents: e55205b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Dec 16 10:41:27 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Dec 16 10:41:27 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  5 +-
 .../kafka/common/record/AbstractRecords.java    | 47 ++++++++++--------
 .../apache/kafka/common/record/FileRecords.java | 52 +++++++++++++-------
 .../kafka/common/record/MemoryRecords.java      | 45 +++++++++++------
 .../org/apache/kafka/common/record/Records.java |  9 ++--
 .../internals/RecordAccumulatorTest.java        |  9 ++--
 .../record/ByteBufferLogInputStreamTest.java    | 10 ++--
 .../kafka/common/record/FileRecordsTest.java    | 22 ++-------
 .../common/record/MemoryRecordsBuilderTest.java | 21 ++------
 .../kafka/common/record/MemoryRecordsTest.java  |  8 +--
 .../java/org/apache/kafka/test/TestUtils.java   |  9 +++-
 .../coordinator/GroupMetadataManager.scala      |  2 +-
 core/src/main/scala/kafka/log/Log.scala         |  6 +--
 core/src/main/scala/kafka/log/LogCleaner.scala  |  4 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  9 ++--
 .../src/main/scala/kafka/log/LogValidator.scala | 10 ++--
 .../kafka/message/ByteBufferMessageSet.scala    |  6 +--
 .../main/scala/kafka/message/MessageSet.scala   |  2 +-
 .../kafka/server/AbstractFetcherThread.scala    |  2 +-
 .../scala/kafka/tools/DumpLogSegments.scala     |  7 ++-
 .../kafka/tools/ReplicaVerificationTool.scala   | 12 ++---
 .../api/GroupCoordinatorIntegrationTest.scala   |  4 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |  2 +-
 .../test/scala/other/kafka/StressTestLog.scala  |  2 +-
 .../message/BaseMessageSetTestCases.scala       |  8 +--
 .../unit/kafka/log/BrokerCompressionTest.scala  |  2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |  2 +-
 .../log/LogCleanerLagIntegrationTest.scala      |  2 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 16 +++---
 .../scala/unit/kafka/log/LogSegmentTest.scala   | 24 ++++-----
 .../src/test/scala/unit/kafka/log/LogTest.scala | 35 ++++++-------
 .../scala/unit/kafka/log/LogValidatorTest.scala | 38 +++++++-------
 .../kafka/message/BaseMessageSetTestCases.scala |  5 +-
 .../server/AbstractFetcherThreadTest.scala      |  2 +-
 .../unit/kafka/server/FetchRequestTest.scala    |  4 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala | 16 +++---
 .../unit/kafka/server/ReplicaManagerTest.scala  |  2 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |  4 +-
 38 files changed, 241 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 3b9d49c..526b0a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,7 +61,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -687,9 +686,7 @@ public class Fetcher<K, V> {
                 }
 
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
-                Iterator<LogEntry> deepIterator = partition.records.deepIterator();
-                while (deepIterator.hasNext()) {
-                    LogEntry logEntry = deepIterator.next();
+                for (LogEntry logEntry : partition.records.deepEntries()) {
                     // Skip the messages earlier than current position.
                     if (logEntry.offset() >= position) {
                         parsed.add(parseRecord(tp, logEntry));

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 3794dc6..3a96d88 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -16,19 +16,37 @@
  **/
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.utils.AbstractIterator;
-
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 public abstract class AbstractRecords implements Records {
 
+    private final Iterable<Record> records = new Iterable<Record>() {
+        @Override
+        public Iterator<Record> iterator() {
+            return new Iterator<Record>() {
+                private final Iterator<? extends LogEntry> deepEntries = deepEntries().iterator();
+                @Override
+                public boolean hasNext() {
+                    return deepEntries.hasNext();
+                }
+                @Override
+                public Record next() {
+                    return deepEntries.next().record();
+                }
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException("Removal not supported");
+                }
+            };
+        }
+    };
+
     @Override
     public boolean hasMatchingShallowMagic(byte magic) {
-        Iterator<? extends LogEntry> iterator = shallowIterator();
-        while (iterator.hasNext())
-            if (iterator.next().magic() != magic)
+        for (LogEntry entry : shallowEntries())
+            if (entry.magic() != magic)
                 return false;
         return true;
     }
@@ -39,11 +57,8 @@ public abstract class AbstractRecords implements Records {
     @Override
     public Records toMessageFormat(byte toMagic) {
         List<LogEntry> converted = new ArrayList<>();
-        Iterator<LogEntry> deepIterator = deepIterator();
-        while (deepIterator.hasNext()) {
-            LogEntry entry = deepIterator.next();
+        for (LogEntry entry : deepEntries())
             converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic)));
-        }
 
         if (converted.isEmpty()) {
             // This indicates that the message is too large, which indicates that the buffer is not large
@@ -60,7 +75,7 @@ public abstract class AbstractRecords implements Records {
             // cause some timestamp information to be lost (e.g. if the timestamp type was changed) since
             // we are essentially merging multiple message sets. However, currently this method is only
             // used for down-conversion, so we've ignored the problem.
-            CompressionType compressionType = shallowIterator().next().record().compressionType();
+            CompressionType compressionType = shallowEntries().iterator().next().record().compressionType();
             return MemoryRecords.withLogEntries(compressionType, converted);
         }
     }
@@ -77,16 +92,8 @@ public abstract class AbstractRecords implements Records {
      * Get an iterator over the deep records.
      * @return An iterator over the records
      */
-    public Iterator<Record> records() {
-        return new AbstractIterator<Record>() {
-            private final Iterator<? extends LogEntry> deepEntries = deepIterator();
-            @Override
-            protected Record makeNext() {
-                if (deepEntries.hasNext())
-                    return deepEntries.next().record();
-                return allDone();
-            }
-        };
+    public Iterable<Record> records() {
+        return records;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index faf61e9..52f3103 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -44,6 +44,15 @@ public class FileRecords extends AbstractRecords implements Closeable {
     private volatile File file;
     private final AtomicInteger size;
 
+    private final Iterable<FileChannelLogEntry> shallowEntries;
+
+    private final Iterable<LogEntry> deepEntries = new Iterable<LogEntry>() {
+        @Override
+        public Iterator<LogEntry> iterator() {
+            return deepIterator();
+        }
+    };
+
     public FileRecords(File file,
                        FileChannel channel,
                        int start,
@@ -58,6 +67,8 @@ public class FileRecords extends AbstractRecords implements Closeable {
 
         // set the initial size of the buffer
         resize();
+
+        shallowEntries = shallowEntriesFrom(start);
     }
 
     public void resize() throws IOException {
@@ -246,9 +257,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @param startingPosition The starting position in the file to begin searching from.
      */
     public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
-        Iterator<FileChannelLogEntry> iterator = shallowIteratorFrom(Integer.MAX_VALUE, startingPosition);
-        while (iterator.hasNext()) {
-            FileChannelLogEntry entry = iterator.next();
+        for (FileChannelLogEntry entry : shallowEntriesFrom(startingPosition)) {
             long offset = entry.offset();
             if (offset >= targetOffset)
                 return new LogEntryPosition(offset, entry.position(), entry.sizeInBytes());
@@ -264,9 +273,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @return The timestamp and offset of the message found. None, if no message is found.
      */
     public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
-        Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
-        while (shallowIterator.hasNext()) {
-            LogEntry shallowEntry = shallowIterator.next();
+        for (LogEntry shallowEntry : shallowEntriesFrom(startingPosition)) {
             Record shallowRecord = shallowEntry.record();
             if (shallowRecord.timestamp() >= targetTimestamp) {
                 // We found a message
@@ -292,9 +299,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
         long maxTimestamp = Record.NO_TIMESTAMP;
         long offsetOfMaxTimestamp = -1L;
 
-        Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
-        while (shallowIterator.hasNext()) {
-            LogEntry shallowEntry = shallowIterator.next();
+        for (LogEntry shallowEntry : shallowEntriesFrom(startingPosition)) {
             long timestamp = shallowEntry.record().timestamp();
             if (timestamp > maxTimestamp) {
                 maxTimestamp = timestamp;
@@ -311,8 +316,8 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @return An iterator over the shallow entries
      */
     @Override
-    public Iterator<FileChannelLogEntry> shallowIterator() {
-        return shallowIteratorFrom(start);
+    public Iterable<FileChannelLogEntry> shallowEntries() {
+        return shallowEntries;
     }
 
     /**
@@ -320,15 +325,24 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @param maxRecordSize The maximum allowable size of individual records (including compressed record sets)
      * @return An iterator over the shallow entries
      */
-    public Iterator<FileChannelLogEntry> shallowIterator(int maxRecordSize) {
-        return shallowIteratorFrom(maxRecordSize, start);
+    public Iterable<FileChannelLogEntry> shallowEntries(int maxRecordSize) {
+        return shallowEntries(maxRecordSize, start);
     }
 
-    private Iterator<FileChannelLogEntry> shallowIteratorFrom(int start) {
-        return shallowIteratorFrom(Integer.MAX_VALUE, start);
+    private Iterable<FileChannelLogEntry> shallowEntriesFrom(int start) {
+        return shallowEntries(Integer.MAX_VALUE, start);
     }
 
-    private Iterator<FileChannelLogEntry> shallowIteratorFrom(int maxRecordSize, int start) {
+    private Iterable<FileChannelLogEntry> shallowEntries(final int maxRecordSize, final int start) {
+        return new Iterable<FileChannelLogEntry>() {
+            @Override
+            public Iterator<FileChannelLogEntry> iterator() {
+                return shallowIterator(maxRecordSize, start);
+            }
+        };
+    }
+
+    private Iterator<FileChannelLogEntry> shallowIterator(int maxRecordSize, int start) {
         final int end;
         if (isSlice)
             end = this.end;
@@ -339,7 +353,11 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     @Override
-    public Iterator<LogEntry> deepIterator() {
+    public Iterable<LogEntry> deepEntries() {
+        return deepEntries;
+    }
+
+    private Iterator<LogEntry> deepIterator() {
         final int end;
         if (isSlice)
             end = this.end;

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 0301762..1485486 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -31,8 +31,17 @@ public class MemoryRecords extends AbstractRecords {
 
     public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
 
-    // the underlying buffer used for read; while the records are still writable it is null
-    private ByteBuffer buffer;
+    private final ByteBuffer buffer;
+
+    private final Iterable<ByteBufferLogEntry> shallowEntries = new Iterable<ByteBufferLogEntry>() {
+        @Override
+        public Iterator<ByteBufferLogEntry> iterator() {
+            return shallowIterator();
+        }
+    };
+
+    private final Iterable<LogEntry> deepEntries = deepEntries(false);
+
     private int validBytes = -1;
 
     // Construct a writable memory records
@@ -79,9 +88,8 @@ public class MemoryRecords extends AbstractRecords {
             return validBytes;
 
         int bytes = 0;
-        Iterator<ByteBufferLogEntry> iterator = shallowIterator();
-        while (iterator.hasNext())
-            bytes += iterator.next().sizeInBytes();
+        for (LogEntry entry : shallowEntries())
+            bytes += entry.sizeInBytes();
 
         this.validBytes = bytes;
         return bytes;
@@ -102,9 +110,7 @@ public class MemoryRecords extends AbstractRecords {
         int messagesRetained = 0;
         int bytesRetained = 0;
 
-        Iterator<ByteBufferLogEntry> shallowIterator = shallowIterator();
-        while (shallowIterator.hasNext()) {
-            ByteBufferLogEntry shallowEntry = shallowIterator.next();
+        for (ByteBufferLogEntry shallowEntry : shallowEntries()) {
             bytesRead += shallowEntry.sizeInBytes();
 
             // We use the absolute offset to decide whether to retain the message or not (this is handled by the
@@ -174,27 +180,36 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     @Override
-    public Iterator<ByteBufferLogEntry> shallowIterator() {
+    public Iterable<ByteBufferLogEntry> shallowEntries() {
+        return shallowEntries;
+    }
+
+    private Iterator<ByteBufferLogEntry> shallowIterator() {
         return RecordsIterator.shallowIterator(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
     }
 
     @Override
-    public Iterator<LogEntry> deepIterator() {
-        return deepIterator(false);
+    public Iterable<LogEntry> deepEntries() {
+        return deepEntries;
     }
 
-    public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic) {
-        return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
+    public Iterable<LogEntry> deepEntries(final boolean ensureMatchingMagic) {
+        return new Iterable<LogEntry>() {
+            @Override
+            public Iterator<LogEntry> iterator() {
+                return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
+            }
+        };
     }
 
-    public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
+    private Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
         return new RecordsIterator(new ByteBufferLogInputStream(buffer.duplicate(), maxMessageSize), false,
                 ensureMatchingMagic, maxMessageSize);
     }
 
     @Override
     public String toString() {
-        Iterator<LogEntry> iter = deepIterator();
+        Iterator<LogEntry> iter = deepEntries().iterator();
         StringBuilder builder = new StringBuilder();
         builder.append('[');
         while (iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 823d2b7..f0dbf9e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -18,14 +18,13 @@ package org.apache.kafka.common.record;
 
 import java.io.IOException;
 import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;
 
 /**
  * Interface for accessing the records contained in a log. The log itself is represented as a sequence of log entries.
  * Each log entry consists of an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}.
  * If the entry is not compressed, then each entry will have only the shallow record contained inside it. If it is
  * compressed, the entry contains "deep" records, which are packed into the value field of the shallow record. To iterate
- * over the shallow records, use {@link #shallowIterator()}; for the deep records, use {@link #deepIterator()}. Note
+ * over the shallow records, use {@link #shallowEntries()}; for the deep records, use {@link #deepEntries()}. Note
  * that the deep iterator handles both compressed and non-compressed entries: if the entry is not compressed, the
  * shallow record is returned; otherwise, the shallow record is decompressed and the deep entries are returned.
  * See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
@@ -61,16 +60,16 @@ public interface Records {
      * record data (see {@link FileLogInputStream.FileChannelLogEntry#magic()}.
      * @return An iterator over the shallow entries of the log
      */
-    Iterator<? extends LogEntry> shallowIterator();
+    Iterable<? extends LogEntry> shallowEntries();
 
     /**
      * Get the deep log entries (i.e. descend into compressed message sets). For the deep records,
      * there are fewer options for optimization since the data must be decompressed before it can be
      * returned. Hence there is little advantage in allowing subclasses to return a more specific type
-     * as we do for {@link #shallowIterator()}.
+     * as we do for {@link #shallowEntries()}.
      * @return An iterator over the deep entries of the log
      */
-    Iterator<LogEntry> deepIterator();
+    Iterable<LogEntry> deepEntries();
 
     /**
      * Check whether all shallow entries in this buffer have a certain magic value.

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 4f25bdf..04e1411 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -101,7 +101,7 @@ public class RecordAccumulatorTest {
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
 
-        Iterator<LogEntry> iter = batch.records().deepIterator();
+        Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
         for (int i = 0; i < appends; i++) {
             LogEntry entry = iter.next();
             assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -130,7 +130,7 @@ public class RecordAccumulatorTest {
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
 
-        Iterator<LogEntry> iter = batch.records().deepIterator();
+        Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
         LogEntry entry = iter.next();
         assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
         assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
@@ -182,11 +182,8 @@ public class RecordAccumulatorTest {
             List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
             if (batches != null) {
                 for (RecordBatch batch : batches) {
-                    Iterator<LogEntry> deepEntries = batch.records().deepIterator();
-                    while (deepEntries.hasNext()) {
-                        deepEntries.next();
+                    for (LogEntry entry : batch.records().deepEntries())
                         read++;
-                    }
                     accum.deallocate(batch);
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
index 62e8a05..c8621cd 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
@@ -37,7 +37,7 @@ public class ByteBufferLogInputStreamTest {
         ByteBuffer recordsBuffer = builder.build().buffer();
         recordsBuffer.limit(recordsBuffer.limit() - 5);
 
-        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = MemoryRecords.readableRecords(recordsBuffer).shallowIterator();
+        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = MemoryRecords.readableRecords(recordsBuffer).shallowEntries().iterator();
         assertTrue(iterator.hasNext());
         ByteBufferLogInputStream.ByteBufferLogEntry first = iterator.next();
         assertEquals(0L, first.offset());
@@ -50,7 +50,7 @@ public class ByteBufferLogInputStreamTest {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
         builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
-        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
 
         assertTrue(iterator.hasNext());
         ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
@@ -67,7 +67,7 @@ public class ByteBufferLogInputStreamTest {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
         builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
-        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
 
         assertTrue(iterator.hasNext());
         ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
@@ -81,7 +81,7 @@ public class ByteBufferLogInputStreamTest {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
         builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
-        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
 
         assertTrue(iterator.hasNext());
         ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
@@ -98,7 +98,7 @@ public class ByteBufferLogInputStreamTest {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
         builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
-        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
 
         assertTrue(iterator.hasNext());
         ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 7e2c256..dcd3bef 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -26,10 +26,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.kafka.test.TestUtils.tempFile;
@@ -85,7 +83,7 @@ public class FileRecordsTest {
         fileRecords.channel().write(buffer);
 
         // appending those bytes should not change the contents
-        TestUtils.checkEquals(Arrays.asList(records).iterator(), fileRecords.records());
+        TestUtils.checkEquals(Arrays.asList(records), fileRecords.records());
     }
 
     /**
@@ -94,7 +92,7 @@ public class FileRecordsTest {
     @Test
     public void testIterationDoesntChangePosition() throws IOException {
         long position = fileRecords.channel().position();
-        TestUtils.checkEquals(Arrays.asList(records).iterator(), fileRecords.records());
+        TestUtils.checkEquals(Arrays.asList(records), fileRecords.records());
         assertEquals(position, fileRecords.channel().position());
     }
 
@@ -104,7 +102,7 @@ public class FileRecordsTest {
     @Test
     public void testRead() throws IOException {
         FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
-        TestUtils.checkEquals(fileRecords.shallowIterator(), read.shallowIterator());
+        TestUtils.checkEquals(fileRecords.shallowEntries(), read.shallowEntries());
 
         List<LogEntry> items = shallowEntries(read);
         LogEntry second = items.get(1);
@@ -383,21 +381,11 @@ public class FileRecordsTest {
     }
 
     private static List<LogEntry> shallowEntries(Records buffer) {
-        List<LogEntry> entries = new ArrayList<>();
-        Iterator<? extends LogEntry> iterator = buffer.shallowIterator();
-        while (iterator.hasNext())
-            entries.add(iterator.next());
-        return entries;
+        return TestUtils.toList(buffer.shallowEntries().iterator());
     }
 
     private static List<LogEntry> deepEntries(Records buffer) {
-        List<LogEntry> entries = new ArrayList<>();
-        Iterator<? extends LogEntry> iterator = buffer.shallowIterator();
-        while (iterator.hasNext()) {
-            for (LogEntry deepEntry : iterator.next())
-                entries.add(deepEntry);
-        }
-        return entries;
+        return TestUtils.toList(buffer.deepEntries().iterator());
     }
 
     private FileRecords createFileRecords(Record ... records) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 40fa212..a52976b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -121,9 +120,7 @@ public class MemoryRecordsBuilderTest {
 
         assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
-        Iterator<Record> iterator = records.records();
-        while (iterator.hasNext()) {
-            Record record = iterator.next();
+        for (Record record : records.records()) {
             assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
             assertEquals(logAppendTime, record.timestamp());
         }
@@ -148,9 +145,7 @@ public class MemoryRecordsBuilderTest {
 
         assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
-        Iterator<Record> iterator = records.records();
-        while (iterator.hasNext()) {
-            Record record = iterator.next();
+        for (Record record : records.records()) {
             assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
             assertEquals(logAppendTime, record.timestamp());
         }
@@ -177,11 +172,9 @@ public class MemoryRecordsBuilderTest {
         else
             assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
-        Iterator<Record> iterator = records.records();
         int i = 0;
         long[] expectedTimestamps = new long[] {0L, 2L, 1L};
-        while (iterator.hasNext()) {
-            Record record = iterator.next();
+        for (Record record : records.records()) {
             assertEquals(TimestampType.CREATE_TIME, record.timestampType());
             assertEquals(expectedTimestamps[i++], record.timestamp());
         }
@@ -206,10 +199,8 @@ public class MemoryRecordsBuilderTest {
         assertEquals(2L, info.maxTimestamp);
         assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
-        Iterator<Record> iterator = records.records();
         long i = 0L;
-        while (iterator.hasNext()) {
-            Record record = iterator.next();
+        for (Record record : records.records()) {
             assertEquals(TimestampType.CREATE_TIME, record.timestampType());
             assertEquals(i++, record.timestamp());
         }
@@ -233,9 +224,7 @@ public class MemoryRecordsBuilderTest {
         assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp);
         assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
 
-        Iterator<Record> iterator = records.records();
-        while (iterator.hasNext()) {
-            Record record = iterator.next();
+        for (Record record : records.records()) {
             assertEquals(TimestampType.CREATE_TIME, record.timestampType());
             assertEquals(Record.NO_TIMESTAMP, record.timestamp());
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index ef0fbeb..f2741ee 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -69,7 +69,7 @@ public class MemoryRecordsTest {
 
         for (int iteration = 0; iteration < 2; iteration++) {
             for (MemoryRecords recs : asList(recs1, recs2)) {
-                Iterator<LogEntry> iter = recs.deepIterator();
+                Iterator<LogEntry> iter = recs.deepEntries().iterator();
                 for (int i = 0; i < list.size(); i++) {
                     assertTrue(iter.hasNext());
                     LogEntry entry = iter.next();
@@ -135,7 +135,7 @@ public class MemoryRecordsTest {
 
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
-        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowIterator());
+        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator());
         List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L);
         assertEquals(expectedOffsets.size(), shallowEntries.size());
 
@@ -148,7 +148,7 @@ public class MemoryRecordsTest {
                     shallowEntry.record().timestampType());
         }
 
-        List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepIterator());
+        List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries().iterator());
         assertEquals(4, deepEntries.size());
 
         LogEntry first = deepEntries.get(0);
@@ -197,7 +197,7 @@ public class MemoryRecordsTest {
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
-        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowIterator());
+        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator());
         assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size());
 
         for (LogEntry shallowEntry : shallowEntries) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 4e80b61..428b5a0 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -325,7 +325,14 @@ public class TestUtils {
         assertFalse("Iterators have uneven length--second has more", s2.hasNext());
     }
 
-    public static <T> List<T> toList(Iterator<T> iterator) {
+    /**
+     * Checks the two iterables for equality by first converting both to a list.
+     */
+    public static <T> void checkEquals(Iterable<T> it1, Iterable<T> it2) {
+        assertEquals(toList(it1.iterator()), toList(it2.iterator()));
+    }
+
+    public static <T> List<T> toList(Iterator<? extends T> iterator) {
         List<T> res = new ArrayList<>();
         while (iterator.hasNext())
             res.add(iterator.next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index a97b527..0eb52bb 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -398,7 +398,7 @@ class GroupMetadataManager(val brokerId: Int,
               val fileRecords = log.read(currOffset, config.loadBufferSize, minOneMessage = true).records.asInstanceOf[FileRecords]
               fileRecords.readInto(buffer, 0)
 
-              MemoryRecords.readableRecords(buffer).deepIterator.asScala.foreach { entry =>
+              MemoryRecords.readableRecords(buffer).deepEntries.asScala.foreach { entry =>
                 val record = entry.record
 
                 require(record.hasKey, "Offset entry key should not be null")

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7a54b77..6cd7953 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -386,7 +386,7 @@ class Log(@volatile var dir: File,
           // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
           // format conversion)
           if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
-            for (logEntry <- validRecords.shallowIterator.asScala) {
+            for (logEntry <- validRecords.shallowEntries.asScala) {
               if (logEntry.sizeInBytes > config.maxMessageSize) {
                 // we record the original message set size instead of the trimmed size
                 // to be consistent with pre-compression bytesRejectedRate recording
@@ -401,7 +401,7 @@ class Log(@volatile var dir: File,
         } else {
           // we are taking the offsets we are given
           if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
-            throw new IllegalArgumentException("Out of order offsets found in " + records.deepIterator.asScala.map(_.offset))
+            throw new IllegalArgumentException("Out of order offsets found in " + records.deepEntries.asScala.map(_.offset))
         }
 
         // check messages set size may be exceed config.segmentSize
@@ -465,7 +465,7 @@ class Log(@volatile var dir: File,
     var monotonic = true
     var maxTimestamp = Record.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
-    for (entry <- records.shallowIterator.asScala) {
+    for (entry <- records.shallowEntries.asScala) {
       // update the first offset if on the first message
       if(firstOffset < 0)
         firstOffset = entry.offset

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7676c88..7abd1d8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -477,7 +477,7 @@ private[log] class Cleaner(val id: Int,
       if (writeBuffer.position > 0) {
         writeBuffer.flip()
         val retained = MemoryRecords.readableRecords(writeBuffer)
-        dest.append(firstOffset = retained.deepIterator().next().offset,
+        dest.append(firstOffset = retained.deepEntries.iterator.next().offset,
           largestOffset = result.maxOffset,
           largestTimestamp = result.maxTimestamp,
           shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
@@ -632,7 +632,7 @@ private[log] class Cleaner(val id: Int,
       throttler.maybeThrottle(records.sizeInBytes)
 
       val startPosition = position
-      for (entry <- records.deepIterator.asScala) {
+      for (entry <- records.deepEntries.asScala) {
         val message = entry.record
         if (message.hasKey && entry.offset >= start) {
           if (map.size < maxDesiredMapSize)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index c08a0bc..eddb47a 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -219,10 +219,9 @@ class LogSegment(val log: FileRecords,
     timeIndex.resize(timeIndex.maxIndexSize)
     var validBytes = 0
     var lastIndexEntry = 0
-    val iter = log.shallowIterator(maxMessageSize)
     maxTimestampSoFar = Record.NO_TIMESTAMP
     try {
-      for (entry <- iter.asScala) {
+      for (entry <- log.shallowEntries(maxMessageSize).asScala) {
         val record = entry.record
         record.ensureValid()
 
@@ -313,7 +312,7 @@ class LogSegment(val log: FileRecords,
     if (ms == null) {
       baseOffset
     } else {
-      ms.records.shallowIterator.asScala.toSeq.lastOption match {
+      ms.records.shallowEntries.asScala.toSeq.lastOption match {
         case None => baseOffset
         case Some(last) => last.nextOffset
       }
@@ -374,9 +373,9 @@ class LogSegment(val log: FileRecords,
   def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
     // Load the timestamp of the first message into memory
     if (rollingBasedTimestamp.isEmpty) {
-      val iter = log.shallowIterator
+      val iter = log.shallowEntries.iterator()
       if (iter.hasNext)
-        rollingBasedTimestamp = Some(iter.next.record.timestamp)
+        rollingBasedTimestamp = Some(iter.next().record.timestamp)
     }
     rollingBasedTimestamp match {
       case Some(t) if t >= 0 => messageTimestamp - t

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index d9f27e4..1713942 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -78,7 +78,7 @@ private[kafka] object LogValidator {
       val expectedInnerOffset = new LongRef(0)
       val validatedRecords = new mutable.ArrayBuffer[Record]
 
-      records.deepIterator(true).asScala.foreach { logEntry =>
+      records.deepEntries(true).asScala.foreach { logEntry =>
         val record = logEntry.record
         validateKey(record, compactedTopic)
 
@@ -121,7 +121,7 @@ private[kafka] object LogValidator {
         validatedRecords.foreach(_.ensureValid)
 
         // we can update the wrapper message only and write the compressed payload as is
-        val entry = records.shallowIterator.next()
+        val entry = records.shallowEntries.iterator.next()
         val offset = offsetCounter.addAndGet(validatedRecords.size) - 1
         entry.setOffset(offset)
         if (messageTimestampType == TimestampType.CREATE_TIME)
@@ -144,7 +144,7 @@ private[kafka] object LogValidator {
                                                    timestampType: TimestampType,
                                                    messageTimestampDiffMaxMs: Long,
                                                    toMagicValue: Byte): ValidationAndOffsetAssignResult = {
-    val sizeInBytesAfterConversion = records.shallowIterator.asScala.map { logEntry =>
+    val sizeInBytesAfterConversion = records.shallowEntries.asScala.map { logEntry =>
       logEntry.record.convertedSize(toMagicValue)
     }.sum
 
@@ -152,7 +152,7 @@ private[kafka] object LogValidator {
     val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
       offsetCounter.value, now)
 
-    records.shallowIterator.asScala.foreach { logEntry =>
+    records.shallowEntries.asScala.foreach { logEntry =>
       val record = logEntry.record
       validateKey(record, compactedTopic)
       validateTimestamp(record, now, timestampType, messageTimestampDiffMaxMs)
@@ -179,7 +179,7 @@ private[kafka] object LogValidator {
     var offsetOfMaxTimestamp = -1L
     val firstOffset = offsetCounter.value
 
-    for (entry <- records.shallowIterator.asScala) {
+    for (entry <- records.shallowEntries.asScala) {
       val record = entry.record
       validateKey(record, compactedTopic)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 2c8fef6..198a4c3 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -167,10 +167,10 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
   private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
     val entries = if (isShallow)
-      asRecords.shallowIterator
+      asRecords.shallowEntries
     else
-      asRecords.deepIterator
-    entries.asScala.map(MessageAndOffset.fromLogEntry)
+      asRecords.deepEntries
+    entries.iterator.asScala.map(MessageAndOffset.fromLogEntry)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index a44a362..2fe54cd 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -93,7 +93,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
   override def toString: String = {
     val builder = new StringBuilder()
     builder.append(getClass.getSimpleName + "(")
-    val iter = this.asRecords.shallowIterator()
+    val iter = this.asRecords.shallowEntries.iterator
     var i = 0
     while(iter.hasNext && i < 100) {
       val message = iter.next

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index f959ce2..1dbd373 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -145,7 +145,7 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.NONE =>
                   try {
                     val records = partitionData.toRecords
-                    val newOffset = records.shallowIterator.asScala.toSeq.lastOption.map(_.nextOffset).getOrElse(
+                    val newOffset = records.shallowEntries.asScala.toSeq.lastOption.map(_.nextOffset).getOrElse(
                       currentPartitionFetchState.offset)
 
                     fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index ceff78c..8315e64 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -146,7 +146,7 @@ object DumpLogSegments {
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
       val slice = fileRecords.read(entry.position, maxMessageSize)
-      val logEntry = getIterator(slice.shallowIterator.next, isDeepIteration = true).next()
+      val logEntry = getIterator(slice.shallowEntries.iterator.next(), isDeepIteration = true).next()
       if (logEntry.offset != entry.offset + index.baseOffset) {
         var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
         misMatchesSeq ::=(entry.offset + index.baseOffset, logEntry.offset)
@@ -184,7 +184,7 @@ object DumpLogSegments {
       val entry = timeIndex.entry(i)
       val position = index.lookup(entry.offset + timeIndex.baseOffset).position
       val partialFileRecords = fileRecords.read(position, Int.MaxValue)
-      val shallowEntries = partialFileRecords.shallowIterator.asScala
+      val shallowEntries = partialFileRecords.shallowEntries.asScala
       var maxTimestamp = Record.NO_TIMESTAMP
       // We first find the message by offset then check if the timestamp is correct.
       val maybeLogEntry = shallowEntries.find(_.offset >= entry.offset + timeIndex.baseOffset)
@@ -311,8 +311,7 @@ object DumpLogSegments {
     val messageSet = FileRecords.open(file, false)
     var validBytes = 0L
     var lastOffset = -1l
-    val shallowIterator = messageSet.shallowIterator(maxMessageSize).asScala
-    for (shallowLogEntry <- shallowIterator) { // this only does shallow iteration
+    for (shallowLogEntry <- messageSet.shallowEntries(maxMessageSize).asScala) {
       val itr = getIterator(shallowLogEntry, isDeepIteration)
       for (deepLogEntry <- itr) {
         val record = deepLogEntry.record()

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index c483021..98e0414 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -275,9 +275,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
       assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
             "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
             + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
-      val logEntryIteratorMap = fetchResponsePerReplica.map {
-        case(replicaId, fetchResponse) =>
-          replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowIterator.asScala
+      val logEntriesMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
+        replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowEntries.asScala
       }
       val maxHw = fetchResponsePerReplica.values.map(_.hw).max
 
@@ -285,10 +284,11 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
       var isMessageInAllReplicas = true
       while (isMessageInAllReplicas) {
         var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
-        for ( (replicaId, logEntries) <- logEntryIteratorMap) {
+        for ((replicaId, logEntries) <- logEntriesMap) {
           try {
-            if (logEntries.hasNext) {
-              val logEntry = logEntries.next()
+            val logEntriesIterator = logEntries.iterator
+            if (logEntriesIterator.hasNext) {
+              val logEntry = logEntriesIterator.next()
 
               // only verify up to the high watermark
               if (logEntry.offset >= fetchResponsePerReplica.get(replicaId).hw)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 51e987a..ac310a9 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -50,12 +50,12 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
     def getGroupMetadataLogOpt: Option[Log] =
       logManager.getLog(TopicAndPartition(Topic.GroupMetadataTopicName, 0))
 
-    TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.shallowIterator.asScala.nonEmpty)),
+    TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.shallowEntries.asScala.nonEmpty)),
                             "Commit message not appended in time")
 
     val logSegments = getGroupMetadataLogOpt.get.logSegments
     val incorrectCompressionCodecs = logSegments
-      .flatMap(_.log.shallowIterator.asScala.map(_.record.compressionType.id))
+      .flatMap(_.log.shallowEntries.asScala.map(_.record.compressionType.id))
       .filter(_ != offsetsTopicCompressionCodec.codec)
     assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index ecf7408..d837885 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -140,7 +140,7 @@ object TestLogCleaning {
     require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
     for (file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
       val fileRecords = FileRecords.open(new File(dir, file))
-      for (entry <- fileRecords.shallowIterator.asScala) {
+      for (entry <- fileRecords.shallowEntries.asScala) {
         val key = TestUtils.readString(entry.record.key)
         val content = 
           if(entry.record.hasNullValue)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 3381fb7..a67c166 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -98,7 +98,7 @@ object StressTestLog {
       try {
         log.read(offset, 1024, Some(offset+1)).records match {
           case read: FileRecords if read.sizeInBytes > 0 => {
-            val first = read.shallowIterator.next()
+            val first = read.shallowEntries.iterator.next()
             require(first.offset == offset, "We should either read nothing or the message we asked for.")
             require(first.sizeInBytes == read.sizeInBytes, "Expected %d but got %d.".format(first.sizeInBytes, read.sizeInBytes))
             offset += 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index 4a1be11..3327a65 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -20,20 +20,20 @@ package kafka.javaapi.message
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
-import kafka.utils.TestUtils
-import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, CompressionCodec, Message}
+import kafka.message.{CompressionCodec, DefaultCompressionCodec, Message, NoCompressionCodec}
+import org.apache.kafka.test.TestUtils
+
 import scala.collection.JavaConverters._
 
 trait BaseMessageSetTestCases extends JUnitSuite {
   
   val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
   def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
-  def toMessageIterator(messageSet: MessageSet): Iterator[Message] = messageSet.asScala.map(m => m.message).iterator
 
   @Test
   def testWrittenEqualsRead {
     val messageSet = createMessageSet(messages)
-    TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet))
+    assertEquals(messages.toSeq, messageSet.asScala.map(m => m.message))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 296dc15..6a165ed 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -59,7 +59,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
     log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),
       Record.create("hello".getBytes), Record.create("there".getBytes)))
 
-    def readMessage(offset: Int) = log.read(offset, 4096).records.shallowIterator.next().record
+    def readMessage(offset: Int) = log.read(offset, 4096).records.shallowEntries.iterator.next().record
 
     if (!brokerCompression.equals("producer")) {
       val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 65c2d05..43c41f3 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -250,7 +250,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
   private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
     import JavaConverters._
-    for (segment <- log.logSegments; deepLogEntry <- segment.log.deepIterator.asScala) yield {
+    for (segment <- log.logSegments; deepLogEntry <- segment.log.deepEntries.asScala) yield {
       val key = TestUtils.readString(deepLogEntry.record.key).toInt
       val value = TestUtils.readString(deepLogEntry.record.value)
       (key, value, deepLogEntry.offset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index abab3bf..1231e98 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -106,7 +106,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
   private def readFromLog(log: Log): Iterable[(Int, Int)] = {
     import JavaConverters._
 
-    for (segment <- log.logSegments; logEntry <- segment.log.deepIterator.asScala) yield {
+    for (segment <- log.logSegments; logEntry <- segment.log.deepEntries.asScala) yield {
       val key = TestUtils.readString(logEntry.record.key).toInt
       val value = TestUtils.readString(logEntry.record.value).toInt
       key -> value

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ae8e401..f43e92b 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -229,7 +229,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // the last (active) segment has just one message
 
-    def distinctValuesBySegment = log.logSegments.map(s => s.log.shallowIterator.asScala.map(m => TestUtils.readString(m.record.value)).toSet.size).toSeq
+    def distinctValuesBySegment = log.logSegments.map(s => s.log.shallowEntries.asScala.map(m => TestUtils.readString(m.record.value)).toSet.size).toSeq
 
     val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
     assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.",
@@ -324,14 +324,14 @@ class LogCleanerTest extends JUnitSuite {
 
   /* extract all the keys from a log */
   def keysInLog(log: Log): Iterable[Int] =
-    log.logSegments.flatMap(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => TestUtils.readString(m.record.key).toInt))
+    log.logSegments.flatMap(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => TestUtils.readString(m.record.key).toInt))
 
   /* extract all the offsets from a log */
   def offsetsInLog(log: Log): Iterable[Long] =
-    log.logSegments.flatMap(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => m.offset))
+    log.logSegments.flatMap(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => m.offset))
 
   def unkeyedMessageCountInLog(log: Log) =
-    log.logSegments.map(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).count(m => !m.record.hasKey)).sum
+    log.logSegments.map(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).count(m => !m.record.hasKey)).sum
 
   def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = {
     throw new LogCleaningAbortedException()
@@ -679,7 +679,7 @@ class LogCleanerTest extends JUnitSuite {
 
     cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
 
-    for (segment <- log.logSegments; shallowLogEntry <- segment.log.shallowIterator.asScala; deepLogEntry <- shallowLogEntry.asScala) {
+    for (segment <- log.logSegments; shallowLogEntry <- segment.log.shallowEntries.asScala; deepLogEntry <- shallowLogEntry.asScala) {
       assertEquals(shallowLogEntry.record.magic, deepLogEntry.record.magic)
       val value = TestUtils.readString(deepLogEntry.record.value).toLong
       assertEquals(deepLogEntry.offset, value)
@@ -701,7 +701,7 @@ class LogCleanerTest extends JUnitSuite {
     val corruptedMessage = invalidCleanedMessage(offset, set)
     val records = MemoryRecords.readableRecords(corruptedMessage.buffer)
 
-    for (logEntry <- records.deepIterator.asScala) {
+    for (logEntry <- records.deepEntries.asScala) {
       val offset = logEntry.offset
       val value = TestUtils.readString(logEntry.record.value).toLong
       assertEquals(offset, value)
@@ -727,14 +727,14 @@ class LogCleanerTest extends JUnitSuite {
                                           timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000))
     log.roll()
     cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 1, log.activeSegment.baseOffset))
-    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowIterator().next().offset())
+    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowEntries.iterator().next().offset())
     // Append a message and roll out another log segment.
     log.append(TestUtils.singletonRecords(value = "1".getBytes,
                                           key = "1".getBytes,
                                           timestamp = time.milliseconds()))
     log.roll()
     cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
-    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowIterator().next().offset())
+    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowEntries.iterator().next().offset())
   }
 
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 1c747ec..1197b02 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -77,7 +77,7 @@ class LogSegmentTest {
     val ms = records(50, "hello", "there", "little", "bee")
     seg.append(50, 53, Record.NO_TIMESTAMP, -1L, ms)
     val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records
-    assertEquals(ms.deepIterator.asScala.toList, read.deepIterator.asScala.toList)
+    assertEquals(ms.deepEntries.asScala.toList, read.deepEntries.asScala.toList)
   }
 
   /**
@@ -91,8 +91,8 @@ class LogSegmentTest {
     val ms = records(baseOffset, "hello", "there", "beautiful")
     seg.append(baseOffset, 52, Record.NO_TIMESTAMP, -1L, ms)
     def validate(offset: Long) =
-      assertEquals(ms.deepIterator.asScala.filter(_.offset == offset).toList,
-                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.deepIterator.asScala.toList)
+      assertEquals(ms.deepEntries.asScala.filter(_.offset == offset).toList,
+                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.deepEntries.asScala.toList)
     validate(50)
     validate(51)
     validate(52)
@@ -122,7 +122,7 @@ class LogSegmentTest {
     val ms2 = records(60, "alpha", "beta")
     seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
+    assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
   }
 
   /**
@@ -140,12 +140,12 @@ class LogSegmentTest {
       seg.append(offset + 1, offset + 1, Record.NO_TIMESTAMP, -1L, ms2)
       // check that we can read back both messages
       val read = seg.read(offset, None, 10000)
-      assertEquals(List(ms1.deepIterator.next(), ms2.deepIterator.next()), read.records.deepIterator.asScala.toList)
+      assertEquals(List(ms1.deepEntries.iterator.next(), ms2.deepEntries.iterator.next()), read.records.deepEntries.asScala.toList)
       // now truncate off the last message
       seg.truncateTo(offset + 1)
       val read2 = seg.read(offset, None, 10000)
-      assertEquals(1, read2.records.deepIterator.asScala.size)
-      assertEquals(ms1.deepIterator.next(), read2.records.deepIterator.next())
+      assertEquals(1, read2.records.deepEntries.asScala.size)
+      assertEquals(ms1.deepEntries.iterator.next(), read2.records.deepEntries.iterator.next())
       offset += 1
     }
   }
@@ -246,7 +246,7 @@ class LogSegmentTest {
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100)
-      assertEquals(i, seg.read(i, Some(i + 1), 1024).records.deepIterator.next().offset)
+      assertEquals(i, seg.read(i, Some(i + 1), 1024).records.deepEntries.iterator.next().offset)
   }
 
   /**
@@ -285,7 +285,7 @@ class LogSegmentTest {
       val position = recordPosition.position + TestUtils.random.nextInt(15)
       TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
       seg.recover(64*1024)
-      assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.shallowIterator.asScala.map(_.offset).toList)
+      assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.shallowEntries.asScala.map(_.offset).toList)
       seg.delete()
     }
   }
@@ -307,7 +307,7 @@ class LogSegmentTest {
     val ms2 = records(60, "alpha", "beta")
     seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
+    assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
   }
 
   /* create a segment with   pre allocate and clearly shut down*/
@@ -321,7 +321,7 @@ class LogSegmentTest {
     val ms2 = records(60, "alpha", "beta")
     seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
+    assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
     val oldSize = seg.log.sizeInBytes()
     val oldPosition = seg.log.channel.position
     val oldFileSize = seg.log.file.length
@@ -334,7 +334,7 @@ class LogSegmentTest {
     segments += segReopen
 
     val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.deepIterator.asScala.toList, readAgain.records.deepIterator.asScala.toList)
+    assertEquals(ms2.deepEntries.asScala.toList, readAgain.records.deepEntries.asScala.toList)
     val size = segReopen.log.sizeInBytes()
     val position = segReopen.log.channel.position
     val fileSize = segReopen.log.file.length

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 49381a4..ff596bd 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -196,11 +196,11 @@ class LogTest extends JUnitSuite {
       log.append(MemoryRecords.withRecords(records(i)))
 
     for(i <- records.indices) {
-      val read = log.read(i, 100, Some(i+1)).records.shallowIterator.next()
+      val read = log.read(i, 100, Some(i+1)).records.shallowEntries.iterator.next()
       assertEquals("Offset read should match order appended.", i, read.offset)
       assertEquals("Message should match appended.", records(i), read.record)
     }
-    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(records.length, 100, None).records.shallowIterator.asScala.size)
+    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(records.length, 100, None).records.shallowEntries.asScala.size)
   }
 
   /**
@@ -220,7 +220,7 @@ class LogTest extends JUnitSuite {
       log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
     for(i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
-      val read = log.read(i, 100, None).records.shallowIterator.next()
+      val read = log.read(i, 100, None).records.shallowEntries.iterator.next()
       assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
       assertEquals("Message should match appended.", records(idx), read.record)
     }
@@ -245,7 +245,8 @@ class LogTest extends JUnitSuite {
     // now manually truncate off all but one message from the first segment to create a gap in the messages
     log.logSegments.head.truncateTo(1)
 
-    assertEquals("A read should now return the last message in the log", log.logEndOffset - 1, log.read(1, 200, None).records.shallowIterator.next().offset)
+    assertEquals("A read should now return the last message in the log", log.logEndOffset - 1,
+      log.read(1, 200, None).records.shallowEntries.iterator.next().offset)
   }
 
   @Test
@@ -266,13 +267,13 @@ class LogTest extends JUnitSuite {
         log.read(i, 1, minOneMessage = true),
         log.read(i, 100, minOneMessage = true),
         log.read(i, 100, Some(10000), minOneMessage = true)
-      ).map(_.records.shallowIterator.next())
+      ).map(_.records.shallowEntries.iterator.next())
       reads.foreach { read =>
         assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
         assertEquals("Message should match appended.", records(idx), read.record)
       }
 
-      assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.shallowIterator.asScala.toIndexedSeq)
+      assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.shallowEntries.asScala.toIndexedSeq)
     }
 
   }
@@ -357,15 +358,15 @@ class LogTest extends JUnitSuite {
     /* do successive reads to ensure all our messages are there */
     var offset = 0L
     for(i <- 0 until numMessages) {
-      val messages = log.read(offset, 1024*1024).records.shallowIterator
-      val head = messages.next()
+      val messages = log.read(offset, 1024*1024).records.shallowEntries
+      val head = messages.iterator.next()
       assertEquals("Offsets not equal", offset, head.offset)
-      assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowIterator.next().record,
-        head.record.convert(messageSets(i).shallowIterator.next().record.magic))
+      assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowEntries.iterator.next().record,
+        head.record.convert(messageSets(i).shallowEntries.iterator.next().record.magic))
       offset = head.offset + 1
     }
     val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records
-    assertEquals("Should be no more messages", 0, lastRead.shallowIterator.asScala.size)
+    assertEquals("Should be no more messages", 0, lastRead.shallowEntries.asScala.size)
 
     // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure
     TestUtils.retry(1000L){
@@ -387,7 +388,7 @@ class LogTest extends JUnitSuite {
     log.append(MemoryRecords.withRecords(CompressionType.GZIP, Record.create("hello".getBytes), Record.create("there".getBytes)))
     log.append(MemoryRecords.withRecords(CompressionType.GZIP, Record.create("alpha".getBytes), Record.create("beta".getBytes)))
 
-    def read(offset: Int) = log.read(offset, 4096).records.deepIterator
+    def read(offset: Int) = log.read(offset, 4096).records.deepEntries.iterator
 
     /* we should always get the first message in the compressed set when reading any offset in the set */
     assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
@@ -624,7 +625,7 @@ class LogTest extends JUnitSuite {
     assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
     for(i <- 0 until numMessages) {
-      assertEquals(i, log.read(i, 100, None).records.shallowIterator.next().offset)
+      assertEquals(i, log.read(i, 100, None).records.shallowEntries.iterator.next().offset)
       if (i == 0)
         assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
       else
@@ -700,7 +701,7 @@ class LogTest extends JUnitSuite {
     log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
-      assertEquals(i, log.read(i, 100, None).records.shallowIterator.next().offset)
+      assertEquals(i, log.read(i, 100, None).records.shallowEntries.iterator.next().offset)
       if (i == 0)
         assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
       else
@@ -959,7 +960,7 @@ class LogTest extends JUnitSuite {
                       time.scheduler,
                       time)
     log.append(MemoryRecords.withRecords(Record.create(null)))
-    val head = log.read(0, 4096, None).records.shallowIterator().next()
+    val head = log.read(0, 4096, None).records.shallowEntries().iterator.next()
     assertEquals(0, head.offset)
     assertTrue("Message payload should be null.", head.record.hasNullValue)
   }
@@ -998,7 +999,7 @@ class LogTest extends JUnitSuite {
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
         log.append(set)
-      val messages = log.logSegments.flatMap(_.log.deepIterator.asScala.toList)
+      val messages = log.logSegments.flatMap(_.log.deepEntries.asScala.toList)
       log.close()
 
       // corrupt index and log by appending random bytes
@@ -1009,7 +1010,7 @@ class LogTest extends JUnitSuite {
       log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
       assertEquals(numMessages, log.logEndOffset)
       assertEquals("Messages in the log after recovery should be the same.", messages,
-        log.logSegments.flatMap(_.log.deepIterator.asScala.toList))
+        log.logSegments.flatMap(_.log.deepEntries.asScala.toList))
       Utils.delete(logDir)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 72c5b16..b201feb 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -44,8 +44,8 @@ class LogValidatorTest extends JUnitSuite {
       messageTimestampType = TimestampType.LOG_APPEND_TIME,
       messageTimestampDiffMaxMs = 1000L)
     val validatedRecords = validatedResults.validatedRecords
-    assertEquals("number of messages should not change", records.deepIterator.asScala.size, validatedRecords.deepIterator.asScala.size)
-    validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+    assertEquals("number of messages should not change", records.deepEntries.asScala.size, validatedRecords.deepEntries.asScala.size)
+    validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
     assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
@@ -67,12 +67,12 @@ class LogValidatorTest extends JUnitSuite {
       messageTimestampDiffMaxMs = 1000L)
     val validatedRecords = validatedResults.validatedRecords
 
-    assertEquals("number of messages should not change", records.deepIterator.asScala.size, validatedRecords.deepIterator.asScala.size)
-    validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
-    assertTrue("MessageSet should still valid", validatedRecords.shallowIterator.next().record.isValid)
+    assertEquals("number of messages should not change", records.deepEntries.asScala.size, validatedRecords.deepEntries.asScala.size)
+    validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+    assertTrue("MessageSet should still valid", validatedRecords.shallowEntries.iterator.next().record.isValid)
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
-    assertEquals(s"The offset of max timestamp should be ${records.deepIterator.asScala.size - 1}",
-      records.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertEquals(s"The offset of max timestamp should be ${records.deepEntries.asScala.size - 1}",
+      records.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged)
   }
 
@@ -93,13 +93,13 @@ class LogValidatorTest extends JUnitSuite {
       messageTimestampDiffMaxMs = 1000L)
     val validatedRecords = validatedResults.validatedRecords
 
-    assertEquals("number of messages should not change", records.deepIterator.asScala.size,
-      validatedRecords.deepIterator.asScala.size)
-    validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
-    assertTrue("MessageSet should still valid", validatedRecords.shallowIterator.next().record.isValid)
+    assertEquals("number of messages should not change", records.deepEntries.asScala.size,
+      validatedRecords.deepEntries.asScala.size)
+    validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+    assertTrue("MessageSet should still valid", validatedRecords.shallowEntries.iterator.next().record.isValid)
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
-    assertEquals(s"The offset of max timestamp should be ${records.deepIterator.asScala.size - 1}",
-      records.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertEquals(s"The offset of max timestamp should be ${records.deepEntries.asScala.size - 1}",
+      records.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
   }
 
@@ -124,7 +124,7 @@ class LogValidatorTest extends JUnitSuite {
     val validatedRecords = validatingResults.validatedRecords
 
     var i = 0
-    for (logEntry <- validatedRecords.deepIterator.asScala) {
+    for (logEntry <- validatedRecords.deepEntries.asScala) {
       logEntry.record.ensureValid()
       assertEquals(logEntry.record.timestamp, timestampSeq(i))
       assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME)
@@ -157,15 +157,15 @@ class LogValidatorTest extends JUnitSuite {
     val validatedRecords = validatedResults.validatedRecords
 
     var i = 0
-    for (logEntry <- validatedRecords.deepIterator.asScala) {
+    for (logEntry <- validatedRecords.deepEntries.asScala) {
       logEntry.record.ensureValid()
       assertEquals(logEntry.record.timestamp, timestampSeq(i))
       assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME)
       i += 1
     }
     assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatedResults.maxTimestamp)
-    assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepIterator.asScala.size - 1}",
-      validatedRecords.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepEntries.asScala.size - 1}",
+      validatedRecords.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
   }
 
@@ -358,9 +358,9 @@ class LogValidatorTest extends JUnitSuite {
 
   /* check that offsets are assigned consecutively from the given base offset */
   private def checkOffsets(records: MemoryRecords, baseOffset: Long) {
-    assertTrue("Message set should not be empty", records.deepIterator.asScala.nonEmpty)
+    assertTrue("Message set should not be empty", records.deepEntries.asScala.nonEmpty)
     var offset = baseOffset
-    for (entry <- records.deepIterator.asScala) {
+    for (entry <- records.deepEntries.asScala) {
       assertEquals("Unexpected offset in message set iterator", offset, entry.offset)
       offset += 1
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index bd3ed68..ef2b0af 100644
--- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -27,6 +27,7 @@ import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 
 trait BaseMessageSetTestCases extends JUnitSuite {
 
@@ -61,7 +62,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
   @Test
   def testWrittenEqualsRead() {
     val messageSet = createMessageSet(messages)
-    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+    assertEquals(messages.toVector, messageSet.toVector.map(m => m.message))
   }
 
   @Test
@@ -123,7 +124,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
         fileRecords.resize() // resize since we wrote to the channel directly
 
         assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
-        checkEquals(set.asRecords.deepIterator, fileRecords.deepIterator())
+        assertEquals(set.asRecords.deepEntries.asScala.toVector, fileRecords.deepEntries.asScala.toVector)
       } finally fileRecords.close()
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b58b6a1b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 5c9f035..e0dfe16 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -171,7 +171,7 @@ class AbstractFetcherThreadTest {
 
       // Now check message's crc
       val records = partitionData.toRecords
-      for (entry <- records.shallowIterator.asScala) {
+      for (entry <- records.shallowEntries.asScala) {
         entry.record.ensureValid()
         logEndOffset = entry.nextOffset
       }