Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/30 21:39:32 UTC

kafka git commit: MINOR: Ensure streaming iterator is closed by Fetcher

Repository: kafka
Updated Branches:
  refs/heads/trunk f9772d5fb -> dd71e4a8d


MINOR: Ensure streaming iterator is closed by Fetcher

Author: Jason Gustafson <ja...@confluent.io>
Author: Ismael Juma <gi...@juma.me.uk>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2762 from hachikuji/ensure-decompression-stream-closed


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd71e4a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd71e4a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd71e4a8

Branch: refs/heads/trunk
Commit: dd71e4a8d830c9de40b5ec3f987f60a1d2f26b39
Parents: f9772d5
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Mar 30 22:39:28 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Mar 30 22:39:28 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |   1 +
 .../clients/consumer/internals/Fetcher.java     |  40 +-
 .../record/AbstractLegacyRecordBatch.java       |  46 +-
 .../kafka/common/record/DefaultRecord.java      |  41 ++
 .../kafka/common/record/DefaultRecordBatch.java |  45 +-
 .../kafka/common/record/FileLogInputStream.java |   7 +
 .../kafka/common/record/LegacyRecord.java       |   4 +-
 .../apache/kafka/common/record/RecordBatch.java |  11 +
 .../kafka/common/record/SimpleRecord.java       |   8 +
 .../kafka/common/utils/CloseableIterator.java   |  30 ++
 .../clients/consumer/internals/FetcherTest.java | 446 ++++++++++++-------
 .../common/record/DefaultRecordBatchTest.java   |  14 +
 .../java/org/apache/kafka/test/TestUtils.java   |   5 +
 13 files changed, 516 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index a666540..15434bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1567,6 +1567,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             firstException.compareAndSet(null, t);
             log.error("Failed to close coordinator", t);
         }
+        ClientUtils.closeQuietly(fetcher, "fetcher", firstException);
         ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
         ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
         ClientUtils.closeQuietly(client, "consumer network client", firstException);
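
The consumer now treats the fetcher as just another closeable resource: a failure while closing it is recorded, but it does not prevent the remaining resources from being closed. The following is only a minimal sketch of that quiet-close pattern, written in the spirit of ClientUtils.closeQuietly; the helper class and method shown here are illustrative, not the actual ClientUtils implementation.

    import java.io.Closeable;
    import java.util.concurrent.atomic.AtomicReference;

    final class QuietClose {
        // Close a resource, remembering only the first failure so that later
        // resources still get a chance to close.
        static void closeQuietly(Closeable closeable, String name, AtomicReference<Throwable> firstException) {
            if (closeable == null)
                return;
            try {
                closeable.close();
            } catch (Throwable t) {
                firstException.compareAndSet(null, t);
            }
        }
    }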

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ad63d25..2eeef11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -45,8 +45,8 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.InvalidRecordException;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
@@ -55,11 +55,13 @@ import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -78,7 +80,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * This class manages the fetching process with the brokers.
  */
-public class Fetcher<K, V> implements SubscriptionState.Listener {
+public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
 
@@ -748,7 +750,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
         TopicPartition tp = completedFetch.partition;
         FetchResponse.PartitionData partition = completedFetch.partitionData;
         long fetchOffset = completedFetch.fetchedOffset;
-        PartitionRecords parsedRecords = null;
+        PartitionRecords partitionRecords = null;
         Errors error = partition.error;
 
         try {
@@ -769,7 +771,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
                         partition.records.sizeInBytes(), tp, position);
                 Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
-                parsedRecords = new PartitionRecords(tp, completedFetch, batches);
+                partitionRecords = new PartitionRecords(tp, completedFetch, batches);
 
                 if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
                     if (completedFetch.responseVersion < 3) {
@@ -819,15 +821,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
             }
         } finally {
-            if (error != Errors.NONE) {
+            if (partitionRecords == null)
                 completedFetch.metricAggregator.record(tp, 0, 0);
+
+            if (error != Errors.NONE)
                 // we move the partition to the end if there was an error. This way, it's more likely that partitions for
                 // the same topic can remain together (allowing for more efficient serialization).
                 subscriptions.movePartitionToEnd(tp);
-            }
         }
 
-        return parsedRecords;
+        return partitionRecords;
     }
 
     private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {
@@ -866,7 +869,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
         private int recordsRead;
         private int bytesRead;
         private RecordBatch currentBatch;
-        private Iterator<Record> records;
+        private CloseableIterator<Record> records;
         private long nextFetchOffset;
         private boolean isFetched = false;
 
@@ -881,6 +884,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
 
         private void drain() {
             if (!isFetched) {
+                maybeCloseRecordStream();
+
                 this.isFetched = true;
                 this.completedFetch.metricAggregator.record(partition, bytesRead, recordsRead);
 
@@ -913,16 +918,23 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
             }
         }
 
+        private void maybeCloseRecordStream() {
+            if (records != null)
+                records.close();
+        }
+
         private Record nextFetchedRecord() {
             while (true) {
                 if (records == null || !records.hasNext()) {
+                    maybeCloseRecordStream();
+
                     if (!batches.hasNext()) {
                         drain();
                         return null;
                     }
                     currentBatch = batches.next();
                     maybeEnsureValid(currentBatch);
-                    records = currentBatch.iterator();
+                    records = currentBatch.streamingIterator();
                 }
 
                 Record record = records.next();
@@ -939,12 +951,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
             }
         }
 
-        private List<ConsumerRecord<K, V>> fetchRecords(int n) {
+        private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
             if (isFetched)
                 return Collections.emptyList();
 
             List<ConsumerRecord<K, V>> records = new ArrayList<>();
-            for (int i = 0; i < n; i++) {
+            for (int i = 0; i < maxRecords; i++) {
                 Record record = nextFetchedRecord();
                 if (record == null)
                     break;
@@ -1173,4 +1185,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
         }
     }
 
+    @Override
+    public void close() {
+        if (nextInLineRecords != null)
+            nextInLineRecords.drain();
+    }
+
 }
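
The key change above is that PartitionRecords now holds a CloseableIterator and closes it both when advancing to the next batch and when the partition data is drained (including from the new Fetcher.close()). Below is a simplified sketch of that close-on-advance pattern; it uses the same record APIs introduced in this commit, but the BatchReader class is hypothetical and deliberately omits the validation and bookkeeping the real Fetcher does.

    import java.util.Iterator;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.utils.CloseableIterator;

    final class BatchReader {
        private final Iterator<? extends RecordBatch> batches;
        private CloseableIterator<Record> records; // stream over the current batch; may hold a decompressor

        BatchReader(Iterator<? extends RecordBatch> batches) {
            this.batches = batches;
        }

        Record nextRecord() {
            while (true) {
                if (records == null || !records.hasNext()) {
                    maybeCloseRecordStream();               // release the previous batch's stream
                    if (!batches.hasNext())
                        return null;
                    records = batches.next().streamingIterator();
                }
                if (records.hasNext())
                    return records.next();
            }
        }

        void close() {
            maybeCloseRecordStream();                       // drain: make sure the last stream is released
        }

        private void maybeCloseRecordStream() {
            if (records != null) {
                records.close();
                records = null;
            }
        }
    }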

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 6deeb52..1b74a7d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Utils;
 
 import java.io.DataInputStream;
@@ -29,8 +30,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
@@ -155,7 +155,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
 
     @Override
     public String toString() {
-        return "LegacyRecordBatch(" + offset() + ", " + outerRecord() + ")";
+        return "LegacyRecordBatch(offset=" + offset() + ", " + outerRecord() + ")";
     }
 
     @Override
@@ -211,11 +211,40 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
      * @return An iterator over the records contained within this batch
      */
     @Override
-    public Iterator<Record> iterator() {
+    public CloseableIterator<Record> iterator() {
         if (isCompressed())
             return new DeepRecordsIterator(this, false, Integer.MAX_VALUE);
-        else
-            return Collections.<Record>singletonList(this).iterator();
+
+        return new CloseableIterator<Record>() {
+            private boolean hasNext = true;
+
+            @Override
+            public void close() {}
+
+            @Override
+            public boolean hasNext() {
+                return hasNext;
+            }
+
+            @Override
+            public Record next() {
+                if (!hasNext)
+                    throw new NoSuchElementException();
+                hasNext = false;
+                return AbstractLegacyRecordBatch.this;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    @Override
+    public CloseableIterator<Record> streamingIterator() {
+        // the older message format versions do not support streaming, so we return the normal iterator
+        return iterator();
     }
 
     static void writeHeader(ByteBuffer buffer, long offset, int size) {
@@ -256,7 +285,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
         }
     }
 
-    private static class DeepRecordsIterator extends AbstractIterator<Record> {
+    private static class DeepRecordsIterator extends AbstractIterator<Record> implements CloseableIterator<Record> {
         private final ArrayDeque<AbstractLegacyRecordBatch> batches;
         private final long absoluteBaseOffset;
         private final byte wrapperMagic;
@@ -341,6 +370,9 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
 
             return entry;
         }
+
+        @Override
+        public void close() {}
     }
 
     private static class BasicLegacyRecordBatch extends AbstractLegacyRecordBatch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 7c8dee5..a4b1d11 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -26,6 +26,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.zip.Checksum;
 
 import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
@@ -292,6 +293,46 @@ public class DefaultRecord implements Record {
         return (attributes & CONTROL_FLAG_MASK) != 0;
     }
 
+    @Override
+    public String toString() {
+        return String.format("DefaultRecord(offset=%d, timestamp=%d, key=%d bytes, value=%d bytes)",
+                offset,
+                timestamp,
+                key == null ? 0 : key.limit(),
+                value == null ? 0 : value.limit());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        DefaultRecord that = (DefaultRecord) o;
+        return sizeInBytes == that.sizeInBytes &&
+                attributes == that.attributes &&
+                offset == that.offset &&
+                timestamp == that.timestamp &&
+                sequence == that.sequence &&
+                (key == null ? that.key == null : key.equals(that.key)) &&
+                (value == null ? that.value == null : value.equals(that.value)) &&
+                Arrays.equals(headers, that.headers);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = sizeInBytes;
+        result = 31 * result + (int) attributes;
+        result = 31 * result + (int) (offset ^ (offset >>> 32));
+        result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+        result = 31 * result + sequence;
+        result = 31 * result + (key != null ? key.hashCode() : 0);
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        result = 31 * result + Arrays.hashCode(headers);
+        return result;
+    }
+
     public static DefaultRecord readFrom(DataInputStream input,
                                          long baseOffset,
                                          long baseTimestamp,

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 665ddd1..3eeea36 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -19,12 +19,16 @@ package org.apache.kafka.common.record;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Crc32C;
 
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
 
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 
@@ -199,7 +203,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
     }
 
-    private Iterator<Record> compressedIterator() {
+    private CloseableIterator<Record> compressedIterator() {
         ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
         final DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
@@ -214,10 +218,19 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                     throw new KafkaException("Failed to decompress record stream", e);
                 }
             }
+
+            @Override
+            public void close() {
+                try {
+                    stream.close();
+                } catch (IOException e) {
+                    throw new KafkaException("Failed to close record stream", e);
+                }
+            }
         };
     }
 
-    private Iterator<Record> uncompressedIterator() {
+    private CloseableIterator<Record> uncompressedIterator() {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
         return new RecordIterator() {
@@ -225,11 +238,30 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
             protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
                 return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
             }
+            @Override
+            public void close() {}
         };
     }
 
     @Override
     public Iterator<Record> iterator() {
+        if (!isCompressed())
+            return uncompressedIterator();
+
+        // for a normal iterator, we cannot ensure that the underlying compression stream is closed,
+        // so we decompress the full record set here. Use cases which call for a lower memory footprint
+        // can use `streamingIterator` at the cost of additional complexity
+        try (CloseableIterator<Record> iterator = compressedIterator()) {
+            List<Record> records = new ArrayList<>(count());
+            while (iterator.hasNext())
+                records.add(iterator.next());
+            return records.iterator();
+        }
+    }
+
+
+    @Override
+    public CloseableIterator<Record> streamingIterator() {
         if (isCompressed())
             return compressedIterator();
         else
@@ -348,7 +380,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
 
     @Override
     public String toString() {
-        return "RecordBatch(magic: " + magic() + ", offsets: [" + baseOffset() + ", " + lastOffset() + "])";
+        return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + ", " + lastOffset() + "], " +
+                "compression=" + compressionType() + ", timestampType=" + timestampType() + ", crc=" + checksum() + ")";
     }
 
     public static int sizeInBytes(long baseOffset, Iterable<Record> records) {
@@ -396,7 +429,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
     }
 
-    private abstract class RecordIterator implements Iterator<Record> {
+    private abstract class RecordIterator implements CloseableIterator<Record> {
         private final Long logAppendTime;
         private final long baseOffset;
         private final long baseTimestamp;
@@ -423,6 +456,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
 
         @Override
         public Record next() {
+            if (readRecords >= numRecords)
+                throw new NoSuchElementException();
+
             readRecords++;
             return readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
         }
@@ -433,5 +469,6 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         public void remove() {
             throw new UnsupportedOperationException();
         }
+
     }
 }
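
As the comment in iterator() explains, the plain iterator eagerly decompresses the whole batch so there is nothing left for the caller to close, while streamingIterator() defers decompression and therefore must be closed by the caller. A hedged caller-side sketch of the difference follows; the helper class is illustrative only.

    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.utils.CloseableIterator;

    final class BatchIterationExample {
        // Eager: the batch is fully decompressed up front, so a for-each loop is safe.
        static long countEager(RecordBatch batch) {
            long count = 0;
            for (Record record : batch)
                count++;
            return count;
        }

        // Lazy: decompression happens as records are read, so close() must release the stream.
        static long countStreaming(RecordBatch batch) {
            long count = 0;
            try (CloseableIterator<Record> it = batch.streamingIterator()) {
                while (it.hasNext()) {
                    it.next();
                    count++;
                }
            }
            return count;
        }
    }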

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 59055ed..d5f10dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Utils;
 
 import java.io.IOException;
@@ -224,6 +225,12 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
         }
 
         @Override
+        public CloseableIterator<Record> streamingIterator() {
+            loadUnderlyingRecordBatch();
+            return underlying.streamingIterator();
+        }
+
+        @Override
         public boolean isValid() {
             loadUnderlyingRecordBatch();
             return underlying.isValid();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
index 69ee4c3..25185b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
@@ -279,7 +279,7 @@ public final class LegacyRecord {
 
     public String toString() {
         if (magic() > 0)
-            return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, %s = %d, key = %d bytes, value = %d bytes)",
+            return String.format("Record(magic=%d, attributes=%d, compression=%s, crc=%d, %s=%d, key=%d bytes, value=%d bytes)",
                                  magic(),
                                  attributes(),
                                  compressionType(),
@@ -289,7 +289,7 @@ public final class LegacyRecord {
                                  key() == null ? 0 : key().limit(),
                                  value() == null ? 0 : value().limit());
         else
-            return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
+            return String.format("Record(magic=%d, attributes=%d, compression=%s, crc=%d, key=%d bytes, value=%d bytes)",
                                  magic(),
                                  attributes(),
                                  compressionType(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 1cfb7f8..90f1486 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -16,7 +16,10 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.CloseableIterator;
+
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 
 /**
  * A record batch is a container for records. In old versions of the record format (versions 0 and 1),
@@ -200,4 +203,12 @@ public interface RecordBatch extends Iterable<Record> {
      */
     int partitionLeaderEpoch();
 
+    /**
+     * Return a streaming iterator which basically delays decompression of the record stream until the records
+     * are actually asked for using {@link Iterator#next()}. If the message format does not support streaming
+     * iteration, then the normal iterator is returned. Either way, callers should ensure that the iterator is closed.
+     *
+     * @return The closeable iterator
+     */
+    CloseableIterator<Record> streamingIterator();
 }
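
The new contract is that callers should close the streaming iterator regardless of the message format, since older formats simply return the normal iterator from streamingIterator(). A usage sketch along those lines, assuming the existing MemoryRecords.withRecords and SimpleRecord helpers (used here purely for illustration):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.utils.CloseableIterator;

    final class StreamingIteratorUsage {
        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.GZIP,
                    new SimpleRecord("key".getBytes(StandardCharsets.UTF_8),
                                     "value".getBytes(StandardCharsets.UTF_8)));
            for (RecordBatch batch : records.batches()) {
                // try-with-resources guarantees the decompression stream is released
                try (CloseableIterator<Record> it = batch.streamingIterator()) {
                    while (it.hasNext())
                        System.out.println(it.next());
                }
            }
        }
    }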

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java b/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
index 3a1c04c..0a5cbcf 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
@@ -106,4 +106,12 @@ public class SimpleRecord {
         result = 31 * result + Arrays.hashCode(headers);
         return result;
     }
+
+    @Override
+    public String toString() {
+        return String.format("SimpleRecord(timestamp=%d, key=%d bytes, value=%d bytes)",
+                timestamp(),
+                key == null ? 0 : key.limit(),
+                value == null ? 0 : value.limit());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
new file mode 100644
index 0000000..38fba8e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+/**
+ * Iterators that need to be closed in order to release resources should implement this interface.
+ *
+ * Warning: before implementing this interface, consider if there are better options. The chance of misuse is
+ * a bit high since people are used to iterating without closing.
+ */
+public interface CloseableIterator<T> extends Iterator<T>, Closeable {
+    void close();
+}
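
To make the misuse risk concrete, here is an illustrative (hypothetical) CloseableIterator over the lines of a Reader: if a caller iterates it without ever calling close(), the underlying reader is never released, which is exactly the failure mode the warning above is about.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.Reader;
    import java.util.NoSuchElementException;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.utils.CloseableIterator;

    final class LineIterator implements CloseableIterator<String> {
        private final BufferedReader reader;
        private String next;

        LineIterator(Reader reader) {
            this.reader = new BufferedReader(reader);
            advance();
        }

        private void advance() {
            try {
                next = reader.readLine();   // read ahead so hasNext() is cheap
            } catch (IOException e) {
                throw new KafkaException("Failed to read line", e);
            }
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public String next() {
            if (next == null)
                throw new NoSuchElementException();
            String line = next;
            advance();
            return line;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() {
            try {
                reader.close();              // this is the call a plain Iterator would never make
            } catch (IOException e) {
                throw new KafkaException("Failed to close reader", e);
            }
        }
    }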

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b03b461..092f549 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.LegacyRecord;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
@@ -60,6 +61,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -88,7 +90,8 @@ public class FetcherTest {
     private String topicName = "test";
     private String groupId = "test-group";
     private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
-    private TopicPartition tp = new TopicPartition(topicName, 0);
+    private TopicPartition tp1 = new TopicPartition(topicName, 0);
+    private TopicPartition tp2 = new TopicPartition(topicName, 1);
     private int minBytes = 1;
     private int maxBytes = Integer.MAX_VALUE;
     private int maxWaitMs = 0;
@@ -97,7 +100,7 @@ public class FetcherTest {
     private MockTime time = new MockTime(1);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private MockClient client = new MockClient(time, metadata);
-    private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
+    private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
     private Node node = cluster.nodes().get(0);
     private Metrics metrics = new Metrics(time);
     private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
@@ -132,12 +135,14 @@ public class FetcherTest {
     public void teardown() {
         this.metrics.close();
         this.fetcherMetrics.close();
+        this.fetcher.close();
+        this.fetcherMetrics.close();
     }
 
     @Test
     public void testFetchNormal() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -148,11 +153,11 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
-        assertTrue(partitionRecords.containsKey(tp));
+        assertTrue(partitionRecords.containsKey(tp1));
 
-        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp);
+        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1);
         assertEquals(3, records.size());
-        assertEquals(4L, subscriptions.position(tp).longValue()); // this is the next fetching position
+        assertEquals(4L, subscriptions.position(tp1).longValue()); // this is the next fetching position
         long offset = 1;
         for (ConsumerRecord<byte[], byte[]> record : records) {
             assertEquals(offset, record.offset());
@@ -162,8 +167,8 @@ public class FetcherTest {
 
     @Test
     public void testFetcherIgnoresControlRecords() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -187,19 +192,19 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
-        assertTrue(partitionRecords.containsKey(tp));
+        assertTrue(partitionRecords.containsKey(tp1));
 
-        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp);
+        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1);
         assertEquals(2, records.size());
-        assertEquals(4L, subscriptions.position(tp).longValue());
+        assertEquals(4L, subscriptions.position(tp1).longValue());
         for (ConsumerRecord<byte[], byte[]> record : records)
             assertArrayEquals("key".getBytes(), record.key());
     }
 
     @Test
     public void testFetchError() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
@@ -209,7 +214,7 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
-        assertFalse(partitionRecords.containsKey(tp));
+        assertFalse(partitionRecords.containsKey(tp1));
     }
 
     private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
@@ -239,10 +244,10 @@ public class FetcherTest {
 
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);
 
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 1);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 1);
 
-        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -251,7 +256,7 @@ public class FetcherTest {
             fail("fetchedRecords should have raised");
         } catch (SerializationException e) {
             // the position should not advance since no data has been returned
-            assertEquals(1, subscriptions.position(tp).longValue());
+            assertEquals(1, subscriptions.position(tp1).longValue());
         }
     }
 
@@ -282,8 +287,8 @@ public class FetcherTest {
 
         buffer.flip();
 
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -294,7 +299,7 @@ public class FetcherTest {
             fail("fetchedRecords should have raised");
         } catch (KafkaException e) {
             // the position should not advance since no data has been returned
-            assertEquals(0, subscriptions.position(tp).longValue());
+            assertEquals(0, subscriptions.position(tp1).longValue());
         }
     }
 
@@ -310,8 +315,8 @@ public class FetcherTest {
         // flip some bits to fail the crc
         buffer.putInt(32, buffer.get(32) ^ 87238423);
 
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -322,7 +327,7 @@ public class FetcherTest {
             fail("fetchedRecords should have raised");
         } catch (KafkaException e) {
             // the position should not advance since no data has been returned
-            assertEquals(0, subscriptions.position(tp).longValue());
+            assertEquals(0, subscriptions.position(tp1).longValue());
         }
     }
 
@@ -331,32 +336,32 @@ public class FetcherTest {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
 
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 1);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 1);
 
-        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
-        client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp1, 4), fetchResponse(this.nextRecords, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
-        records = fetcher.fetchedRecords().get(tp);
+        records = fetcher.fetchedRecords().get(tp1);
         assertEquals(2, records.size());
-        assertEquals(3L, subscriptions.position(tp).longValue());
+        assertEquals(3L, subscriptions.position(tp1).longValue());
         assertEquals(1, records.get(0).offset());
         assertEquals(2, records.get(1).offset());
 
         assertEquals(0, fetcher.sendFetches());
         consumerClient.poll(0);
-        records = fetcher.fetchedRecords().get(tp);
+        records = fetcher.fetchedRecords().get(tp1);
         assertEquals(1, records.size());
-        assertEquals(4L, subscriptions.position(tp).longValue());
+        assertEquals(4L, subscriptions.position(tp1).longValue());
         assertEquals(3, records.get(0).offset());
 
         assertTrue(fetcher.sendFetches() > 0);
         consumerClient.poll(0);
-        records = fetcher.fetchedRecords().get(tp);
+        records = fetcher.fetchedRecords().get(tp1);
         assertEquals(2, records.size());
-        assertEquals(6L, subscriptions.position(tp).longValue());
+        assertEquals(6L, subscriptions.position(tp1).longValue());
         assertEquals(4, records.get(0).offset());
         assertEquals(5, records.get(1).offset());
     }
@@ -374,16 +379,16 @@ public class FetcherTest {
         MemoryRecords records = builder.build();
 
         List<ConsumerRecord<byte[], byte[]>> consumerRecords;
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
-        consumerRecords = fetcher.fetchedRecords().get(tp);
+        consumerRecords = fetcher.fetchedRecords().get(tp1);
         assertEquals(3, consumerRecords.size());
-        assertEquals(31L, subscriptions.position(tp).longValue()); // this is the next fetching position
+        assertEquals(31L, subscriptions.position(tp1).longValue()); // this is the next fetching position
 
         assertEquals(15L, consumerRecords.get(0).offset());
         assertEquals(20L, consumerRecords.get(1).offset());
@@ -406,7 +411,7 @@ public class FetcherTest {
             } catch (RecordTooLargeException e) {
                 assertTrue(e.getMessage().startsWith("There are some messages at [Partition=Offset]: "));
                 // the position should not advance since no data has been returned
-                assertEquals(0, subscriptions.position(tp).longValue());
+                assertEquals(0, subscriptions.position(tp1).longValue());
             }
         } finally {
             client.setNodeApiVersions(NodeApiVersions.create());
@@ -428,13 +433,13 @@ public class FetcherTest {
         } catch (KafkaException e) {
             assertTrue(e.getMessage().startsWith("Failed to make progress reading messages"));
             // the position should not advance since no data has been returned
-            assertEquals(0, subscriptions.position(tp).longValue());
+            assertEquals(0, subscriptions.position(tp1).longValue());
         }
     }
 
     private void makeFetchRequestWithIncompleteRecord() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
         MemoryRecords partialRecord = MemoryRecords.readableRecords(
@@ -446,8 +451,8 @@ public class FetcherTest {
 
     @Test
     public void testUnauthorizedTopic() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         // resize the limit of the buffer to pretend it is only fetch-size large
         assertEquals(1, fetcher.sendFetches());
@@ -464,13 +469,13 @@ public class FetcherTest {
     @Test
     public void testFetchDuringRebalance() {
         subscriptions.subscribe(singleton(topicName), listener);
-        subscriptions.assignFromSubscribed(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromSubscribed(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
 
         // Now the rebalance happens and fetch positions are cleared
-        subscriptions.assignFromSubscribed(singleton(tp));
+        subscriptions.assignFromSubscribed(singleton(tp1));
         client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
@@ -480,31 +485,31 @@ public class FetcherTest {
 
     @Test
     public void testInFlightFetchOnPausedPartition() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        subscriptions.pause(tp);
+        subscriptions.pause(tp1);
 
         client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
-        assertNull(fetcher.fetchedRecords().get(tp));
+        assertNull(fetcher.fetchedRecords().get(tp1));
     }
 
     @Test
     public void testFetchOnPausedPartition() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
-        subscriptions.pause(tp);
+        subscriptions.pause(tp1);
         assertFalse(fetcher.sendFetches() > 0);
         assertTrue(client.requests().isEmpty());
     }
 
     @Test
     public void testFetchNotLeaderForPartition() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
@@ -515,8 +520,8 @@ public class FetcherTest {
 
     @Test
     public void testFetchUnknownTopicOrPartition() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
@@ -527,61 +532,61 @@ public class FetcherTest {
 
     @Test
     public void testFetchOffsetOutOfRange() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
-        assertTrue(subscriptions.isOffsetResetNeeded(tp));
-        assertEquals(null, subscriptions.position(tp));
+        assertTrue(subscriptions.isOffsetResetNeeded(tp1));
+        assertEquals(null, subscriptions.position(tp1));
     }
 
     @Test
     public void testStaleOutOfRangeError() {
         // verify that an out of range error which arrives after a seek
         // does not cause us to reset our position or throw an exception
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
-        subscriptions.seek(tp, 1);
+        subscriptions.seek(tp1, 1);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
-        assertFalse(subscriptions.isOffsetResetNeeded(tp));
-        assertEquals(1, subscriptions.position(tp).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+        assertEquals(1, subscriptions.position(tp1).longValue());
     }
 
     @Test
     public void testFetchedRecordsAfterSeek() {
-        subscriptionsNoAutoReset.assignFromUser(singleton(tp));
-        subscriptionsNoAutoReset.seek(tp, 0);
+        subscriptionsNoAutoReset.assignFromUser(singleton(tp1));
+        subscriptionsNoAutoReset.seek(tp1, 0);
 
         assertTrue(fetcherNoAutoReset.sendFetches() > 0);
         client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
-        assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
-        subscriptionsNoAutoReset.seek(tp, 2);
+        assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
+        subscriptionsNoAutoReset.seek(tp1, 2);
         assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
     }
 
     @Test
     public void testFetchOffsetOutOfRangeException() {
-        subscriptionsNoAutoReset.assignFromUser(singleton(tp));
-        subscriptionsNoAutoReset.seek(tp, 0);
+        subscriptionsNoAutoReset.assignFromUser(singleton(tp1));
+        subscriptionsNoAutoReset.seek(tp1, 0);
 
         fetcherNoAutoReset.sendFetches();
         client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
 
-        assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
+        assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
         try {
             fetcherNoAutoReset.fetchedRecords();
             fail("Should have thrown OffsetOutOfRangeException");
         } catch (OffsetOutOfRangeException e) {
-            assertTrue(e.offsetOutOfRangePartitions().containsKey(tp));
+            assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
             assertEquals(e.offsetOutOfRangePartitions().size(), 1);
         }
         assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
@@ -589,8 +594,8 @@ public class FetcherTest {
 
     @Test
     public void testFetchDisconnected() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0), true);
@@ -598,66 +603,66 @@ public class FetcherTest {
         assertEquals(0, fetcher.fetchedRecords().size());
 
         // disconnects should have no effect on subscription state
-        assertFalse(subscriptions.isOffsetResetNeeded(tp));
-        assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(0, subscriptions.position(tp).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+        assertTrue(subscriptions.isFetchable(tp1));
+        assertEquals(0, subscriptions.position(tp1).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionToCommitted() {
         // unless a specific reset is expected, the default behavior is to reset to the committed
         // position if one is present
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.committed(tp, new OffsetAndMetadata(5));
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.committed(tp1, new OffsetAndMetadata(5));
 
-        fetcher.updateFetchPositions(singleton(tp));
-        assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, subscriptions.position(tp).longValue());
+        fetcher.updateFetchPositions(singleton(tp1));
+        assertTrue(subscriptions.isFetchable(tp1));
+        assertEquals(5, subscriptions.position(tp1).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionResetToDefaultOffset() {
-        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.assignFromUser(singleton(tp1));
         // with no commit position, we should reset using the default strategy defined above (EARLIEST)
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp));
-        assertFalse(subscriptions.isOffsetResetNeeded(tp));
-        assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, subscriptions.position(tp).longValue());
+        fetcher.updateFetchPositions(singleton(tp1));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+        assertTrue(subscriptions.isFetchable(tp1));
+        assertEquals(5, subscriptions.position(tp1).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionResetToLatestOffset() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp));
-        assertFalse(subscriptions.isOffsetResetNeeded(tp));
-        assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, subscriptions.position(tp).longValue());
+        fetcher.updateFetchPositions(singleton(tp1));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+        assertTrue(subscriptions.isFetchable(tp1));
+        assertEquals(5, subscriptions.position(tp1).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionResetToEarliestOffset() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.EARLIEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp));
-        assertFalse(subscriptions.isOffsetResetNeeded(tp));
-        assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, subscriptions.position(tp).longValue());
+        fetcher.updateFetchPositions(singleton(tp1));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+        assertTrue(subscriptions.isFetchable(tp1));
+        assertEquals(5, subscriptions.position(tp1).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionDisconnect() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
 
         // First request gets a disconnect
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
@@ -666,72 +671,72 @@ public class FetcherTest {
         // Next one succeeds
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp));
-        assertFalse(subscriptions.isOffsetResetNeeded(tp));
-        assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, subscriptions.position(tp).longValue());
+        fetcher.updateFetchPositions(singleton(tp1));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+        assertTrue(subscriptions.isFetchable(tp1));
+        assertEquals(5, subscriptions.position(tp1).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.committed(tp, new OffsetAndMetadata(0));
-        subscriptions.pause(tp); // paused partition does not have a valid position
-        subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.committed(tp1, new OffsetAndMetadata(0));
+        subscriptions.pause(tp1); // paused partition does not have a valid position
+        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 10L));
-        fetcher.updateFetchPositions(singleton(tp));
+        fetcher.updateFetchPositions(singleton(tp1));
 
-        assertFalse(subscriptions.isOffsetResetNeeded(tp));
-        assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
-        assertTrue(subscriptions.hasValidPosition(tp));
-        assertEquals(10, subscriptions.position(tp).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+        assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp1));
+        assertEquals(10, subscriptions.position(tp1).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsWithoutACommittedOffset() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.pause(tp); // paused partition does not have a valid position
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.pause(tp1); // paused partition does not have a valid position
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 0L));
-        fetcher.updateFetchPositions(singleton(tp));
+        fetcher.updateFetchPositions(singleton(tp1));
 
-        assertFalse(subscriptions.isOffsetResetNeeded(tp));
-        assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
-        assertTrue(subscriptions.hasValidPosition(tp));
-        assertEquals(0, subscriptions.position(tp).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+        assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp1));
+        assertEquals(0, subscriptions.position(tp1).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.committed(tp, new OffsetAndMetadata(0));
-        subscriptions.pause(tp); // paused partition does not have a valid position
-        subscriptions.seek(tp, 10);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.committed(tp1, new OffsetAndMetadata(0));
+        subscriptions.pause(tp1); // paused partition does not have a valid position
+        subscriptions.seek(tp1, 10);
 
-        fetcher.updateFetchPositions(singleton(tp));
+        fetcher.updateFetchPositions(singleton(tp1));
 
-        assertFalse(subscriptions.isOffsetResetNeeded(tp));
-        assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
-        assertTrue(subscriptions.hasValidPosition(tp));
-        assertEquals(10, subscriptions.position(tp).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+        assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp1));
+        assertEquals(10, subscriptions.position(tp1).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.committed(tp, new OffsetAndMetadata(0));
-        subscriptions.seek(tp, 10);
-        subscriptions.pause(tp); // paused partition already has a valid position
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.committed(tp1, new OffsetAndMetadata(0));
+        subscriptions.seek(tp1, 10);
+        subscriptions.pause(tp1); // paused partition already has a valid position
 
-        fetcher.updateFetchPositions(singleton(tp));
+        fetcher.updateFetchPositions(singleton(tp1));
 
-        assertFalse(subscriptions.isOffsetResetNeeded(tp));
-        assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
-        assertTrue(subscriptions.hasValidPosition(tp));
-        assertEquals(10, subscriptions.position(tp).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+        assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp1));
+        assertEquals(10, subscriptions.position(tp1).longValue());
     }
 
     @Test
@@ -803,8 +808,8 @@ public class FetcherTest {
      */
     @Test
     public void testQuotaMetrics() throws Exception {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
         // normal fetch
         for (int i = 1; i < 4; i++) {
@@ -813,14 +818,14 @@ public class FetcherTest {
             MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                     TimestampType.CREATE_TIME, 0L);
             for (int v = 0; v < 3; v++)
-                builder.appendWithOffset((long) i * 3 + v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
-            List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE, 100L, 100 * i).get(tp);
+                builder.appendWithOffset(i * 3 + v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+            List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE, 100L, 100 * i).get(tp1);
             assertEquals(3, records.size());
         }
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
-        KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
+        KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup));
+        KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup));
         assertEquals(200, avgMetric.value(), EPSILON);
         assertEquals(300, maxMetric.value(), EPSILON);
     }
@@ -830,11 +835,11 @@ public class FetcherTest {
      */
     @Test
     public void testFetcherMetrics() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
 
-        MetricName maxLagMetric = metrics.metricName("records-lag-max", metricGroup, "");
-        MetricName partitionLagMetric = metrics.metricName(tp + ".records-lag", metricGroup, "");
+        MetricName maxLagMetric = metrics.metricName("records-lag-max", metricGroup);
+        MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -853,7 +858,7 @@ public class FetcherTest {
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                 TimestampType.CREATE_TIME, 0L);
         for (int v = 0; v < 3; v++)
-            builder.appendWithOffset((long) v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
+            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
         fetchRecords(builder.build(), Errors.NONE, 200L, 0);
         assertEquals(197, recordsFetchLagMax.value(), EPSILON);
 
@@ -862,6 +867,131 @@ public class FetcherTest {
         assertFalse(allMetrics.containsKey(partitionLagMetric));
     }
 
+    @Test
+    public void testFetchResponseMetrics() {
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
+        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int v = 0; v < 3; v++)
+            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+        MemoryRecords records = builder.build();
+
+        int expectedBytes = 0;
+        for (Record record : records.records())
+            expectedBytes += record.sizeInBytes();
+
+        fetchRecords(records, Errors.NONE, 100L, 0);
+        assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
+        assertEquals(3, recordsCountAverage.value(), EPSILON);
+    }
+
+    @Test
+    public void testFetchResponseMetricsPartialResponse() {
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 1);
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
+        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int v = 0; v < 3; v++)
+            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+        MemoryRecords records = builder.build();
+
+        int expectedBytes = 0;
+        for (Record record : records.records()) {
+            if (record.offset() >= 1)
+                expectedBytes += record.sizeInBytes();
+        }
+
+        fetchRecords(records, Errors.NONE, 100L, 0);
+        assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
+        assertEquals(2, recordsCountAverage.value(), EPSILON);
+    }
+
+    @Test
+    public void testFetchResponseMetricsWithOnePartitionError() {
+        subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
+        subscriptions.seek(tp1, 0);
+        subscriptions.seek(tp2, 0);
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
+        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int v = 0; v < 3; v++)
+            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+        MemoryRecords records = builder.build();
+
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+        partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, MemoryRecords.EMPTY));
+
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
+        consumerClient.poll(0);
+        fetcher.fetchedRecords();
+
+        int expectedBytes = 0;
+        for (Record record : records.records())
+            expectedBytes += record.sizeInBytes();
+
+        assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
+        assertEquals(3, recordsCountAverage.value(), EPSILON);
+    }
+
+    @Test
+    public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() {
+        subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
+        subscriptions.seek(tp1, 0);
+        subscriptions.seek(tp2, 0);
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
+        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+
+        // send the fetch and then seek to a new offset
+        assertEquals(1, fetcher.sendFetches());
+        subscriptions.seek(tp2, 5);
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int v = 0; v < 3; v++)
+            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+        MemoryRecords records = builder.build();
+
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+        partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null,
+                MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("val".getBytes()))));
+
+        client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
+        consumerClient.poll(0);
+        fetcher.fetchedRecords();
+
+        // we should have ignored the record at the wrong offset
+        int expectedBytes = 0;
+        for (Record record : records.records())
+            expectedBytes += record.sizeInBytes();
+
+        assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
+        assertEquals(3, recordsCountAverage.value(), EPSILON);
+    }
+
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, Errors error, long hw, int throttleTime) {
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(records, error, hw, throttleTime));
@@ -905,7 +1035,7 @@ public class FetcherTest {
                                                  Long expectedOffsetForTp0,
                                                  Long expectedOffsetForTp1) {
         client.reset();
-        TopicPartition tp0 = tp;
+        TopicPartition tp0 = tp1;
         TopicPartition tp1 = new TopicPartition(topicName, 1);
         // Ensure metadata has both partition.
         Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
@@ -944,13 +1074,13 @@ public class FetcherTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 ListOffsetRequest req = (ListOffsetRequest) body;
-                return timestamp == req.partitionTimestamps().get(tp);
+                return timestamp == req.partitionTimestamps().get(tp1);
             }
         };
     }
 
     private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) {
-        return listOffsetResponse(tp, error, timestamp, offset);
+        return listOffsetResponse(tp1, error, timestamp, offset);
     }
 
     private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
@@ -961,7 +1091,7 @@ public class FetcherTest {
     }
 
     private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) {
-        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp1,
                 new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
         return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
     }
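
The partial-response test above hinges on the fetcher skipping records below the current
fetch position when it accounts for fetch-size-avg. A minimal sketch of that accounting,
using only APIs already exercised in this patch (local names are illustrative, not part of
the commit):

    MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
            CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
    for (int v = 0; v < 3; v++)
        builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
    MemoryRecords records = builder.build();

    // Only records at or beyond the position the consumer seeked to (offset 1 in
    // testFetchResponseMetricsPartialResponse) count toward the reported fetch size.
    long fetchPosition = 1L;
    int expectedBytes = 0;
    for (Record record : records.records()) {
        if (record.offset() >= fetchPosition)
            expectedBytes += record.sizeInBytes();
    }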

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index b02c5c9..8466c83 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -207,4 +208,17 @@ public class DefaultRecordBatchTest {
         assertEquals(ControlRecordType.ABORT, ControlRecordType.parse(abortRecord.key()));
     }
 
+    @Test
+    public void testStreamingIteratorConsistency() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+                CompressionType.GZIP, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+        try (CloseableIterator<Record> streamingIterator = batch.streamingIterator()) {
+            TestUtils.checkEquals(streamingIterator, batch.iterator());
+        }
+    }
+
 }
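
The new test exercises the CloseableIterator contract this patch is about: the streaming
iterator wraps a decompression stream, so it must be closed even when iteration stops
early. A minimal sketch of the intended usage, assuming the same classes imported by
DefaultRecordBatchTest above (org.apache.kafka.common.record.* and
org.apache.kafka.common.utils.CloseableIterator):

    MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
            CompressionType.GZIP, TimestampType.CREATE_TIME,
            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()));
    DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());

    // try-with-resources releases the underlying decompression stream even if we
    // stop after the first record, which is what the Fetcher now relies on.
    try (CloseableIterator<Record> iterator = batch.streamingIterator()) {
        if (iterator.hasNext())
            iterator.next();
    }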

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index ea857a0..1019282 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -33,6 +33,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -302,6 +303,10 @@ public class TestUtils {
         assertEquals(toList(it1), toList(it2));
     }
 
+    public static <T> void checkEquals(Iterator<T> it1, Iterator<T> it2) {
+        assertEquals(Utils.toList(it1), Utils.toList(it2));
+    }
+
     public static <T> List<T> toList(Iterable<? extends T> iterable) {
         List<T> list = new ArrayList<>();
         for (T item : iterable)