Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/31 06:04:09 UTC

[kafka] branch trunk updated: KAFKA-6927; Chunked down-conversion to prevent out of memory errors on broker [KIP-283] (#4871)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 837f31d  KAFKA-6927; Chunked down-conversion to prevent out of memory errors on broker [KIP-283] (#4871)
837f31d is described below

commit 837f31dd1850b179918f83338b4b4487486b2c58
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Wed May 30 23:03:51 2018 -0700

    KAFKA-6927; Chunked down-conversion to prevent out of memory errors on broker [KIP-283] (#4871)
    
    Implementation for lazy down-conversion in a chunked manner for efficient memory usage during down-conversion. This pull request is mainly to get initial feedback on the direction of the patch. The patch includes all the main components from KIP-283.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
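
For context, a minimal sketch (not part of this commit) of how the lazy, chunked
down-conversion introduced here can be exercised from the classes added in this patch;
the topic name and connection id are placeholder values, and Time.SYSTEM stands in for
the broker's time instance:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LazyDownConversionRecords;
    import org.apache.kafka.common.record.LazyDownConversionRecordsSend;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.utils.Time;

    public class LazyDownConversionSketch {
        public static void main(String[] args) {
            // A small v2 batch standing in for data read from the log on the broker.
            MemoryRecords v2Records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord("key".getBytes(), "value".getBytes()));
            TopicPartition tp = new TopicPartition("example-topic", 0);

            // Wrap the records for lazy down-conversion to message format v1. Only the first
            // batch is converted up front (to size the response); the rest is converted in
            // chunks while the response is written to the socket.
            LazyDownConversionRecords lazy = new LazyDownConversionRecords(
                    tp, v2Records, RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);

            // The Send object performs the actual chunked conversion in its writeTo() calls.
            LazyDownConversionRecordsSend send = lazy.toSend("client-connection-id");
            System.out.println("sizeInBytes=" + lazy.sizeInBytes() + ", send size=" + send.size());
        }
    }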
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  11 +-
 .../apache/kafka/common/protocol/types/Struct.java |   6 +-
 .../apache/kafka/common/protocol/types/Type.java   |  16 +-
 .../kafka/common/record/AbstractRecords.java       | 110 +----------
 .../{ConvertedRecords.java => BaseRecords.java}    |  30 ++-
 .../kafka/common/record/ConvertedRecords.java      |  10 +-
 ...nvertedRecords.java => DefaultRecordsSend.java} |  21 +--
 .../apache/kafka/common/record/FileRecords.java    |  15 +-
 .../common/record/LazyDownConversionRecords.java   | 168 +++++++++++++++++
 .../record/LazyDownConversionRecordsSend.java      |  99 ++++++++++
 .../apache/kafka/common/record/MemoryRecords.java  |  10 +-
 .../MultiRecordsSend.java}                         |  43 ++++-
 ...essingStats.java => RecordConversionStats.java} |  24 ++-
 .../org/apache/kafka/common/record/Records.java    |  35 ++--
 .../common/{requests => record}/RecordsSend.java   |  36 +++-
 .../{AbstractRecords.java => RecordsUtil.java}     | 138 +-------------
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../kafka/common/requests/FetchResponse.java       |  78 ++++----
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  13 +-
 .../clients/consumer/internals/FetcherTest.java    |  64 +++----
 .../kafka/common/record/FileRecordsTest.java       | 133 ++++++++++----
 .../record/LazyDownConversionRecordsTest.java      | 203 +++++++++++++++++++++
 .../common/record/MemoryRecordsBuilderTest.java    |  12 +-
 .../MultiRecordsSendTest.java}                     |  10 +-
 core/src/main/scala/kafka/log/Log.scala            |  12 +-
 core/src/main/scala/kafka/log/LogValidator.scala   |  18 +-
 .../main/scala/kafka/network/RequestChannel.scala  |  81 +++++---
 .../main/scala/kafka/network/SocketServer.scala    |  42 +++--
 .../scala/kafka/server/ClientQuotaManager.scala    |  13 +-
 .../src/main/scala/kafka/server/FetchSession.scala |  26 +--
 core/src/main/scala/kafka/server/KafkaApis.scala   | 169 +++++++++--------
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  19 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   6 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  11 +-
 .../main/scala/kafka/server/ThrottledChannel.scala |  15 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  21 +--
 .../AbstractCoordinatorConcurrencyTest.scala       |  10 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala    |  30 +--
 .../unit/kafka/network/SocketServerTest.scala      |  42 +++--
 .../unit/kafka/server/ClientQuotaManagerTest.scala |  45 ++++-
 .../scala/unit/kafka/server/FetchRequestTest.scala |  62 ++++---
 .../scala/unit/kafka/server/FetchSessionTest.scala |  15 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   8 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     |   4 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../server/ThrottledChannelExpirationTest.scala    |  50 +++--
 .../util/ReplicaFetcherMockBlockingSend.scala      |   6 +-
 47 files changed, 1249 insertions(+), 745 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 38f324f..ca8e0d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
@@ -204,7 +205,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
                         public void onSuccess(ClientResponse resp) {
-                            FetchResponse response = (FetchResponse) resp.responseBody();
+                            FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
                             FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
                             if (handler == null) {
                                 log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
@@ -218,7 +219,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                             Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                             FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
 
-                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
                                 TopicPartition partition = entry.getKey();
                                 long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
                                 FetchResponse.PartitionData fetchData = entry.getValue();
@@ -894,7 +895,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      */
     private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
         TopicPartition tp = completedFetch.partition;
-        FetchResponse.PartitionData partition = completedFetch.partitionData;
+        FetchResponse.PartitionData<Records> partition = completedFetch.partitionData;
         long fetchOffset = completedFetch.fetchedOffset;
         PartitionRecords partitionRecords = null;
         Errors error = partition.error;
@@ -1252,13 +1253,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private static class CompletedFetch {
         private final TopicPartition partition;
         private final long fetchedOffset;
-        private final FetchResponse.PartitionData partitionData;
+        private final FetchResponse.PartitionData<Records> partitionData;
         private final FetchResponseMetricAggregator metricAggregator;
         private final short responseVersion;
 
         private CompletedFetch(TopicPartition partition,
                                long fetchedOffset,
-                               FetchResponse.PartitionData partitionData,
+                               FetchResponse.PartitionData<Records> partitionData,
                                FetchResponseMetricAggregator metricAggregator,
                                short responseVersion) {
             this.partition = partition;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index ac24a1b..7dccc10 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.protocol.types;
 
-import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.BaseRecords;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -172,8 +172,8 @@ public class Struct {
         return (Byte) get(name);
     }
 
-    public Records getRecords(String name) {
-        return (Records) get(name);
+    public BaseRecords getRecords(String name) {
+        return (BaseRecords) get(name);
     }
 
     public Short getShort(BoundField field) {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 85916d5..4bd508b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.protocol.types;
 
-import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.BaseRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.ByteUtils;
@@ -549,14 +549,14 @@ public abstract class Type {
 
         @Override
         public void write(ByteBuffer buffer, Object o) {
-            if (o instanceof FileRecords)
-                throw new IllegalArgumentException("FileRecords must be written to the channel directly");
+            if (!(o instanceof MemoryRecords))
+                throw new IllegalArgumentException("Unexpected record type: " + o.getClass());
             MemoryRecords records = (MemoryRecords) o;
             NULLABLE_BYTES.write(buffer, records.buffer().duplicate());
         }
 
         @Override
-        public Records read(ByteBuffer buffer) {
+        public MemoryRecords read(ByteBuffer buffer) {
             ByteBuffer recordsBuffer = (ByteBuffer) NULLABLE_BYTES.read(buffer);
             return MemoryRecords.readableRecords(recordsBuffer);
         }
@@ -566,7 +566,7 @@ public abstract class Type {
             if (o == null)
                 return 4;
 
-            Records records = (Records) o;
+            BaseRecords records = (BaseRecords) o;
             return 4 + records.sizeInBytes();
         }
 
@@ -576,12 +576,12 @@ public abstract class Type {
         }
 
         @Override
-        public Records validate(Object item) {
+        public BaseRecords validate(Object item) {
             if (item == null)
                 return null;
 
-            if (item instanceof Records)
-                return (Records) item;
+            if (item instanceof BaseRecords)
+                return (BaseRecords) item;
 
             throw new SchemaException(item + " is not an instance of " + Records.class.getName());
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 89a5413..5e41901 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -18,13 +18,10 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 
 public abstract class AbstractRecords implements Records {
 
@@ -52,97 +49,6 @@ public abstract class AbstractRecords implements Records {
     }
 
     /**
-     * Down convert batches to the provided message format version. The first offset parameter is only relevant in the
-     * conversion from uncompressed v2 or higher to v1 or lower. The reason is that uncompressed records in v0 and v1
-     * are not batched (put another way, each batch always has 1 record).
-     *
-     * If a client requests records in v1 format starting from the middle of an uncompressed batch in v2 format, we
-     * need to drop records from the batch during the conversion. Some versions of librdkafka rely on this for
-     * correctness.
-     *
-     * The temporaryMemoryBytes computation assumes that the batches are not loaded into the heap
-     * (via classes like FileChannelRecordBatch) before this method is called. This is the case in the broker (we
-     * only load records into the heap when down converting), but it's not for the producer. However, down converting
-     * in the producer is very uncommon and the extra complexity to handle that case is not worth it.
-     */
-    protected ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
-            long firstOffset, Time time) {
-        // maintain the batch along with the decompressed records to avoid the need to decompress again
-        List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
-        int totalSizeEstimate = 0;
-        long startNanos = time.nanoseconds();
-
-        for (RecordBatch batch : batches) {
-            if (toMagic < RecordBatch.MAGIC_VALUE_V2 && batch.isControlBatch())
-                continue;
-
-            if (batch.magic() <= toMagic) {
-                totalSizeEstimate += batch.sizeInBytes();
-                recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
-            } else {
-                List<Record> records = new ArrayList<>();
-                for (Record record : batch) {
-                    // See the method javadoc for an explanation
-                    if (toMagic > RecordBatch.MAGIC_VALUE_V1 || batch.isCompressed() || record.offset() >= firstOffset)
-                        records.add(record);
-                }
-                if (records.isEmpty())
-                    continue;
-                final long baseOffset;
-                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && toMagic >= RecordBatch.MAGIC_VALUE_V2)
-                    baseOffset = batch.baseOffset();
-                else
-                    baseOffset = records.get(0).offset();
-                totalSizeEstimate += estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
-                recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, records, baseOffset));
-            }
-        }
-
-        ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
-        long temporaryMemoryBytes = 0;
-        int numRecordsConverted = 0;
-        for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
-            temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
-            if (recordBatchAndRecords.batch.magic() <= toMagic) {
-                recordBatchAndRecords.batch.writeTo(buffer);
-            } else {
-                MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
-                buffer = builder.buffer();
-                temporaryMemoryBytes += builder.uncompressedBytesWritten();
-                numRecordsConverted += builder.numRecords();
-            }
-        }
-
-        buffer.flip();
-        RecordsProcessingStats stats = new RecordsProcessingStats(temporaryMemoryBytes, numRecordsConverted,
-                time.nanoseconds() - startNanos);
-        return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
-    }
-
-    /**
-     * Return a buffer containing the converted record batches. The returned buffer may not be the same as the received
-     * one (e.g. it may require expansion).
-     */
-    private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
-        RecordBatch batch = recordBatchAndRecords.batch;
-        final TimestampType timestampType = batch.timestampType();
-        long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
-
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
-                timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
-        for (Record record : recordBatchAndRecords.records) {
-            // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
-            if (magic > RecordBatch.MAGIC_VALUE_V1)
-                builder.append(record);
-            else
-                builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
-        }
-
-        builder.close();
-        return builder;
-    }
-
-    /**
      * Get an iterator over the deep records.
      * @return An iterator over the records
      */
@@ -151,6 +57,11 @@ public abstract class AbstractRecords implements Records {
         return records;
     }
 
+    @Override
+    public RecordsSend toSend(String destination) {
+        return new DefaultRecordsSend(destination, this);
+    }
+
     private Iterator<Record> recordsIterator() {
         return new AbstractIterator<Record>() {
             private final Iterator<? extends RecordBatch> batches = batches().iterator();
@@ -241,16 +152,5 @@ public abstract class AbstractRecords implements Records {
         }
     }
 
-    private static class RecordBatchAndRecords {
-        private final RecordBatch batch;
-        private final List<Record> records;
-        private final Long baseOffset;
-
-        private RecordBatchAndRecords(RecordBatch batch, List<Record> records, Long baseOffset) {
-            this.batch = batch;
-            this.records = records;
-            this.baseOffset = baseOffset;
-        }
-    }
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java b/clients/src/main/java/org/apache/kafka/common/record/BaseRecords.java
similarity index 62%
copy from clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
copy to clients/src/main/java/org/apache/kafka/common/record/BaseRecords.java
index fe37c48..3ebaf79 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/BaseRecords.java
@@ -16,21 +16,19 @@
  */
 package org.apache.kafka.common.record;
 
-public class ConvertedRecords<T extends Records> {
-
-    private final T records;
-    private final RecordsProcessingStats recordsProcessingStats;
-
-    public ConvertedRecords(T records, RecordsProcessingStats recordsProcessingStats) {
-        this.records = records;
-        this.recordsProcessingStats = recordsProcessingStats;
-    }
-
-    public T records() {
-        return records;
-    }
+/**
+ * Base interface for accessing records which could be contained in the log, or an in-memory materialization of log records.
+ */
+public interface BaseRecords {
+    /**
+     * The size of these records in bytes.
+     * @return The size in bytes of the records
+     */
+    int sizeInBytes();
 
-    public RecordsProcessingStats recordsProcessingStats() {
-        return recordsProcessingStats;
-    }
+    /**
+     * Encapsulate this {@link BaseRecords} object into {@link RecordsSend}
+     * @return Initialized {@link RecordsSend} object
+     */
+    RecordsSend toSend(String destination);
 }
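
The BaseRecords abstraction above is what lets a fetch response carry either fully
materialized records or a lazily down-converted view. A small hypothetical helper,
assuming only the interface shown above, to illustrate the uniform dispatch:

    import org.apache.kafka.common.record.*;

    public class BaseRecordsSketch {
        // Any BaseRecords implementation (MemoryRecords, FileRecords, or the new
        // LazyDownConversionRecords) is turned into a Send object the same way.
        static RecordsSend sendFor(String destination, BaseRecords records) {
            return records.toSend(destination);
        }

        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord("value".getBytes()));
            System.out.println(sendFor("conn-1", records).size());
        }
    }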
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java b/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
index fe37c48..d9150e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
@@ -19,18 +19,18 @@ package org.apache.kafka.common.record;
 public class ConvertedRecords<T extends Records> {
 
     private final T records;
-    private final RecordsProcessingStats recordsProcessingStats;
+    private final RecordConversionStats recordConversionStats;
 
-    public ConvertedRecords(T records, RecordsProcessingStats recordsProcessingStats) {
+    public ConvertedRecords(T records, RecordConversionStats recordConversionStats) {
         this.records = records;
-        this.recordsProcessingStats = recordsProcessingStats;
+        this.recordConversionStats = recordConversionStats;
     }
 
     public T records() {
         return records;
     }
 
-    public RecordsProcessingStats recordsProcessingStats() {
-        return recordsProcessingStats;
+    public RecordConversionStats recordConversionStats() {
+        return recordConversionStats;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
similarity index 58%
copy from clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
copy to clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
index fe37c48..aa715ea 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
@@ -16,21 +16,20 @@
  */
 package org.apache.kafka.common.record;
 
-public class ConvertedRecords<T extends Records> {
+import java.io.IOException;
+import java.nio.channels.GatheringByteChannel;
 
-    private final T records;
-    private final RecordsProcessingStats recordsProcessingStats;
-
-    public ConvertedRecords(T records, RecordsProcessingStats recordsProcessingStats) {
-        this.records = records;
-        this.recordsProcessingStats = recordsProcessingStats;
+public class DefaultRecordsSend extends RecordsSend<Records> {
+    public DefaultRecordsSend(String destination, Records records) {
+        this(destination, records, records.sizeInBytes());
     }
 
-    public T records() {
-        return records;
+    public DefaultRecordsSend(String destination, Records records, int maxBytesToWrite) {
+        super(destination, records, maxBytesToWrite);
     }
 
-    public RecordsProcessingStats recordsProcessingStats() {
-        return recordsProcessingStats;
+    @Override
+    protected long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
+        return records().writeTo(channel, previouslyWritten, remaining);
     }
 }
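
DefaultRecordsSend above simply delegates each write to Records#writeTo. A sketch of
driving such a send against a file channel (the destination id and temp file are
arbitrary; a real broker writes to the client's socket channel):

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordsSend;
    import org.apache.kafka.common.record.SimpleRecord;

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.StandardOpenOption;

    public class DefaultSendSketch {
        public static void main(String[] args) throws IOException {
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord("value".getBytes()));
            // AbstractRecords#toSend wraps any concrete Records in a DefaultRecordsSend.
            RecordsSend send = records.toSend("destination-id");

            try (FileChannel channel = FileChannel.open(Files.createTempFile("records", ".log"),
                    StandardOpenOption.WRITE)) {
                while (!send.completed())
                    send.writeTo(channel);   // may take several calls for large payloads
            }
        }
    }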
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 6b6e0ab..e44d5d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.record;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 
@@ -239,8 +240,8 @@ public class FileRecords extends AbstractRecords implements Closeable {
 
     @Override
     public ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time) {
-        ConvertedRecords<MemoryRecords> convertedRecords = downConvert(batches, toMagic, firstOffset, time);
-        if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
+        ConvertedRecords<MemoryRecords> convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
+        if (convertedRecords.recordConversionStats().numRecordsConverted() == 0) {
             // This indicates that the message is too large, which means that the buffer is not large
             // enough to hold a full record batch. We just return all the bytes in this instance.
             // Even though the record batch does not have the right format version, we expect old clients
@@ -248,7 +249,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
             // are not enough available bytes in the response to read it fully. Note that this is
             // only possible prior to KIP-74, after which the broker was changed to always return at least
             // one full record batch, even if it requires exceeding the max fetch size requested by the client.
-            return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
+            return new ConvertedRecords<>(this, RecordConversionStats.EMPTY);
         } else {
             return convertedRecords;
         }
@@ -364,7 +365,12 @@ public class FileRecords extends AbstractRecords implements Closeable {
         };
     }
 
-    private Iterator<FileChannelRecordBatch> batchIterator(int start) {
+    @Override
+    public AbstractIterator<FileChannelRecordBatch> batchIterator() {
+        return batchIterator(start);
+    }
+
+    private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
         final int end;
         if (isSlice)
             end = this.end;
@@ -510,5 +516,4 @@ public class FileRecords extends AbstractRecords implements Closeable {
                     ')';
         }
     }
-
 }
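
The hunk above keeps the eager FileRecords#downConvert path (now backed by RecordsUtil)
together with its "message too large" fallback. A short sketch of that path, assuming a
throwaway temp file:

    import org.apache.kafka.common.record.*;
    import org.apache.kafka.common.utils.Time;

    import java.io.File;
    import java.io.IOException;

    public class FileDownConvertSketch {
        public static void main(String[] args) throws IOException {
            File file = File.createTempFile("segment", ".log");
            try (FileRecords fileRecords = FileRecords.open(file)) {
                fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE,
                        new SimpleRecord("value".getBytes())));
                fileRecords.flush();

                // Eager down-conversion of the whole file. If nothing could be converted,
                // the original records come back with RecordConversionStats.EMPTY, as
                // described in the comment above.
                ConvertedRecords<? extends Records> converted =
                        fileRecords.downConvert(RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);
                System.out.println(converted.recordConversionStats());
            }
        }
    }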
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
new file mode 100644
index 0000000..da14b5b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Encapsulation for holding records that require down-conversion in a lazy, chunked manner (KIP-283). See
+ * {@link LazyDownConversionRecordsSend} for the actual chunked send implementation.
+ */
+public class LazyDownConversionRecords implements BaseRecords {
+    private final TopicPartition topicPartition;
+    private final Records records;
+    private final byte toMagic;
+    private final long firstOffset;
+    private ConvertedRecords firstConvertedBatch;
+    private final int sizeInBytes;
+    private final Time time;
+
+    /**
+     * @param topicPartition The topic-partition to which records belong
+     * @param records Records to lazily down-convert
+     * @param toMagic Magic version to down-convert to
+     * @param firstOffset The starting offset for down-converted records. This only impacts some cases. See
+     *                    {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
+     * @param time The time instance to use
+     */
+    public LazyDownConversionRecords(TopicPartition topicPartition, Records records, byte toMagic, long firstOffset, Time time) {
+        this.topicPartition = Objects.requireNonNull(topicPartition);
+        this.records = Objects.requireNonNull(records);
+        this.toMagic = toMagic;
+        this.firstOffset = firstOffset;
+        this.time = Objects.requireNonNull(time);
+
+        // Kafka consumers expect at least one full batch of messages for every topic-partition. To guarantee this, we
+        // need to make sure that we are able to accommodate one full batch of down-converted messages. The way we achieve
+        // this is by having sizeInBytes method factor in the size of the first down-converted batch and return at least
+        // its size.
+        AbstractIterator<? extends RecordBatch> it = records.batchIterator();
+        if (it.hasNext()) {
+            firstConvertedBatch = RecordsUtil.downConvert(Collections.singletonList(it.peek()), toMagic, firstOffset, time);
+            sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
+        } else {
+            firstConvertedBatch = null;
+            sizeInBytes = 0;
+        }
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return sizeInBytes;
+    }
+
+    @Override
+    public LazyDownConversionRecordsSend toSend(String destination) {
+        return new LazyDownConversionRecordsSend(destination, this);
+    }
+
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof LazyDownConversionRecords) {
+            LazyDownConversionRecords that = (LazyDownConversionRecords) o;
+            return toMagic == that.toMagic &&
+                    firstOffset == that.firstOffset &&
+                    topicPartition.equals(that.topicPartition) &&
+                    records.equals(that.records);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = toMagic;
+        result = 31 * result + (int) (firstOffset ^ (firstOffset >>> 32));
+        result = 31 * result + topicPartition.hashCode();
+        result = 31 * result + records.hashCode();
+        return result;
+    }
+
+    public java.util.Iterator<ConvertedRecords> iterator(long maximumReadSize) {
+        // We typically expect only one iterator instance to be created, so null out the first converted batch after
+        // first use to make it available for GC.
+        ConvertedRecords firstBatch = firstConvertedBatch;
+        firstConvertedBatch = null;
+        return new Iterator(records, maximumReadSize, firstBatch);
+    }
+
+    /**
+     * Implementation for being able to iterate over down-converted records. Goal of this implementation is to keep
+     * it as memory-efficient as possible by not having to maintain all down-converted records in-memory. Maintains
+     * a view into batches of down-converted records.
+     */
+    private class Iterator extends AbstractIterator<ConvertedRecords> {
+        private final AbstractIterator<? extends RecordBatch> batchIterator;
+        private final long maximumReadSize;
+        private ConvertedRecords firstConvertedBatch;
+
+        /**
+         * @param recordsToDownConvert Records that require down-conversion
+         * @param maximumReadSize Maximum possible size of underlying records that will be down-converted in each call to
+         *                        {@link #makeNext()}. This is a soft limit as {@link #makeNext()} will always convert
+         *                        and return at least one full message batch.
+         */
+        private Iterator(Records recordsToDownConvert, long maximumReadSize, ConvertedRecords firstConvertedBatch) {
+            this.batchIterator = recordsToDownConvert.batchIterator();
+            this.maximumReadSize = maximumReadSize;
+            this.firstConvertedBatch = firstConvertedBatch;
+            // If we already have the first down-converted batch, advance the underlying records iterator to next batch
+            if (firstConvertedBatch != null)
+                this.batchIterator.next();
+        }
+
+        /**
+         * Make next set of down-converted records
+         * @return Down-converted records
+         */
+        @Override
+        protected ConvertedRecords makeNext() {
+            // If we have cached the first down-converted batch, return that now
+            if (firstConvertedBatch != null) {
+                ConvertedRecords convertedBatch = firstConvertedBatch;
+                firstConvertedBatch = null;
+                return convertedBatch;
+            }
+
+            if (!batchIterator.hasNext())
+                return allDone();
+
+            // Figure out batches we should down-convert based on the size constraints
+            List<RecordBatch> batches = new ArrayList<>();
+            boolean isFirstBatch = true;
+            long sizeSoFar = 0;
+            while (batchIterator.hasNext() &&
+                    (isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
+                RecordBatch currentBatch = batchIterator.next();
+                batches.add(currentBatch);
+                sizeSoFar += currentBatch.sizeInBytes();
+                isFirstBatch = false;
+            }
+            return RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
+        }
+    }
+}
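
The iterator(maximumReadSize) method above is the heart of the chunking: each call to
next() converts a bounded slice of the underlying batches. A sketch of consuming it
directly (the send path below does the same thing internally); the 16 KB read size is an
arbitrary example value:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.*;
    import org.apache.kafka.common.utils.Time;

    import java.util.Iterator;

    public class ChunkedIterationSketch {
        public static void main(String[] args) {
            MemoryRecords v2Records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()));
            LazyDownConversionRecords lazy = new LazyDownConversionRecords(
                    new TopicPartition("example-topic", 0), v2Records,
                    RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);

            // Each next() call down-converts at most ~16 KB of underlying batches (a soft
            // limit: at least one full batch is always converted), keeping heap usage bounded.
            Iterator<ConvertedRecords> chunks = lazy.iterator(16 * 1024);
            while (chunks.hasNext()) {
                ConvertedRecords chunk = chunks.next();
                System.out.println("chunk bytes: " + chunk.records().sizeInBytes()
                        + ", stats: " + chunk.recordConversionStats());
            }
        }
    }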
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
new file mode 100644
index 0000000..b782114
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
+
+/**
+ * Encapsulation for {@link RecordsSend} for {@link LazyDownConversionRecords}. Records are down-converted in batches and
+ * on-demand when {@link #writeTo} method is called.
+ */
+public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownConversionRecords> {
+    private static final Logger log = LoggerFactory.getLogger(LazyDownConversionRecordsSend.class);
+    private static final int MAX_READ_SIZE = 128 * 1024;
+
+    private RecordConversionStats recordConversionStats;
+    private RecordsSend convertedRecordsWriter;
+    private Iterator<ConvertedRecords> convertedRecordsIterator;
+
+    public LazyDownConversionRecordsSend(String destination, LazyDownConversionRecords records) {
+        super(destination, records, records.sizeInBytes());
+        convertedRecordsWriter = null;
+        recordConversionStats = new RecordConversionStats();
+        convertedRecordsIterator = records().iterator(MAX_READ_SIZE);
+    }
+
+    @Override
+    public long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
+        if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
+            MemoryRecords convertedRecords;
+
+            // Check if we have more chunks left to down-convert
+            if (convertedRecordsIterator.hasNext()) {
+                // Get next chunk of down-converted messages
+                ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
+                convertedRecords = recordsAndStats.records();
+
+                int sizeOfFirstConvertedBatch = convertedRecords.batchIterator().next().sizeInBytes();
+                if (previouslyWritten == 0 && sizeOfFirstConvertedBatch > size())
+                    throw new EOFException("Unable to send first batch completely." +
+                            " maximum_size: " + size() +
+                            " converted_records_size: " + sizeOfFirstConvertedBatch);
+
+                recordConversionStats.add(recordsAndStats.recordConversionStats());
+                log.debug("Got lazy converted records for {" + topicPartition() + "} with length=" + convertedRecords.sizeInBytes());
+            } else {
+                if (previouslyWritten == 0)
+                    throw new EOFException("Unable to get the first batch of down-converted records");
+
+                // We do not have any records left to down-convert. Construct a "fake" message for the length remaining.
+                // This message will be ignored by the consumer because its length will be past the length of maximum
+                // possible response size.
+                // DefaultRecordBatch =>
+                //      BaseOffset => Int64
+                //      Length => Int32
+                //      ...
+                // TODO: check if there is a better way to encapsulate this logic, perhaps in DefaultRecordBatch
+                log.debug("Constructing fake message batch for topic-partition {" + topicPartition() + "} for remaining length " + remaining);
+                int minLength = (Long.SIZE / Byte.SIZE) + (Integer.SIZE / Byte.SIZE);
+                ByteBuffer fakeMessageBatch = ByteBuffer.allocate(Math.max(minLength, Math.min(remaining + 1, MAX_READ_SIZE)));
+                fakeMessageBatch.putLong(-1L);
+                fakeMessageBatch.putInt(remaining + 1);
+                convertedRecords = MemoryRecords.readableRecords(fakeMessageBatch);
+            }
+
+            convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
+        }
+        return convertedRecordsWriter.writeTo(channel);
+    }
+
+    public RecordConversionStats recordConversionStats() {
+        return recordConversionStats;
+    }
+
+    public TopicPartition topicPartition() {
+        return records().topicPartition();
+    }
+}
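
Putting the pieces together, a sketch of draining a LazyDownConversionRecordsSend into a
channel and reading the accumulated conversion statistics afterwards (a temp-file channel
stands in for the client socket):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.*;
    import org.apache.kafka.common.utils.Time;

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.StandardOpenOption;

    public class LazySendSketch {
        public static void main(String[] args) throws IOException {
            MemoryRecords v2Records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord("value".getBytes()));
            LazyDownConversionRecords lazy = new LazyDownConversionRecords(
                    new TopicPartition("example-topic", 0), v2Records,
                    RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);
            LazyDownConversionRecordsSend send = lazy.toSend("client-connection-id");

            try (FileChannel channel = FileChannel.open(Files.createTempFile("out", ".bin"),
                    StandardOpenOption.WRITE)) {
                // Down-conversion happens here, one bounded chunk per iteration; if the
                // converted data is shorter than the advertised size, the remainder is padded
                // with the "fake" batch described in the comments above.
                while (!send.completed())
                    send.writeTo(channel);
            }
            System.out.println(send.recordConversionStats());
        }
    }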
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index be7ea62..55a4711 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
+import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Time;
@@ -49,7 +50,7 @@ public class MemoryRecords extends AbstractRecords {
     private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
         @Override
         public Iterator<MutableRecordBatch> iterator() {
-            return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
+            return batchIterator();
         }
     };
 
@@ -115,7 +116,12 @@ public class MemoryRecords extends AbstractRecords {
 
     @Override
     public ConvertedRecords<MemoryRecords> downConvert(byte toMagic, long firstOffset, Time time) {
-        return downConvert(batches(), toMagic, firstOffset, time);
+        return RecordsUtil.downConvert(batches(), toMagic, firstOffset, time);
+    }
+
+    @Override
+    public AbstractIterator<MutableRecordBatch> batchIterator() {
+        return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
similarity index 59%
rename from clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
rename to clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
index 6b66360..2bc8d1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
@@ -14,34 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.network;
+package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.channels.GatheringByteChannel;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Queue;
 
 /**
- * A set of composite sends, sent one after another
+ * A set of composite sends with nested {@link RecordsSend}, sent one after another
  */
-public class MultiSend implements Send {
-    private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
+public class MultiRecordsSend implements Send {
+    private static final Logger log = LoggerFactory.getLogger(MultiRecordsSend.class);
 
     private final String dest;
     private final Queue<Send> sendQueue;
     private final long size;
+    private Map<TopicPartition, RecordConversionStats> recordConversionStats;
 
     private long totalWritten = 0;
     private Send current;
 
     /**
-     * Construct a MultiSend for the given destination from a queue of Send objects. The queue will be
-     * consumed as the MultiSend progresses (on completion, it will be empty).
+     * Construct a MultiRecordsSend for the given destination from a queue of Send objects. The queue will be
+     * consumed as the MultiRecordsSend progresses (on completion, it will be empty).
      */
-    public MultiSend(String dest, Queue<Send> sends) {
+    public MultiRecordsSend(String dest, Queue<Send> sends) {
         this.dest = dest;
         this.sendQueue = sends;
 
@@ -88,8 +93,10 @@ public class MultiSend implements Send {
             long written = current.writeTo(channel);
             totalWrittenPerCall += written;
             sendComplete = current.completed();
-            if (sendComplete)
+            if (sendComplete) {
+                updateRecordConversionStats(current);
                 current = sendQueue.poll();
+            }
         } while (!completed() && sendComplete);
 
         totalWritten += totalWrittenPerCall;
@@ -103,4 +110,24 @@ public class MultiSend implements Send {
         return totalWrittenPerCall;
     }
 
+    /**
+     * Get any statistics that were recorded as part of executing this {@link MultiRecordsSend}.
+     * @return Records processing statistics (could be null if no statistics were collected)
+     */
+    public Map<TopicPartition, RecordConversionStats> recordConversionStats() {
+        return recordConversionStats;
+    }
+
+    private void updateRecordConversionStats(Send completedSend) {
+        // The underlying send might have accumulated statistics that need to be recorded. For example,
+        // LazyDownConversionRecordsSend accumulates statistics related to the number of bytes down-converted, the amount
+        // of temporary memory used for down-conversion, etc. Pull out any such statistics from the underlying send
+        // and fold it up appropriately.
+        if (completedSend instanceof LazyDownConversionRecordsSend) {
+            if (recordConversionStats == null)
+                recordConversionStats = new HashMap<>();
+            LazyDownConversionRecordsSend lazyRecordsSend = (LazyDownConversionRecordsSend) completedSend;
+            recordConversionStats.put(lazyRecordsSend.topicPartition(), lazyRecordsSend.recordConversionStats());
+        }
+    }
 }
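
A fetch response is ultimately a MultiRecordsSend over the per-partition sends. A sketch
of composing one from a plain send and a lazy send and collecting the per-partition
conversion statistics on completion (connection id and output file are placeholders):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.network.Send;
    import org.apache.kafka.common.record.*;
    import org.apache.kafka.common.utils.Time;

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.StandardOpenOption;
    import java.util.ArrayDeque;
    import java.util.Queue;

    public class MultiSendSketch {
        public static void main(String[] args) throws IOException {
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord("value".getBytes()));
            TopicPartition tp = new TopicPartition("example-topic", 0);

            Queue<Send> sends = new ArrayDeque<>();
            sends.add(records.toSend("conn"));                                    // plain send
            sends.add(new LazyDownConversionRecords(tp, records,                  // lazy send
                    RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM).toSend("conn"));

            MultiRecordsSend multiSend = new MultiRecordsSend("conn", sends);
            try (FileChannel channel = FileChannel.open(Files.createTempFile("multi", ".bin"),
                    StandardOpenOption.WRITE)) {
                while (!multiSend.completed())
                    multiSend.writeTo(channel);
            }
            // Only lazy down-conversion sends contribute entries; may be null otherwise.
            System.out.println(multiSend.recordConversionStats());
        }
    }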
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java b/clients/src/main/java/org/apache/kafka/common/record/RecordConversionStats.java
similarity index 71%
rename from clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
rename to clients/src/main/java/org/apache/kafka/common/record/RecordConversionStats.java
index e104bc8..4f0bca5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordConversionStats.java
@@ -16,20 +16,30 @@
  */
 package org.apache.kafka.common.record;
 
-public class RecordsProcessingStats {
+public class RecordConversionStats {
 
-    public static final RecordsProcessingStats EMPTY = new RecordsProcessingStats(0L, 0, -1);
+    public static final RecordConversionStats EMPTY = new RecordConversionStats();
 
-    private final long temporaryMemoryBytes;
-    private final int numRecordsConverted;
-    private final long conversionTimeNanos;
+    private long temporaryMemoryBytes;
+    private int numRecordsConverted;
+    private long conversionTimeNanos;
 
-    public RecordsProcessingStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) {
+    public RecordConversionStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) {
         this.temporaryMemoryBytes = temporaryMemoryBytes;
         this.numRecordsConverted = numRecordsConverted;
         this.conversionTimeNanos = conversionTimeNanos;
     }
 
+    public RecordConversionStats() {
+        this(0, 0, 0);
+    }
+
+    public void add(RecordConversionStats stats) {
+        temporaryMemoryBytes += stats.temporaryMemoryBytes;
+        numRecordsConverted += stats.numRecordsConverted;
+        conversionTimeNanos += stats.conversionTimeNanos;
+    }
+
     /**
      * Returns the number of temporary memory bytes allocated to process the records.
      * This size depends on whether the records need decompression and/or conversion:
@@ -54,7 +64,7 @@ public class RecordsProcessingStats {
 
     @Override
     public String toString() {
-        return String.format("RecordsProcessingStats(temporaryMemoryBytes=%d, numRecordsConverted=%d, conversionTimeNanos=%d)",
+        return String.format("RecordConversionStats(temporaryMemoryBytes=%d, numRecordsConverted=%d, conversionTimeNanos=%d)",
                 temporaryMemoryBytes, numRecordsConverted, conversionTimeNanos);
     }
 }
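
Since the stats class is now mutable, per-chunk results can be folded into a running
total via add(); a trivial illustration with made-up numbers:

    import org.apache.kafka.common.record.RecordConversionStats;

    public class StatsSketch {
        public static void main(String[] args) {
            RecordConversionStats total = new RecordConversionStats();
            total.add(new RecordConversionStats(64 * 1024, 100, 1_500_000L));
            total.add(new RecordConversionStats(32 * 1024, 40, 700_000L));
            // Prints RecordConversionStats(temporaryMemoryBytes=98304, numRecordsConverted=140,
            // conversionTimeNanos=2200000)
            System.out.println(total);
        }
    }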
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 19152ba..23607b4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Time;
+
 import java.io.IOException;
 import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
 
-import org.apache.kafka.common.utils.Time;
 
 /**
  * Interface for accessing the records contained in a log. The log itself is represented as a sequence of record
@@ -28,20 +31,19 @@ import org.apache.kafka.common.utils.Time;
  * For magic versions 1 and below, each batch consists of an 8 byte offset, a 4 byte record size, and a "shallow"
  * {@link Record record}. If the batch is not compressed, then each batch will have only the shallow record contained
  * inside it. If it is compressed, the batch contains "deep" records, which are packed into the value field of the
- * shallow record. To iterate over the shallow batches, use {@link #batches()}; for the deep records, use
- * {@link #records()}. Note that the deep iterator handles both compressed and non-compressed batches: if the batch is
- * not compressed, the shallow record is returned; otherwise, the shallow batch is decompressed and the deep records
- * are returned.
+ * shallow record. To iterate over the shallow batches, use {@link Records#batches()}; for the deep records, use
+ * {@link Records#records()}. Note that the deep iterator handles both compressed and non-compressed batches:
+ * if the batch is not compressed, the shallow record is returned; otherwise, the shallow batch is decompressed and the
+ * deep records are returned.
  *
  * For magic version 2, every batch contains 1 or more log record, regardless of compression. You can iterate
- * over the batches directly using {@link #batches()}. Records can be iterated either directly from an individual
- * batch or through {@link #records()}. Just as in previous versions, iterating over the records typically involves
+ * over the batches directly using {@link Records#batches()}. Records can be iterated either directly from an individual
+ * batch or through {@link Records#records()}. Just as in previous versions, iterating over the records typically involves
  * decompression and should therefore be used with caution.
  *
  * See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
  */
-public interface Records {
-
+public interface Records extends BaseRecords {
     int OFFSET_OFFSET = 0;
     int OFFSET_LENGTH = 8;
     int SIZE_OFFSET = OFFSET_OFFSET + OFFSET_LENGTH;
@@ -55,12 +57,6 @@ public interface Records {
     int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + MAGIC_LENGTH;
 
     /**
-     * The size of these records in bytes.
-     * @return The size in bytes of the records
-     */
-    int sizeInBytes();
-
-    /**
      * Attempts to write the contents of this buffer to a channel.
      * @param channel The channel to write to
      * @param position The position in the buffer to write from
@@ -80,6 +76,13 @@ public interface Records {
     Iterable<? extends RecordBatch> batches();
 
     /**
+     * Get an iterator over the record batches. This is similar to {@link #batches()} but returns an {@link AbstractIterator}
+     * instead of {@link Iterator}, so that clients can use methods like {@link AbstractIterator#peek() peek}.
+     * @return An iterator over the record batches of the log
+     */
+    AbstractIterator<? extends RecordBatch> batchIterator();
+
+    /**
      * Check whether all batches in this buffer have a certain magic value.
      * @param magic The magic value to check
      * @return true if all record batches have a matching magic value, false otherwise
@@ -99,7 +102,7 @@ public interface Records {
      * deep iteration since all of the deep records must also be converted to the desired format.
      * @param toMagic The magic value to convert to
      * @param firstOffset The starting offset for returned records. This only impacts some cases. See
-     *                    {@link AbstractRecords#downConvert(Iterable, byte, long, Time) for an explanation.
+     *                    {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
      * @param time instance used for reporting stats
      * @return A ConvertedRecords instance which may or may not contain the same instance in its records field.
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
similarity index 55%
rename from clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
rename to clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
index 6608e9b..b40d6e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
@@ -14,29 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.requests;
+package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.network.TransportLayers;
-import org.apache.kafka.common.record.Records;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 
-public class RecordsSend implements Send {
+public abstract class RecordsSend<T extends BaseRecords> implements Send {
     private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
 
     private final String destination;
-    private final Records records;
+    private final T records;
+    private final int maxBytesToWrite;
     private int remaining;
     private boolean pending = false;
 
-    public RecordsSend(String destination, Records records) {
+    protected RecordsSend(String destination, T records, int maxBytesToWrite) {
         this.destination = destination;
         this.records = records;
-        this.remaining = records.sizeInBytes();
+        this.maxBytesToWrite = maxBytesToWrite;
+        this.remaining = maxBytesToWrite;
     }
 
     @Override
@@ -50,11 +51,11 @@ public class RecordsSend implements Send {
     }
 
     @Override
-    public long writeTo(GatheringByteChannel channel) throws IOException {
+    public final long writeTo(GatheringByteChannel channel) throws IOException {
         long written = 0;
 
         if (remaining > 0) {
-            written = records.writeTo(channel, size() - remaining, remaining);
+            written = writeTo(channel, size() - remaining, remaining);
             if (written < 0)
                 throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
             remaining -= written;
@@ -69,6 +70,23 @@ public class RecordsSend implements Send {
 
     @Override
     public long size() {
-        return records.sizeInBytes();
+        return maxBytesToWrite;
     }
+
+    protected T records() {
+        return records;
+    }
+
+    /**
+     * Write records up to `remaining` bytes to `channel`. The implementation is allowed to be stateful. The contract
+     * from the caller is that the first invocation will be with `previouslyWritten` equal to 0, and `remaining` equal to
+     * the maximum bytes we want to write to `channel`. `previouslyWritten` and `remaining` will be adjusted
+     * appropriately for every subsequent invocation. See {@link #writeTo} for example expected usage.
+     * @param channel The channel to write to
+     * @param previouslyWritten Bytes written in previous calls to {@link #writeTo(GatheringByteChannel, long, int)}; 0 if being called for the first time
+     * @param remaining Number of bytes remaining to be written
+     * @return The number of bytes actually written
+     * @throws IOException For any IO errors
+     */
+    protected abstract long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException;
 }
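
The abstract writeTo(channel, previouslyWritten, remaining) contract above is what both
DefaultRecordsSend and LazyDownConversionRecordsSend implement. A hypothetical subclass,
purely for illustration, that delegates like the default implementation but logs progress:

    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordsSend;

    import java.io.IOException;
    import java.nio.channels.GatheringByteChannel;

    public class LoggingRecordsSend extends RecordsSend<MemoryRecords> {
        public LoggingRecordsSend(String destination, MemoryRecords records) {
            super(destination, records, records.sizeInBytes());
        }

        // Called repeatedly by the final writeTo(channel), starting with previouslyWritten == 0,
        // until maxBytesToWrite bytes have been written in total.
        @Override
        protected long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining)
                throws IOException {
            long written = records().writeTo(channel, previouslyWritten, remaining);
            System.out.println("wrote " + written + " bytes after " + previouslyWritten);
            return written;
        }
    }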
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
similarity index 50%
copy from clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
copy to clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
index 89a5413..c9b7394 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
@@ -16,41 +16,13 @@
  */
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
-public abstract class AbstractRecords implements Records {
-
-    private final Iterable<Record> records = new Iterable<Record>() {
-        @Override
-        public Iterator<Record> iterator() {
-            return recordsIterator();
-        }
-    };
-
-    @Override
-    public boolean hasMatchingMagic(byte magic) {
-        for (RecordBatch batch : batches())
-            if (batch.magic() != magic)
-                return false;
-        return true;
-    }
-
-    @Override
-    public boolean hasCompatibleMagic(byte magic) {
-        for (RecordBatch batch : batches())
-            if (batch.magic() > magic)
-                return false;
-        return true;
-    }
-
+public class RecordsUtil {
     /**
      * Down convert batches to the provided message format version. The first offset parameter is only relevant in the
      * conversion from uncompressed v2 or higher to v1 or lower. The reason is that uncompressed records in v0 and v1
@@ -65,8 +37,8 @@ public abstract class AbstractRecords implements Records {
      * only load records into the heap when down converting), but it's not for the producer. However, down converting
      * in the producer is very uncommon and the extra complexity to handle that case is not worth it.
      */
-    protected ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
-            long firstOffset, Time time) {
+    protected static ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
+                                                                 long firstOffset, Time time) {
         // maintain the batch along with the decompressed records to avoid the need to decompress again
         List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
         int totalSizeEstimate = 0;
@@ -93,7 +65,7 @@ public abstract class AbstractRecords implements Records {
                     baseOffset = batch.baseOffset();
                 else
                     baseOffset = records.get(0).offset();
-                totalSizeEstimate += estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
+                totalSizeEstimate += AbstractRecords.estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
                 recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, records, baseOffset));
             }
         }
@@ -114,7 +86,7 @@ public abstract class AbstractRecords implements Records {
         }
 
         buffer.flip();
-        RecordsProcessingStats stats = new RecordsProcessingStats(temporaryMemoryBytes, numRecordsConverted,
+        RecordConversionStats stats = new RecordConversionStats(temporaryMemoryBytes, numRecordsConverted,
                 time.nanoseconds() - startNanos);
         return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
     }
@@ -123,7 +95,7 @@ public abstract class AbstractRecords implements Records {
      * Return a buffer containing the converted record batches. The returned buffer may not be the same as the received
      * one (e.g. it may require expansion).
      */
-    private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
+    private static MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
         RecordBatch batch = recordBatchAndRecords.batch;
         final TimestampType timestampType = batch.timestampType();
         long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
@@ -142,104 +114,6 @@ public abstract class AbstractRecords implements Records {
         return builder;
     }
 
-    /**
-     * Get an iterator over the deep records.
-     * @return An iterator over the records
-     */
-    @Override
-    public Iterable<Record> records() {
-        return records;
-    }
-
-    private Iterator<Record> recordsIterator() {
-        return new AbstractIterator<Record>() {
-            private final Iterator<? extends RecordBatch> batches = batches().iterator();
-            private Iterator<Record> records;
-
-            @Override
-            protected Record makeNext() {
-                if (records != null && records.hasNext())
-                    return records.next();
-
-                if (batches.hasNext()) {
-                    records = batches.next().iterator();
-                    return makeNext();
-                }
-
-                return allDone();
-            }
-        };
-    }
-
-    public static int estimateSizeInBytes(byte magic,
-                                          long baseOffset,
-                                          CompressionType compressionType,
-                                          Iterable<Record> records) {
-        int size = 0;
-        if (magic <= RecordBatch.MAGIC_VALUE_V1) {
-            for (Record record : records)
-                size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
-        } else {
-            size = DefaultRecordBatch.sizeInBytes(baseOffset, records);
-        }
-        return estimateCompressedSizeInBytes(size, compressionType);
-    }
-
-    public static int estimateSizeInBytes(byte magic,
-                                          CompressionType compressionType,
-                                          Iterable<SimpleRecord> records) {
-        int size = 0;
-        if (magic <= RecordBatch.MAGIC_VALUE_V1) {
-            for (SimpleRecord record : records)
-                size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
-        } else {
-            size = DefaultRecordBatch.sizeInBytes(records);
-        }
-        return estimateCompressedSizeInBytes(size, compressionType);
-    }
-
-    private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
-        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
-    }
-
-    /**
-     * Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
-     * an estimate because it does not take into account overhead from the compression algorithm.
-     */
-    public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, byte[] key, byte[] value, Header[] headers) {
-        return estimateSizeInBytesUpperBound(magic, compressionType, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
-    }
-
-    /**
-     * Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
-     * an estimate because it does not take into account overhead from the compression algorithm.
-     */
-    public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, ByteBuffer key,
-                                                    ByteBuffer value, Header[] headers) {
-        if (magic >= RecordBatch.MAGIC_VALUE_V2)
-            return DefaultRecordBatch.estimateBatchSizeUpperBound(key, value, headers);
-        else if (compressionType != CompressionType.NONE)
-            return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic) + LegacyRecord.recordSize(magic, key, value);
-        else
-            return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
-    }
-
-    /**
-     * Return the size of the record batch header.
-     *
-     * For V0 and V1 with no compression, it's unclear if Records.LOG_OVERHEAD or 0 should be chosen. There is no header
-     * per batch, but a sequence of batches is preceded by the offset and size. This method returns `0` as it's what
-     * `MemoryRecordsBuilder` requires.
-     */
-    public static int recordBatchHeaderSizeInBytes(byte magic, CompressionType compressionType) {
-        if (magic > RecordBatch.MAGIC_VALUE_V1) {
-            return DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
-        } else if (compressionType != CompressionType.NONE) {
-            return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic);
-        } else {
-            return 0;
-        }
-    }
 
     private static class RecordBatchAndRecords {
         private final RecordBatch batch;
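
RecordsUtil now hosts the down-conversion logic that previously lived in AbstractRecords; callers still reach it
through the downConvert methods on MemoryRecords and FileRecords, which return a ConvertedRecords along with the
renamed RecordConversionStats. A hedged usage sketch (the record contents here are illustrative):

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.ConvertedRecords;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.RecordConversionStats;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.utils.Time;

    public class DownConvertExample {
        public static void main(String[] args) {
            // Build a small record set in the current format, then eagerly down-convert it to v1.
            MemoryRecords v2Records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord("key".getBytes(), "value".getBytes()));
            ConvertedRecords<MemoryRecords> converted =
                    v2Records.downConvert(RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);
            MemoryRecords v1Records = converted.records();
            // Stats were renamed in this patch: RecordsProcessingStats -> RecordConversionStats.
            RecordConversionStats stats = converted.recordConversionStats();
            System.out.println(v1Records.sizeInBytes() + " bytes after conversion; " + stats);
        }
    }
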
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 8d28521..c0ebef1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -73,7 +73,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case PRODUCE:
                 return new ProduceResponse(struct);
             case FETCH:
-                return new FetchResponse(struct);
+                return FetchResponse.parse(struct);
             case LIST_OFFSETS:
                 return new ListOffsetResponse(struct);
             case METADATA:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 103821b..16e3396 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -18,7 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.ByteBufferSend;
-import org.apache.kafka.common.network.MultiSend;
+import org.apache.kafka.common.record.MultiRecordsSend;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -26,7 +26,8 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -50,7 +51,7 @@ import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 /**
  * This wrapper supports all versions of the Fetch API
  */
-public class FetchResponse extends AbstractResponse {
+public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
 
     private static final String RESPONSES_KEY_NAME = "responses";
 
@@ -156,10 +157,10 @@ public class FetchResponse extends AbstractResponse {
     public static final Field.Int32 SESSION_ID = new Field.Int32("session_id", "The fetch session ID");
 
     private static final Schema FETCH_RESPONSE_V7 = new Schema(
-        THROTTLE_TIME_MS,
-        ERROR_CODE,
-        SESSION_ID,
-        new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            SESSION_ID,
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
 
     /**
      * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
@@ -190,7 +191,7 @@ public class FetchResponse extends AbstractResponse {
     private final int throttleTimeMs;
     private final Errors error;
     private final int sessionId;
-    private final LinkedHashMap<TopicPartition, PartitionData> responseData;
+    private final LinkedHashMap<TopicPartition, PartitionData<T>> responseData;
 
     public static final class AbortedTransaction {
         public final long producerId;
@@ -226,20 +227,20 @@ public class FetchResponse extends AbstractResponse {
         }
     }
 
-    public static final class PartitionData {
+    public static final class PartitionData<T extends BaseRecords> {
         public final Errors error;
         public final long highWatermark;
         public final long lastStableOffset;
         public final long logStartOffset;
         public final List<AbortedTransaction> abortedTransactions;
-        public final Records records;
+        public final T records;
 
         public PartitionData(Errors error,
                              long highWatermark,
                              long lastStableOffset,
                              long logStartOffset,
                              List<AbortedTransaction> abortedTransactions,
-                             Records records) {
+                             T records) {
             this.error = error;
             this.highWatermark = highWatermark;
             this.lastStableOffset = lastStableOffset;
@@ -297,16 +298,18 @@ public class FetchResponse extends AbstractResponse {
      * @param throttleTimeMs    The time in milliseconds that the response was throttled
      * @param sessionId         The fetch session id.
      */
-    public FetchResponse(Errors error, LinkedHashMap<TopicPartition, PartitionData> responseData,
-                         int throttleTimeMs, int sessionId) {
+    public FetchResponse(Errors error,
+                         LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
+                         int throttleTimeMs,
+                         int sessionId) {
         this.error = error;
         this.responseData = responseData;
         this.throttleTimeMs = throttleTimeMs;
         this.sessionId = sessionId;
     }
 
-    public FetchResponse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData> responseData = new LinkedHashMap<>();
+    public static FetchResponse<MemoryRecords> parse(Struct struct) {
+        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.get(TOPIC_NAME);
@@ -323,7 +326,10 @@ public class FetchResponse extends AbstractResponse {
                 if (partitionResponseHeader.hasField(LOG_START_OFFSET_KEY_NAME))
                     logStartOffset = partitionResponseHeader.getLong(LOG_START_OFFSET_KEY_NAME);
 
-                Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
+                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
+                if (!(baseRecords instanceof MemoryRecords))
+                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
+                MemoryRecords records = (MemoryRecords) baseRecords;
 
                 List<AbortedTransaction> abortedTransactions = null;
                 if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
@@ -339,15 +345,13 @@ public class FetchResponse extends AbstractResponse {
                     }
                 }
 
-                PartitionData partitionData = new PartitionData(error, highWatermark, lastStableOffset, logStartOffset,
-                    abortedTransactions, records);
+                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
+                        logStartOffset, abortedTransactions, records);
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
-        this.responseData = responseData;
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        this.error = Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0));
-        this.sessionId = struct.getOrElse(SESSION_ID, INVALID_SESSION_ID);
+        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
+                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
     }
 
     @Override
@@ -369,14 +373,14 @@ public class FetchResponse extends AbstractResponse {
         Queue<Send> sends = new ArrayDeque<>();
         sends.add(new ByteBufferSend(dest, buffer));
         addResponseData(responseBodyStruct, throttleTimeMs, dest, sends);
-        return new MultiSend(dest, sends);
+        return new MultiRecordsSend(dest, sends);
     }
 
     public Errors error() {
         return error;
     }
 
-    public LinkedHashMap<TopicPartition, PartitionData> responseData() {
+    public LinkedHashMap<TopicPartition, PartitionData<T>> responseData() {
         return responseData;
     }
 
@@ -397,8 +401,8 @@ public class FetchResponse extends AbstractResponse {
         return errorCounts;
     }
 
-    public static FetchResponse parse(ByteBuffer buffer, short version) {
-        return new FetchResponse(ApiKeys.FETCH.responseSchema(version).read(buffer));
+    public static FetchResponse<MemoryRecords> parse(ByteBuffer buffer, short version) {
+        return parse(ApiKeys.FETCH.responseSchema(version).read(buffer));
     }
 
     private static void addResponseData(Struct struct, int throttleTimeMs, String dest, Queue<Send> sends) {
@@ -446,7 +450,7 @@ public class FetchResponse extends AbstractResponse {
 
     private static void addPartitionData(String dest, Queue<Send> sends, Struct partitionData) {
         Struct header = partitionData.getStruct(PARTITION_HEADER_KEY_NAME);
-        Records records = partitionData.getRecords(RECORD_SET_KEY_NAME);
+        BaseRecords records = partitionData.getRecords(RECORD_SET_KEY_NAME);
 
         // include the partition header and the size of the record set
         ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + 4);
@@ -456,24 +460,25 @@ public class FetchResponse extends AbstractResponse {
         sends.add(new ByteBufferSend(dest, buffer));
 
         // finally the send for the record set itself
-        sends.add(new RecordsSend(dest, records));
+        sends.add(records.toSend(dest));
     }
 
-    private static Struct toStruct(short version, int throttleTimeMs, Errors error,
-            Iterator<Map.Entry<TopicPartition, PartitionData>> partIterator, int sessionId) {
+    private static <T extends BaseRecords> Struct toStruct(short version, int throttleTimeMs, Errors error,
+                                                       Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator,
+                                                       int sessionId) {
         Struct struct = new Struct(ApiKeys.FETCH.responseSchema(version));
         struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
         struct.setIfExists(ERROR_CODE, error.code());
         struct.setIfExists(SESSION_ID, sessionId);
-        List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData =
-            FetchRequest.TopicAndPartitionData.batchByTopic(partIterator);
+        List<FetchRequest.TopicAndPartitionData<PartitionData<T>>> topicsData =
+                FetchRequest.TopicAndPartitionData.batchByTopic(partIterator);
         List<Struct> topicArray = new ArrayList<>();
-        for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: topicsData) {
+        for (FetchRequest.TopicAndPartitionData<PartitionData<T>> topicEntry: topicsData) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
             topicData.set(TOPIC_NAME, topicEntry.topic);
             List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
+            for (Map.Entry<Integer, PartitionData<T>> partitionEntry : topicEntry.partitions.entrySet()) {
+                PartitionData<T> fetchPartitionData = partitionEntry.getValue();
                 short errorCode = fetchPartitionData.error.code();
                 // If consumer sends FetchRequest V5 or earlier, the client library is not guaranteed to recognize the error code
                 // for KafkaStorageException. In this case the client library will translate KafkaStorageException to
@@ -524,7 +529,8 @@ public class FetchResponse extends AbstractResponse {
      * @param partIterator  The partition iterator.
      * @return              The response size in bytes.
      */
-    public static int sizeOf(short version, Iterator<Map.Entry<TopicPartition, PartitionData>> partIterator) {
+    public static <T extends BaseRecords> int sizeOf(short version,
+                                                     Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
         // use arbitrary values here without affecting the result.
         return 4 + toStruct(version, 0, Errors.NONE, partIterator, INVALID_SESSION_ID).sizeOf();
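
With PartitionData parameterized over BaseRecords, the broker can place a LazyDownConversionRecords directly into
the response, so the record set is converted chunk by chunk while the response is written to the socket. A rough
sketch of that wiring under those assumptions (variable names and the surrounding setup are illustrative, not
taken from the broker code):

    import java.util.LinkedHashMap;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.record.BaseRecords;
    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.LazyDownConversionRecords;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.requests.FetchResponse;
    import org.apache.kafka.common.utils.Time;

    public class LazyFetchResponseExample {
        static FetchResponse<BaseRecords> buildResponse(TopicPartition tp, FileRecords log,
                                                        long highWatermark, int sessionId) {
            // Wrap the partition's log data so down-conversion to message format v1 happens lazily
            // while the record set is being sent, rather than being materialized up front.
            BaseRecords lazy = new LazyDownConversionRecords(tp, log, RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);
            LinkedHashMap<TopicPartition, FetchResponse.PartitionData<BaseRecords>> data = new LinkedHashMap<>();
            data.put(tp, new FetchResponse.PartitionData<>(Errors.NONE, highWatermark,
                    FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, lazy));
            return new FetchResponse<>(Errors.NONE, data, 0, sessionId);
        }
    }
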
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ce722cf..4886c6b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -44,11 +44,10 @@ import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FetchRequest;
-import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.FetchResponse.PartitionData;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.IsolationLevel;
@@ -1709,8 +1708,8 @@ public class KafkaConsumerTest {
     }
 
 
-    private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
-        LinkedHashMap<TopicPartition, PartitionData> tpResponses = new LinkedHashMap<>();
+    private FetchResponse<MemoryRecords> fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
+        LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> tpResponses = new LinkedHashMap<>();
         for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
             TopicPartition partition = fetchEntry.getKey();
             long fetchOffset = fetchEntry.getValue().offset;
@@ -1725,8 +1724,10 @@ public class KafkaConsumerTest {
                     builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
                 records = builder.build();
             }
-            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
-                    null, records));
+            tpResponses.put(partition,
+                            new FetchResponse.PartitionData(
+                                    Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+                                    0L, null, records));
         }
         return new FetchResponse(Errors.NONE, tpResponses, 0, INVALID_SESSION_ID);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 76cde8e..9164daa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -62,11 +62,11 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchRequest.PartitionData;
-import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
@@ -885,15 +885,15 @@ public class FetcherTest {
 
         Map<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>();
         partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+                FetchResponse.INVALID_LOG_START_OFFSET, null, records));
         partitions.put(tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
-            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
         partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100L, 4,
-            0L, null, nextRecords));
+                0L, null, nextRecords));
         partitions.put(tp3, new FetchResponse.PartitionData(Errors.NONE, 100L, 4,
-            0L, null, partialRecords));
+                0L, null, partialRecords));
         client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
-                                                 0, INVALID_SESSION_ID));
+                0, INVALID_SESSION_ID));
         consumerClient.poll(0);
 
         List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
@@ -948,7 +948,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
         partitions.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100,
-            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
@@ -959,7 +959,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         partitions = new HashMap<>();
         partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
-            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID));
         consumerClient.poll(0);
         assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
@@ -1660,7 +1660,7 @@ public class FetcherTest {
 
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
-            0, INVALID_SESSION_ID));
+                0, INVALID_SESSION_ID));
         consumerClient.poll(0);
         fetcher.fetchedRecords();
 
@@ -1700,7 +1700,7 @@ public class FetcherTest {
                 MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("val".getBytes()))));
 
         client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
-            0, INVALID_SESSION_ID));
+                0, INVALID_SESSION_ID));
         consumerClient.poll(0);
         fetcher.fetchedRecords();
 
@@ -2267,9 +2267,9 @@ public class FetcherTest {
         // Fetch some records and establish an incremental fetch session.
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions1 = new LinkedHashMap<>();
         partitions1.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 2L,
-            2, 0L, null, this.records));
+                2, 0L, null, this.records));
         partitions1.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100L,
-            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, emptyRecords));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, emptyRecords));
         FetchResponse resp1 = new FetchResponse(Errors.NONE, partitions1, 0, 123);
         client.prepareResponse(resp1);
         assertEquals(1, fetcher.sendFetches());
@@ -2308,7 +2308,7 @@ public class FetcherTest {
         // The third response contains some new records for tp0.
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions3 = new LinkedHashMap<>();
         partitions3.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100L,
-            4, 0L, null, this.nextRecords));
+                4, 0L, null, this.nextRecords));
         new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions1), 0, INVALID_SESSION_ID);
         FetchResponse resp3 = new FetchResponse(Errors.NONE, partitions3, 0, 123);
         client.prepareResponse(resp3);
@@ -2446,33 +2446,33 @@ public class FetcherTest {
         return new ListOffsetResponse(allPartitionData);
     }
 
-    private FetchResponse fullFetchResponseWithAbortedTransactions(MemoryRecords records,
-                                                               List<FetchResponse.AbortedTransaction> abortedTransactions,
-                                                               Errors error,
-                                                               long lastStableOffset,
-                                                               long hw,
-                                                               int throttleTime) {
-        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp0,
-                new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, abortedTransactions, records));
-        return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
+    private FetchResponse<MemoryRecords> fullFetchResponseWithAbortedTransactions(MemoryRecords records,
+                                                                                  List<FetchResponse.AbortedTransaction> abortedTransactions,
+                                                                                  Errors error,
+                                                                                  long lastStableOffset,
+                                                                                  long hw,
+                                                                                  int throttleTime) {
+        Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp0,
+                new FetchResponse.PartitionData<>(error, hw, lastStableOffset, 0L, abortedTransactions, records));
+        return new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
     }
 
-    private FetchResponse fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
+    private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
         return fullFetchResponse(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, throttleTime);
     }
 
-    private FetchResponse fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
-                                        long lastStableOffset, int throttleTime) {
-        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
-                new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, null, records));
-        return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
+    private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+                                            long lastStableOffset, int throttleTime) {
+        Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
+                new FetchResponse.PartitionData<>(error, hw, lastStableOffset, 0L, null, records));
+        return new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
     }
 
-    private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+    private FetchResponse<MemoryRecords> fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
                                         long lastStableOffset, long logStartOffset, int throttleTime) {
-        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
-                new FetchResponse.PartitionData(error, hw, lastStableOffset, logStartOffset, null, records));
-        return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
+        Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
+                new FetchResponse.PartitionData<>(error, hw, lastStableOffset, logStartOffset, null, records));
+        return new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
     }
 
     private MetadataResponse newMetadataResponse(String topic, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index fdd3ede..f8b6dd4 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.MockTime;
@@ -38,11 +39,11 @@ import java.util.List;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assert.assertArrayEquals;
 
 public class FileRecordsTest {
 
@@ -347,6 +348,12 @@ public class FileRecordsTest {
         Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0, time).records();
         assertTrue("No message should be there", batches(messageV0).isEmpty());
         assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
+
+        // Lazy down-conversion will not return any messages for a partial input batch
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, slice, RecordBatch.MAGIC_VALUE_V0, 0, Time.SYSTEM);
+        Iterator<ConvertedRecords> it = lazyRecords.iterator(16 * 1024L);
+        assertTrue("No messages should be returned", !it.hasNext());
     }
 
     @Test
@@ -402,8 +409,7 @@ public class FileRecordsTest {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
             fileRecords.append(MemoryRecords.readableRecords(buffer));
             fileRecords.flush();
-            Records convertedRecords = fileRecords.downConvert(toMagic, 0L, time).records();
-            verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
+            downConvertAndVerifyRecords(records, offsets, fileRecords, compressionType, toMagic, 0L, time);
 
             if (toMagic <= RecordBatch.MAGIC_VALUE_V1 && compressionType == CompressionType.NONE) {
                 long firstOffset;
@@ -411,17 +417,15 @@ public class FileRecordsTest {
                     firstOffset = 11L; // v1 record
                 else
                     firstOffset = 17; // v2 record
-                Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset, time).records();
                 List<Long> filteredOffsets = new ArrayList<>(offsets);
                 List<SimpleRecord> filteredRecords = new ArrayList<>(records);
                 int index = filteredOffsets.indexOf(firstOffset) - 1;
                 filteredRecords.remove(index);
                 filteredOffsets.remove(index);
-                verifyConvertedRecords(filteredRecords, filteredOffsets, convertedRecords2, compressionType, toMagic);
+                downConvertAndVerifyRecords(filteredRecords, filteredOffsets, fileRecords, compressionType, toMagic, firstOffset, time);
             } else {
                 // firstOffset doesn't have any effect in this case
-                Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L, time).records();
-                verifyConvertedRecords(records, offsets, convertedRecords2, compressionType, toMagic);
+                downConvertAndVerifyRecords(records, offsets, fileRecords, compressionType, toMagic, 10L, time);
             }
         }
     }
@@ -430,40 +434,98 @@ public class FileRecordsTest {
         return Utils.utf8(buffer, buffer.remaining());
     }
 
+    private void downConvertAndVerifyRecords(List<SimpleRecord> initialRecords,
+                                             List<Long> initialOffsets,
+                                             FileRecords fileRecords,
+                                             CompressionType compressionType,
+                                             byte toMagic,
+                                             long firstOffset,
+                                             Time time) {
+        long numBatches = 0;
+        long minBatchSize = Long.MAX_VALUE;
+        long maxBatchSize = Long.MIN_VALUE;
+        for (RecordBatch batch : fileRecords.batches()) {
+            minBatchSize = Math.min(minBatchSize, batch.sizeInBytes());
+            maxBatchSize = Math.max(maxBatchSize, batch.sizeInBytes());
+            numBatches++;
+        }
+
+        // Test the normal down-conversion path
+        List<Records> convertedRecords = new ArrayList<>();
+        convertedRecords.add(fileRecords.downConvert(toMagic, firstOffset, time).records());
+        verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compressionType, toMagic);
+        convertedRecords.clear();
+
+        // Test the lazy down-conversion path
+        List<Long> maximumReadSize = asList(16L * 1024L,
+                (long) fileRecords.sizeInBytes(),
+                (long) fileRecords.sizeInBytes() - 1,
+                (long) fileRecords.sizeInBytes() / 4,
+                maxBatchSize + 1,
+                1L);
+        for (long readSize : maximumReadSize) {
+            TopicPartition tp = new TopicPartition("topic-1", 0);
+            LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, fileRecords, toMagic, firstOffset, Time.SYSTEM);
+            Iterator<ConvertedRecords> it = lazyRecords.iterator(readSize);
+            while (it.hasNext())
+                convertedRecords.add(it.next().records());
+
+            // Check if chunking works as expected. The only way to predictably test this is by testing the edge cases:
+            // 1. If the maximum read size is greater than the size of all batches combined, we must get all the
+            //    down-converted records in exactly two chunks; the first chunk is pre down-converted and returned,
+            //    and the second chunk contains the remaining batches.
+            // 2. If the maximum read size is just smaller than the size of all batches combined, we must get the
+            //    results in two chunks.
+            // 3. If the maximum read size is less than the size of the smallest batch, we get one batch in each chunk.
+            if (readSize >= fileRecords.sizeInBytes())
+                assertEquals(2, convertedRecords.size());
+            else if (readSize == fileRecords.sizeInBytes() - 1)
+                assertEquals(2, convertedRecords.size());
+            else if (readSize <= minBatchSize)
+                assertEquals(numBatches, convertedRecords.size());
+
+            verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compressionType, toMagic);
+            convertedRecords.clear();
+        }
+    }
+
     private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
                                         List<Long> initialOffsets,
-                                        Records convertedRecords,
+                                        List<Records> convertedRecordsList,
                                         CompressionType compressionType,
                                         byte magicByte) {
         int i = 0;
-        for (RecordBatch batch : convertedRecords.batches()) {
-            assertTrue("Magic byte should be lower than or equal to " + magicByte, batch.magic() <= magicByte);
-            if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
-                assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
-            else
-                assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
-            assertEquals("Compression type should not be affected by conversion", compressionType, batch.compressionType());
-            for (Record record : batch) {
-                assertTrue("Inner record should have magic " + magicByte, record.hasMagic(batch.magic()));
-                assertEquals("Offset should not change", initialOffsets.get(i).longValue(), record.offset());
-                assertEquals("Key should not change", utf8(initialRecords.get(i).key()), utf8(record.key()));
-                assertEquals("Value should not change", utf8(initialRecords.get(i).value()), utf8(record.value()));
-                assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
-                if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
-                    assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
-                    assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
-                    assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
-                } else if (batch.magic() == RecordBatch.MAGIC_VALUE_V1) {
-                    assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
-                    assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
-                    assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
-                } else {
-                    assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
-                    assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
-                    assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
-                    assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
+
+        for (Records convertedRecords : convertedRecordsList) {
+            for (RecordBatch batch : convertedRecords.batches()) {
+                assertTrue("Magic byte should be lower than or equal to " + magicByte, batch.magic() <= magicByte);
+                if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
+                    assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
+                else
+                    assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+                assertEquals("Compression type should not be affected by conversion", compressionType, batch.compressionType());
+                for (Record record : batch) {
+                    assertTrue("Inner record should have magic " + magicByte, record.hasMagic(batch.magic()));
+                    assertEquals("Offset should not change", initialOffsets.get(i).longValue(), record.offset());
+                    assertEquals("Key should not change", utf8(initialRecords.get(i).key()), utf8(record.key()));
+                    assertEquals("Value should not change", utf8(initialRecords.get(i).value()), utf8(record.value()));
+                    assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
+                    if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
+                        assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
+                        assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
+                        assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+                    } else if (batch.magic() == RecordBatch.MAGIC_VALUE_V1) {
+                        assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
+                        assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
+                        assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+                    } else {
+                        assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
+                        assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
+                        assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+                        assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
+                    }
+                    i += 1;
                 }
-                i += 1;
             }
         }
         assertEquals(initialOffsets.size(), i);
@@ -490,5 +552,4 @@ public class FileRecordsTest {
         }
         fileRecords.flush();
     }
-
 }
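
The new LazyDownConversionRecordsTest below drives the send through the protected positional writeTo for
verification; in production the network layer drives it through the public Send interface instead. A short
hedged sketch of that path (channel setup omitted, names illustrative):

    import java.io.IOException;
    import java.nio.channels.GatheringByteChannel;
    import org.apache.kafka.common.record.LazyDownConversionRecords;
    import org.apache.kafka.common.record.LazyDownConversionRecordsSend;

    public class LazySendExample {
        // Drain a lazy send: each writeTo call down-converts roughly one chunk before handing the
        // converted bytes to the channel, keeping broker memory usage bounded during the fetch.
        static void drain(LazyDownConversionRecords lazyRecords, GatheringByteChannel channel) throws IOException {
            LazyDownConversionRecordsSend send = lazyRecords.toSend("connection-id");
            while (!send.completed())
                send.writeTo(channel);
        }
    }
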
diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
new file mode 100644
index 0000000..8765603
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+public class LazyDownConversionRecordsTest {
+    private final CompressionType compressionType;
+    private final byte toMagic;
+    private final DownConversionTest test;
+
+    public LazyDownConversionRecordsTest(CompressionType compressionType, byte toMagic, DownConversionTest test) {
+        this.compressionType = compressionType;
+        this.toMagic = toMagic;
+        this.test = test;
+    }
+
+    enum DownConversionTest {
+        DEFAULT,
+        OVERFLOW,
+    }
+
+    @Parameterized.Parameters(name = "compressionType={0}, toMagic={1}, test={2}")
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<>();
+        for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
+            for (DownConversionTest test : DownConversionTest.values()) {
+                values.add(new Object[]{CompressionType.NONE, toMagic, test});
+                values.add(new Object[]{CompressionType.GZIP, toMagic, test});
+            }
+        }
+        return values;
+    }
+
+    @Test
+    public void doTestConversion() throws IOException {
+        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
+
+        Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
+                            new RecordHeader("headerKey2", "headerValue2".getBytes()),
+                            new RecordHeader("headerKey3", "headerValue3".getBytes())};
+
+        List<SimpleRecord> records = asList(
+                new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
+                new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
+                new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
+                new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
+                new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
+                new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
+                new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
+                new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
+                new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
+                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
+        assertEquals("incorrect test setup", offsets.size(), records.size());
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
+                TimestampType.CREATE_TIME, 0L);
+        for (int i = 0; i < 3; i++)
+            builder.appendWithOffset(offsets.get(i), records.get(i));
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                0L);
+        for (int i = 3; i < 6; i++)
+            builder.appendWithOffset(offsets.get(i), records.get(i));
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                0L);
+        for (int i = 6; i < 10; i++)
+            builder.appendWithOffset(offsets.get(i), records.get(i));
+        builder.close();
+
+        buffer.flip();
+
+        try (FileRecords inputRecords = FileRecords.open(tempFile())) {
+            MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
+            inputRecords.append(memoryRecords);
+            inputRecords.flush();
+
+            LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(new TopicPartition("test", 1),
+                    inputRecords, toMagic, 0L, Time.SYSTEM);
+            LazyDownConversionRecordsSend lazySend = lazyRecords.toSend("foo");
+            File outputFile = tempFile();
+            FileChannel channel = new RandomAccessFile(outputFile, "rw").getChannel();
+
+            // Size of lazy records is at least as much as the size of underlying records
+            assertTrue(lazyRecords.sizeInBytes() >= inputRecords.sizeInBytes());
+
+            int toWrite;
+            int written = 0;
+            List<SimpleRecord> recordsBeingConverted;
+            List<Long> offsetsOfRecords;
+            switch (test) {
+                case DEFAULT:
+                    toWrite = inputRecords.sizeInBytes();
+                    recordsBeingConverted = records;
+                    offsetsOfRecords = offsets;
+                    break;
+                case OVERFLOW:
+                    toWrite = inputRecords.sizeInBytes() * 2;
+                    recordsBeingConverted = records;
+                    offsetsOfRecords = offsets;
+                    break;
+                default:
+                    throw new IllegalArgumentException();
+            }
+            while (written < toWrite)
+                written += lazySend.writeTo(channel, written, toWrite - written);
+
+            FileRecords convertedRecords = FileRecords.open(outputFile, true, (int) channel.size(), false);
+            ByteBuffer convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes());
+            convertedRecords.readInto(convertedRecordsBuffer, 0);
+            MemoryRecords convertedMemoryRecords = MemoryRecords.readableRecords(convertedRecordsBuffer);
+            verifyDownConvertedRecords(recordsBeingConverted, offsetsOfRecords, convertedMemoryRecords, compressionType, toMagic);
+
+            convertedRecords.close();
+            channel.close();
+        }
+    }
+
+    private String utf8(ByteBuffer buffer) {
+        return Utils.utf8(buffer, buffer.remaining());
+    }
+
+    private void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
+                                            List<Long> initialOffsets,
+                                            MemoryRecords downConvertedRecords,
+                                            CompressionType compressionType,
+                                            byte toMagic) {
+        int i = 0;
+        for (RecordBatch batch : downConvertedRecords.batches()) {
+            assertTrue("Magic byte should be lower than or equal to " + toMagic, batch.magic() <= toMagic);
+            if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
+                assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
+            else
+                assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            assertEquals("Compression type should not be affected by conversion", compressionType, batch.compressionType());
+            for (Record record : batch) {
+                assertTrue("Inner record should have magic " + toMagic, record.hasMagic(batch.magic()));
+                assertEquals("Offset should not change", initialOffsets.get(i).longValue(), record.offset());
+                assertEquals("Key should not change", utf8(initialRecords.get(i).key()), utf8(record.key()));
+                assertEquals("Value should not change", utf8(initialRecords.get(i).value()), utf8(record.value()));
+                assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
+                if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
+                    assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
+                    assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
+                    assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+                } else if (batch.magic() == RecordBatch.MAGIC_VALUE_V1) {
+                    assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
+                    assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
+                    assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+                } else {
+                    assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
+                    assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
+                    assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+                    assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
+                }
+                i += 1;
+            }
+        }
+        assertEquals(initialOffsets.size(), i);
+    }
+}
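
The test above drives the new wrapper through its send path. For orientation, a minimal hedged sketch of how LazyDownConversionRecords is constructed on its own (the constructor arguments mirror the usage in KafkaApis further below; the topic name, record contents, and class name here are illustrative only):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LazyDownConversionRecords;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.utils.Time;

    public class LazyDownConversionSketch {
        public static void main(String[] args) {
            // Records written with the current (v2) message format.
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord("key".getBytes(), "value".getBytes()));
            TopicPartition tp = new TopicPartition("example-topic", 0);
            // Wrap for lazy, chunked down-conversion to message format v1, starting at offset 0.
            LazyDownConversionRecords lazy =
                    new LazyDownConversionRecords(tp, records, RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);
            // As the test asserts, the reported size is at least the size of the underlying
            // records; the actual conversion work is deferred until the response is written out.
            System.out.println(lazy.sizeInBytes() + " >= " + records.sizeInBytes());
        }
    }
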
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index a90fb29..36b14a2 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -472,7 +472,7 @@ public class MemoryRecordsBuilderTest {
         MemoryRecords records = convertedRecords.records();
 
         // Transactional markers are skipped when down converting to V1, so exclude them from size
-        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(),
+        verifyRecordsProcessingStats(convertedRecords.recordConversionStats(),
                 3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers);
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
@@ -513,7 +513,7 @@ public class MemoryRecordsBuilderTest {
         ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
                 .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
         MemoryRecords records = convertedRecords.records();
-        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 2,
+        verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), 3, 2,
                 records.sizeInBytes(), buffer.limit());
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
@@ -553,7 +553,7 @@ public class MemoryRecordsBuilderTest {
             assertEquals("1", utf8(logRecords.get(0).key()));
             assertEquals("2", utf8(logRecords.get(1).key()));
             assertEquals("3", utf8(logRecords.get(2).key()));
-            verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 2,
+            verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), 3, 2,
                     records.sizeInBytes(), buffer.limit());
         } else {
             assertEquals(2, batches.size());
@@ -563,7 +563,7 @@ public class MemoryRecordsBuilderTest {
             assertEquals(2, batches.get(1).baseOffset());
             assertEquals("1", utf8(logRecords.get(0).key()));
             assertEquals("3", utf8(logRecords.get(1).key()));
-            verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+            verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), 3, 1,
                     records.sizeInBytes(), buffer.limit());
         }
     }
@@ -678,8 +678,8 @@ public class MemoryRecordsBuilderTest {
         assertTrue("Memory usage too high: " + memUsed, iterations < 100);
     }
 
-    private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int numRecords,
-            int numRecordsConverted, long finalBytes, long preConvertedBytes) {
+    private void verifyRecordsProcessingStats(RecordConversionStats processingStats, int numRecords,
+                                              int numRecordsConverted, long finalBytes, long preConvertedBytes) {
         assertNotNull("Records processing info is null", processingStats);
         assertEquals(numRecordsConverted, processingStats.numRecordsConverted());
         // Since nanoTime accuracy on build machines may not be sufficient to measure small conversion times,
        // allow a zero conversion time when records were actually converted.
diff --git a/clients/src/test/java/org/apache/kafka/common/network/MultiSendTest.java b/clients/src/test/java/org/apache/kafka/common/record/MultiRecordsSendTest.java
similarity index 88%
rename from clients/src/test/java/org/apache/kafka/common/network/MultiSendTest.java
rename to clients/src/test/java/org/apache/kafka/common/record/MultiRecordsSendTest.java
index d2b2ef6..38d381c7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/MultiSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MultiRecordsSendTest.java
@@ -14,8 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.network;
+package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import java.util.Queue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class MultiSendTest {
+public class MultiRecordsSendTest {
 
     @Test
     public void testSendsFreedAfterWriting() throws IOException {
@@ -45,7 +47,7 @@ public class MultiSendTest {
             sends.add(new ByteBufferSend(dest, buffer));
         }
 
-        MultiSend send = new MultiSend(dest, sends);
+        MultiRecordsSend send = new MultiRecordsSend(dest, sends);
         assertEquals(totalSize, send.size());
 
         for (int i = 0; i < numChunks; i++) {
@@ -69,7 +71,7 @@ public class MultiSendTest {
         @Override
         public long write(ByteBuffer[] srcs) throws IOException {
             // Instead of overflowing, this channel refuses additional writes once the buffer is full,
-            // which allows us to test the MultiSend behavior on a per-send basis.
+            // which allows us to test the MultiRecordsSend behavior on a per-send basis.
             if (!buffer().hasRemaining())
                 return 0;
             return super.write(srcs);
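
The renamed test exercises the same behavior as before: a MultiRecordsSend wraps a queue of Send objects and frees each nested send as soon as it has been fully written. A minimal hedged sketch of that API (the class name, destination id, and temp-file path below are illustrative; FileChannel is used simply because it implements GatheringByteChannel):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.ArrayDeque;
    import java.util.Queue;

    import org.apache.kafka.common.network.ByteBufferSend;
    import org.apache.kafka.common.network.Send;
    import org.apache.kafka.common.record.MultiRecordsSend;

    public class MultiRecordsSendSketch {
        public static void main(String[] args) throws IOException {
            Queue<Send> sends = new ArrayDeque<>();
            for (int i = 0; i < 4; i++)
                sends.add(new ByteBufferSend("connection-1", ByteBuffer.wrap(new byte[16])));

            MultiRecordsSend send = new MultiRecordsSend("connection-1", sends);
            try (FileChannel channel = FileChannel.open(Paths.get("multi-send.tmp"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                // Keep writing until every nested send has been drained; completed sends
                // are released as soon as they finish, which is what the test verifies.
                while (!send.completed())
                    send.writeTo(channel);
            }
        }
    }
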
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 118288b..777dbb5 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -48,11 +48,11 @@ import java.util.regex.Pattern
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
-    RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+    RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 
   def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
     LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
 /**
@@ -65,7 +65,7 @@ object LogAppendInfo {
  * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
  * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
  * @param logStartOffset The start offset of the log at the time of this append.
- * @param recordsProcessingStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
+ * @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
  * @param sourceCodec The source codec used in the message set (send by the producer)
  * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
  * @param shallowCount The number of shallow messages
@@ -78,7 +78,7 @@ case class LogAppendInfo(var firstOffset: Option[Long],
                          var offsetOfMaxTimestamp: Long,
                          var logAppendTime: Long,
                          var logStartOffset: Long,
-                         var recordsProcessingStats: RecordsProcessingStats,
+                         var recordConversionStats: RecordConversionStats,
                          sourceCodec: CompressionCodec,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
@@ -693,7 +693,7 @@ class Log(@volatile var dir: File,
           appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
           appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
           appendInfo.lastOffset = offset.value - 1
-          appendInfo.recordsProcessingStats = validateAndOffsetAssignResult.recordsProcessingStats
+          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
           if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
             appendInfo.logAppendTime = now
 
@@ -939,7 +939,7 @@ class Log(@volatile var dir: File,
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
     LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      RecordsProcessingStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }
 
   private def updateProducers(batch: RecordBatch,
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 6515260..2cfbf7d 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -22,7 +22,7 @@ import kafka.common.LongRef
 import kafka.message.{CompressionCodec, NoCompressionCodec}
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
-import org.apache.kafka.common.record._
+import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.mutable
@@ -155,14 +155,14 @@ private[kafka] object LogValidator extends Logging {
     val convertedRecords = builder.build()
 
     val info = builder.info
-    val recordsProcessingStats = new RecordsProcessingStats(builder.uncompressedBytesWritten,
+    val recordConversionStats = new RecordConversionStats(builder.uncompressedBytesWritten,
       builder.numRecords, time.nanoseconds - startNanos)
     ValidationAndOffsetAssignResult(
       validatedRecords = convertedRecords,
       maxTimestamp = info.maxTimestamp,
       shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
       messageSizeMaybeChanged = true,
-      recordsProcessingStats = recordsProcessingStats)
+      recordConversionStats = recordConversionStats)
   }
 
   private def assignOffsetsNonCompressed(records: MemoryRecords,
@@ -224,7 +224,7 @@ private[kafka] object LogValidator extends Logging {
       maxTimestamp = maxTimestamp,
       shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp,
       messageSizeMaybeChanged = false,
-      recordsProcessingStats = RecordsProcessingStats.EMPTY)
+      recordConversionStats = RecordConversionStats.EMPTY)
   }
 
   /**
@@ -315,12 +315,12 @@ private[kafka] object LogValidator extends Logging {
         if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
           batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
 
-        val recordsProcessingStats = new RecordsProcessingStats(uncompressedSizeInBytes, 0, -1)
+        val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
         ValidationAndOffsetAssignResult(validatedRecords = records,
           maxTimestamp = maxTimestamp,
           shallowOffsetOfMaxTimestamp = lastOffset,
           messageSizeMaybeChanged = false,
-          recordsProcessingStats = recordsProcessingStats)
+          recordConversionStats = recordConversionStats)
       }
   }
 
@@ -358,7 +358,7 @@ private[kafka] object LogValidator extends Logging {
     // message format V0 or if the inner offsets are not consecutive. This is OK since the impact is the same: we have
     // to rebuild the records (including recompression if enabled).
     val conversionCount = builder.numRecords
-    val recordsProcessingStats = new RecordsProcessingStats(uncompresssedSizeInBytes + builder.uncompressedBytesWritten,
+    val recordConversionStats = new RecordConversionStats(uncompresssedSizeInBytes + builder.uncompressedBytesWritten,
       conversionCount, time.nanoseconds - startNanos)
 
     ValidationAndOffsetAssignResult(
@@ -366,7 +366,7 @@ private[kafka] object LogValidator extends Logging {
       maxTimestamp = info.maxTimestamp,
       shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
       messageSizeMaybeChanged = true,
-      recordsProcessingStats = recordsProcessingStats)
+      recordConversionStats = recordConversionStats)
   }
 
   private def validateKey(record: Record, compactedTopic: Boolean) {
@@ -397,6 +397,6 @@ private[kafka] object LogValidator extends Logging {
                                              maxTimestamp: Long,
                                              shallowOffsetOfMaxTimestamp: Long,
                                              messageSizeMaybeChanged: Boolean,
-                                             recordsProcessingStats: RecordsProcessingStats)
+                                             recordConversionStats: RecordConversionStats)
 
 }
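
LogValidator above constructs RecordConversionStats for the eager validation path, and MemoryRecordsBuilderTest earlier reads the same stats back after an in-memory down-conversion. A hedged sketch of that eager path, for contrast with the lazy one (record contents, the target magic value, and the class name are illustrative):

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.ConvertedRecords;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.RecordConversionStats;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.utils.Time;

    public class EagerDownConversionSketch {
        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord("key".getBytes(), "value".getBytes()));
            // Eager path: the whole buffer is down-converted in memory up front.
            ConvertedRecords<MemoryRecords> converted =
                    records.downConvert(RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);
            RecordConversionStats stats = converted.recordConversionStats();
            System.out.println(stats.numRecordsConverted() + " records converted in "
                    + stats.conversionTimeNanos() + " ns using "
                    + stats.temporaryMemoryBytes() + " temporary bytes");
        }
    }
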
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index f45e0ce..eecce1d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -25,9 +25,11 @@ import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{Logging, NotNothing}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.RecordConversionStats
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
@@ -181,12 +183,8 @@ object RequestChannel extends Logging {
 
       if (isRequestLoggingEnabled) {
         val detailsEnabled = requestLogger.underlying.isTraceEnabled
-        val responseString =
-          if (response.responseSend.isDefined)
-            response.responseAsString.getOrElse(
-              throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
-          else ""
-
+        val responseString = response.responseString.getOrElse(
+          throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
         val builder = new StringBuilder(256)
         builder.append("Completed request:").append(requestDesc(detailsEnabled))
           .append(",response:").append(responseString)
@@ -225,24 +223,55 @@ object RequestChannel extends Logging {
 
   }
 
-  /** responseAsString should only be defined if request logging is enabled */
-  class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction,
-                 val responseAsString: Option[String]) {
-    request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
-    if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
+  abstract class Response(val request: Request) {
+    locally {
+      val nowNs = Time.SYSTEM.nanoseconds
+      request.responseCompleteTimeNanos = nowNs
+      if (request.apiLocalCompleteTimeNanos == -1L)
+        request.apiLocalCompleteTimeNanos = nowNs
+    }
 
     def processor: Int = request.processor
 
-    override def toString =
-      s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction, responseAsString=$responseAsString)"
+    def responseString: Option[String] = Some("")
+
+    def onComplete: Option[Send => Unit] = None
+
+    override def toString: String
   }
 
-  sealed trait ResponseAction
-  case object SendAction extends ResponseAction
-  case object NoOpAction extends ResponseAction
-  case object CloseConnectionAction extends ResponseAction
-  case object StartThrottlingAction extends ResponseAction
-  case object EndThrottlingAction extends ResponseAction
+  /** responseAsString should only be defined if request logging is enabled */
+  class SendResponse(request: Request,
+                     val responseSend: Send,
+                     val responseAsString: Option[String],
+                     val onCompleteCallback: Option[Send => Unit]) extends Response(request) {
+    override def responseString: Option[String] = responseAsString
+
+    override def onComplete: Option[Send => Unit] = onCompleteCallback
+
+    override def toString: String =
+      s"Response(type=Send, request=$request, send=$responseSend, asString=$responseAsString)"
+  }
+
+  class NoOpResponse(request: Request) extends Response(request) {
+    override def toString: String =
+      s"Response(type=NoOp, request=$request)"
+  }
+
+  class CloseConnectionResponse(request: Request) extends Response(request) {
+    override def toString: String =
+      s"Response(type=CloseConnection, request=$request)"
+  }
+
+  class StartThrottlingResponse(request: Request) extends Response(request) {
+    override def toString: String =
+      s"Response(type=StartThrottling, request=$request)"
+  }
+
+  class EndThrottlingResponse(request: Request) extends Response(request) {
+    override def toString: String =
+      s"Response(type=EndThrottling, request=$request)"
+  }
 }
 
 class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
@@ -287,16 +316,16 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
   def sendResponse(response: RequestChannel.Response) {
     if (isTraceEnabled) {
       val requestHeader = response.request.header
-      val message = response.responseAction match {
-        case SendAction =>
-          s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${response.responseSend.get.size} bytes."
-        case NoOpAction =>
+      val message = response match {
+        case sendResponse: SendResponse =>
+          s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${sendResponse.responseSend.size} bytes."
+        case _: NoOpResponse =>
           s"Not sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} as it's not required."
-        case CloseConnectionAction =>
+        case _: CloseConnectionResponse =>
           s"Closing connection for client ${requestHeader.clientId} due to error during ${requestHeader.apiKey}."
-        case StartThrottlingAction =>
+        case _: StartThrottlingResponse =>
           s"Notifying channel throttling has started for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
-        case EndThrottlingAction =>
+        case _: EndThrottlingResponse =>
           s"Notifying channel throttling has ended for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
       }
       trace(message)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 759396d..db5eda6 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -28,6 +28,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.cluster.{BrokerEndPoint, EndPoint}
 import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
+import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils._
@@ -37,6 +38,7 @@ import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Meter
 import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
 import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
+import org.apache.kafka.common.record.MultiRecordsSend
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
@@ -617,36 +619,37 @@ private[kafka] class Processor(val id: Int,
   }
 
   private def processNewResponses() {
-    var curr: RequestChannel.Response = null
-    while ({curr = dequeueResponse(); curr != null}) {
-      val channelId = curr.request.context.connectionId
+    var currentResponse: RequestChannel.Response = null
+    while ({currentResponse = dequeueResponse(); currentResponse != null}) {
+      val channelId = currentResponse.request.context.connectionId
       try {
-        curr.responseAction match {
-          case RequestChannel.NoOpAction =>
+        currentResponse match {
+          case response: NoOpResponse =>
             // There is no response to send to the client, we need to read more pipelined requests
             // that are sitting in the server's socket buffer
-            updateRequestMetrics(curr)
-            trace("Socket server received empty response to send, registering for read: " + curr)
+            updateRequestMetrics(response)
+            trace("Socket server received empty response to send, registering for read: " + response)
             // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
             // it will be unmuted immediately. If the channel has been throttled, it will be unmuted only if the
             // throttling delay has already passed by now.
             handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
             tryUnmuteChannel(channelId)
-          case RequestChannel.SendAction =>
-            val responseSend = curr.responseSend.getOrElse(
-              throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
-            sendResponse(curr, responseSend)
-          case RequestChannel.CloseConnectionAction =>
-            updateRequestMetrics(curr)
+
+          case response: SendResponse =>
+            sendResponse(response, response.responseSend)
+          case response: CloseConnectionResponse =>
+            updateRequestMetrics(response)
             trace("Closing socket connection actively according to the response code.")
             close(channelId)
-          case RequestChannel.StartThrottlingAction =>
+          case response: StartThrottlingResponse =>
             handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
-          case RequestChannel.EndThrottlingAction =>
+          case response: EndThrottlingResponse =>
             // Try unmuting the channel. The channel will be unmuted only if the response has already been sent out to
             // the client.
             handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
             tryUnmuteChannel(channelId)
+          case _ =>
+            throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
         }
       } catch {
         case e: Throwable =>
@@ -713,10 +716,13 @@ private[kafka] class Processor(val id: Int,
   private def processCompletedSends() {
     selector.completedSends.asScala.foreach { send =>
       try {
-        val resp = inflightResponses.remove(send.destination).getOrElse {
+        val response = inflightResponses.remove(send.destination).getOrElse {
           throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
         }
-        updateRequestMetrics(resp)
+        updateRequestMetrics(response)
+
+        // Invoke send completion callback
+        response.onComplete.foreach(onComplete => onComplete(send))
 
         // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
         // it will be unmuted immediately. If the channel has been throttled, it will unmuted only if the throttling
@@ -730,7 +736,7 @@ private[kafka] class Processor(val id: Int,
     }
   }
 
-  private def updateRequestMetrics(response: RequestChannel.Response) {
+  private def updateRequestMetrics(response: RequestChannel.Response): Unit = {
     val request = response.request
     val networkThreadTimeNanos = openOrClosingChannel(request.context.connectionId).fold(0L)(_.getAndResetNetworkThreadTimeNanos())
     request.updateRequestMetrics(networkThreadTimeNanos, response)
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 73b40d1..41ee420 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -269,18 +269,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     * @param channelThrottlingCallback Callback for channel throttling
     * @return ThrottledChannel object
     */
-  def throttle(request: RequestChannel.Request, throttleTimeMs: Int,
-               channelThrottlingCallback: (ResponseAction) => Unit) {
-    throttle(request.session, request.header.clientId, throttleTimeMs, channelThrottlingCallback)
-  }
-
-  def throttle(session: Session, clientId: String, throttleTimeMs: Int,
-               channelThrottlingCallback: (ResponseAction) => Unit) {
+  def throttle(request: RequestChannel.Request, throttleTimeMs: Int, channelThrottlingCallback: Response => Unit): Unit = {
     if (throttleTimeMs > 0) {
-      val clientSensors = getOrCreateQuotaSensors(session, clientId)
-
+      val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
       clientSensors.throttleTimeSensor.record(throttleTimeMs)
-      val throttledChannel = new ThrottledChannel(time, throttleTimeMs, channelThrottlingCallback)
+      val throttledChannel = new ThrottledChannel(request, time, throttleTimeMs, channelThrottlingCallback)
       delayQueue.add(throttledChannel)
       delayQueueSensor.record()
       debug("Channel throttled for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index 3810d90..7a47780 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -25,9 +25,9 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, INITIAL_EPOCH, INVALID_SESSION_ID}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
 import org.apache.kafka.common.utils.{ImplicitLinkedHashSet, Time, Utils}
 
 import scala.math.Ordered.orderingToOrdered
@@ -36,9 +36,9 @@ import scala.collection.JavaConverters._
 
 object FetchSession {
   type REQ_MAP = util.Map[TopicPartition, FetchRequest.PartitionData]
-  type RESP_MAP = util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+  type RESP_MAP = util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
   type CACHE_MAP = ImplicitLinkedHashSet[CachedPartition]
-  type RESP_MAP_ITER = util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData]]
+  type RESP_MAP_ITER = util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]]
 
   val NUM_INCREMENTAL_FETCH_SESSISONS = "NumIncrementalFetchSessions"
   val NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED = "NumIncrementalFetchPartitionsCached"
@@ -100,7 +100,7 @@ class CachedPartition(val topic: String,
       reqData.logStartOffset, -1)
 
   def this(part: TopicPartition, reqData: FetchRequest.PartitionData,
-           respData: FetchResponse.PartitionData) =
+           respData: FetchResponse.PartitionData[Records]) =
     this(part.topic(), part.partition(),
       reqData.maxBytes, reqData.fetchOffset, respData.highWatermark,
       reqData.logStartOffset, respData.logStartOffset)
@@ -126,7 +126,7 @@ class CachedPartition(val topic: String,
     * @param updateResponseData if set to true, update this CachedPartition with new request and response data.
     * @return True if this partition should be included in the response; false if it can be omitted.
     */
-  def maybeUpdateResponseData(respData: FetchResponse.PartitionData, updateResponseData: Boolean): Boolean = {
+  def maybeUpdateResponseData(respData: FetchResponse.PartitionData[Records], updateResponseData: Boolean): Boolean = {
     // Check the response data.
     var mustRespond = false
     if ((respData.records != null) && (respData.records.sizeInBytes() > 0)) {
@@ -286,7 +286,7 @@ trait FetchContext extends Logging {
     * Updates the fetch context with new partition information.  Generates response data.
     * The response data may require subsequent down-conversion.
     */
-  def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse
+  def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records]
 
   def partitionsToLogString(partitions: util.Collection[TopicPartition]): String =
     FetchSession.partitionsToLogString(partitions, isTraceEnabled)
@@ -306,7 +306,7 @@ class SessionErrorContext(val error: Errors,
   }
 
   // Because of the fetch session error, we don't know what partitions were supposed to be in this request.
-  override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
+  override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
     debug(s"Session error fetch context returning $error")
     new FetchResponse(error, new FetchSession.RESP_MAP, 0, INVALID_SESSION_ID)
   }
@@ -329,7 +329,7 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
     FetchResponse.sizeOf(versionId, updates.entrySet().iterator())
   }
 
-  override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
+  override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
     debug(s"Sessionless fetch context returning ${partitionsToLogString(updates.keySet())}")
     new FetchResponse(Errors.NONE, updates, 0, INVALID_SESSION_ID)
   }
@@ -360,7 +360,7 @@ class FullFetchContext(private val time: Time,
     FetchResponse.sizeOf(versionId, updates.entrySet().iterator())
   }
 
-  override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
+  override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
     def createNewSession(): FetchSession.CACHE_MAP = {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size())
       updates.entrySet().asScala.foreach(entry => {
@@ -407,7 +407,7 @@ class IncrementalFetchContext(private val time: Time,
   private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                   val updateFetchContextAndRemoveUnselected: Boolean)
     extends FetchSession.RESP_MAP_ITER {
-    var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData] = null
+    var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
 
     override def hasNext: Boolean = {
       while ((nextElement == null) && iter.hasNext()) {
@@ -431,7 +431,7 @@ class IncrementalFetchContext(private val time: Time,
       nextElement != null
     }
 
-    override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData] = {
+    override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = {
       if (!hasNext()) throw new NoSuchElementException()
       val element = nextElement
       nextElement = null
@@ -453,7 +453,7 @@ class IncrementalFetchContext(private val time: Time,
     }
   }
 
-  override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
+  override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
     session.synchronized {
       // Check to make sure that the session epoch didn't change in between
       // creating this fetch context and generating this response.
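
With PartitionData now parameterized by the records type, a single response map can hold plain Records or a LazyDownConversionRecords wrapper under the common BaseRecords type. A hedged Java sketch of assembling such a response (the topic name, offsets, and empty records are placeholders for real fetch data):

    import java.util.LinkedHashMap;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.record.BaseRecords;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.requests.FetchMetadata;
    import org.apache.kafka.common.requests.FetchResponse;

    public class FetchResponseSketch {
        public static void main(String[] args) {
            LinkedHashMap<TopicPartition, FetchResponse.PartitionData<BaseRecords>> data =
                    new LinkedHashMap<>();
            data.put(new TopicPartition("example-topic", 0),
                    new FetchResponse.PartitionData<>(Errors.NONE, 100L /* high watermark */,
                            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L /* log start offset */,
                            null /* aborted transactions */, MemoryRecords.EMPTY));
            FetchResponse<BaseRecords> response = new FetchResponse<>(Errors.NONE, data,
                    0 /* throttle time ms */, FetchMetadata.INVALID_SESSION_ID);
            System.out.println(response.responseData().size());
        }
    }
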
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index abca2f0..dcdfae0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,51 +17,49 @@
 
 package kafka.server
 
-import java.nio.ByteBuffer
 import java.lang.{Long => JLong}
-import java.util.{Collections, Properties}
+import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.{Collections, Properties}
 
 import kafka.admin.{AdminUtils, RackAwareMode}
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.cluster.Partition
 import kafka.common.{OffsetAndMetadata, OffsetMetadata}
-import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.RequestChannel
-import kafka.network.RequestChannel._
 import kafka.security.SecurityUtils
 import kafka.security.auth.{Resource, _}
+import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.utils.{CoreUtils, Logging}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
+import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record._
+import org.apache.kafka.common.record.{BaseRecords, ControlRecordType, EndTransactionMarker, LazyDownConversionRecords, MemoryRecords, MultiRecordsSend, RecordBatch, RecordConversionStats, Records}
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
-import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshakeResponse}
-import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
 import org.apache.kafka.common.resource.{Resource => AdminResource}
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
-import DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{Node, TopicPartition}
 
-import scala.collection._
 import scala.collection.JavaConverters._
+import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 
@@ -87,6 +85,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 time: Time,
                 val tokenManager: DelegationTokenManager) extends Logging {
 
+  type FetchResponseStats = Map[TopicPartition, RecordConversionStats]
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
   val adminZkClient = new AdminZkClient(zkClient)
 
@@ -410,7 +409,6 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
-
       val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
       var errorInResponse = false
 
@@ -436,9 +434,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
       if (maxThrottleTimeMs > 0) {
         if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
-          quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendActionOnlyResponse(request))
+          quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendResponse)
         } else {
-          quotas.request.throttle(request, requestThrottleTimeMs, sendActionOnlyResponse(request))
+          quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
         }
       }
 
@@ -463,13 +461,13 @@ class KafkaApis(val requestChannel: RequestChannel,
           sendNoOpResponseExemptThrottle(request)
         }
       } else {
-        sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)))
+        sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
       }
     }
 
-    def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = {
+    def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
       processingStats.foreach { case (tp, info) =>
-        updateRecordsProcessingStats(request, tp, info)
+        updateRecordConversionStats(request, tp, info)
       }
     }
 
@@ -486,7 +484,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         isFromClient = true,
         entriesPerPartition = authorizedRequestInfo,
         responseCallback = sendResponseCallback,
-        processingStatsCallback = processingStatsCallback)
+        recordConversionStatsCallback = processingStatsCallback)
 
       // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
       // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
@@ -506,7 +504,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           fetchRequest.toForget(),
           fetchRequest.isFromFollower())
 
-    val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]()
+    val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData[Records])]()
     val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
     if (fetchRequest.isFromFollower()) {
       // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
@@ -543,7 +541,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       })
     }
 
-    def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = {
+    def convertRecords(tp: TopicPartition, unconvertedRecords: Records): BaseRecords = {
       // Down-conversion of the fetched records is needed when the stored magic version is
       // greater than that supported by the client (as indicated by the fetch request version). If the
       // configured magic version for the topic is less than or equal to that supported by the version of the
@@ -553,9 +551,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       // which were written in the new format prior to the version downgrade.
       replicaManager.getMagic(tp).flatMap { magic =>
         val downConvertMagic = {
-          if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+          if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
             Some(RecordBatch.MAGIC_VALUE_V0)
-          else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
+          else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
             Some(RecordBatch.MAGIC_VALUE_V1)
           else
             None
@@ -563,18 +561,19 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         downConvertMagic.map { magic =>
           trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
-          val converted = data.records.downConvert(magic, fetchContext.getFetchOffset(tp).get, time)
-          updateRecordsProcessingStats(request, tp, converted.recordsProcessingStats)
-          new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            data.logStartOffset, data.abortedTransactions, converted.records)
-        }
 
-      }.getOrElse(data)
+          // Because down-conversion is extremely memory intensive, we want to delay the down-conversion as much
+          // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
+          // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
+          // client.
+          new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
+        }
+      }.getOrElse(unconvertedRecords)
     }
 
     // the callback for process a fetch response, invoked before throttling
-    def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
-      val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+    def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+      val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
       responsePartitionData.foreach{ case (tp, data) =>
         val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
         val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
@@ -587,16 +586,23 @@ class KafkaApis(val requestChannel: RequestChannel,
       // Record time before any byte-rate throttling.
       request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
-      var unconvertedFetchResponse: FetchResponse = null
+      var unconvertedFetchResponse: FetchResponse[Records] = null
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
-        unconvertedFetchResponse.responseData().asScala.foreach { case (tp, partitionData) =>
-          if (partitionData.error != Errors.NONE)
+      def createResponse(throttleTimeMs: Int): FetchResponse[BaseRecords] = {
+        // Down-convert messages for each partition if required
+        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[BaseRecords]]
+        unconvertedFetchResponse.responseData().asScala.foreach { case (tp, unconvertedPartitionData) =>
+          if (unconvertedPartitionData.error != Errors.NONE)
             debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${partitionData.error.exceptionName}")
-          convertedData.put(tp, convertedPartitionData(tp, partitionData))
+              s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}")
+          val convertedRecords = convertRecords(tp, unconvertedPartitionData.records)
+          val convertedPartitionData = new FetchResponse.PartitionData[BaseRecords](unconvertedPartitionData.error,
+            unconvertedPartitionData.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, unconvertedPartitionData.logStartOffset,
+            unconvertedPartitionData.abortedTransactions, convertedRecords)
+          convertedData.put(tp, convertedPartitionData)
         }
+
+        // Prepare fetch response from converted data
         val response = new FetchResponse(unconvertedFetchResponse.error(), convertedData, throttleTimeMs,
           unconvertedFetchResponse.sessionId())
         response.responseData.asScala.foreach { case (topicPartition, data) =>
@@ -606,6 +612,16 @@ class KafkaApis(val requestChannel: RequestChannel,
         response
       }
 
+      def updateConversionStats(send: Send): Unit = {
+        send match {
+          case send: MultiRecordsSend if send.recordConversionStats != null =>
+            send.recordConversionStats.asScala.toMap.foreach {
+              case (tp, stats) => updateRecordConversionStats(request, tp, stats)
+            }
+          case _ =>
+        }
+      }
+
       if (fetchRequest.isFromFollower) {
         // We've already evaluated against the quota and are good to go. Just need to record it now.
         unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
@@ -613,7 +629,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         quotas.leader.record(responseSize)
         trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData().size()}, " +
           s"metadata=${unconvertedFetchResponse.sessionId()}")
-        sendResponseExemptThrottle(request, createResponse(0))
+        sendResponseExemptThrottle(request, createResponse(0), Some(updateConversionStats))
       } else {
         // Fetch size used to determine throttle time is calculated before any down conversions.
         // This may be slightly different from the actual response size. But since down conversions
@@ -633,22 +649,21 @@ class KafkaApis(val requestChannel: RequestChannel,
           // from the fetch quota because we are going to return an empty response.
           quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
           if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
-            quotas.fetch.throttle(request, bandwidthThrottleTimeMs, sendActionOnlyResponse(request))
+            quotas.fetch.throttle(request, bandwidthThrottleTimeMs, sendResponse)
           } else {
-            quotas.request.throttle(request, requestThrottleTimeMs, sendActionOnlyResponse(request))
+            quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
           }
           // If throttling is required, return an empty response.
-          unconvertedFetchResponse = new FetchResponse(Errors.NONE, new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData](),
-            maxThrottleTimeMs, INVALID_SESSION_ID)
+          unconvertedFetchResponse = new FetchResponse(Errors.NONE, new util.LinkedHashMap[TopicPartition,
+            FetchResponse.PartitionData[Records]](), maxThrottleTimeMs, INVALID_SESSION_ID)
         } else {
           // Get the actual response. This will update the fetch context.
           unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
-          trace(s"Sending Fetch response with partitions.size=${responseSize}, " +
-            s"metadata=${unconvertedFetchResponse.sessionId()}")
+          trace(s"Sending Fetch response with partitions.size=${responseSize}, metadata=${unconvertedFetchResponse.sessionId()}")
         }
 
         // Send the response immediately.
-        sendResponse(request, Some(createResponse(maxThrottleTimeMs)))
+        sendResponse(request, Some(createResponse(maxThrottleTimeMs)), Some(updateConversionStats))
       }
     }
 
@@ -669,12 +684,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  class SelectingIterator(val partitions: util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData],
+  class SelectingIterator(val partitions: util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]],
                           val quota: ReplicationQuotaManager)
-                          extends util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData]] {
+                          extends util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]] {
     val iter = partitions.entrySet().iterator()
 
-    var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData] = null
+    var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
 
     override def hasNext: Boolean = {
       while ((nextElement == null) && iter.hasNext()) {
@@ -686,7 +701,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       nextElement != null
     }
 
-    override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData] = {
+    override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = {
       if (!hasNext()) throw new NoSuchElementException()
       val element = nextElement
       nextElement = null
@@ -699,7 +714,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
   // traffic doesn't exceed quota.
   private def sizeOfThrottledPartitions(versionId: Short,
-                                        unconvertedResponse: FetchResponse,
+                                        unconvertedResponse: FetchResponse[Records],
                                         quota: ReplicationQuotaManager): Int = {
     val iter = new SelectingIterator(unconvertedResponse.responseData(), quota)
     FetchResponse.sizeOf(versionId, iter)
@@ -2227,9 +2242,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
 
-  private def updateRecordsProcessingStats(request: RequestChannel.Request, tp: TopicPartition,
-                                           processingStats: RecordsProcessingStats): Unit = {
-    val conversionCount = processingStats.numRecordsConverted
+  private def updateRecordConversionStats(request: RequestChannel.Request,
+                                          tp: TopicPartition,
+                                          conversionStats: RecordConversionStats): Unit = {
+    val conversionCount = conversionStats.numRecordsConverted
     if (conversionCount > 0) {
       request.header.apiKey match {
         case ApiKeys.PRODUCE =>
@@ -2241,9 +2257,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         case _ =>
           throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
       }
-      request.messageConversionsTimeNanos = processingStats.conversionTimeNanos
+      request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos
     }
-    request.temporaryMemoryBytes = processingStats.temporaryMemoryBytes
+    request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
   }
 
   private def handleError(request: RequestChannel.Request, e: Throwable) {
@@ -2257,21 +2273,25 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   // Throttle the channel if the request quota is enabled but has been violated. Regardless of throttling, send the
   // response immediately.
-  private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse): Unit = {
+  private def sendResponseMaybeThrottle(request: RequestChannel.Request,
+                                        createResponse: Int => AbstractResponse,
+                                        onComplete: Option[Send => Unit] = None): Unit = {
     val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request)
-    quotas.request.throttle(request, throttleTimeMs, sendActionOnlyResponse(request))
-    sendResponse(request, Some(createResponse(throttleTimeMs)))
+    quotas.request.throttle(request, throttleTimeMs, sendResponse)
+    sendResponse(request, Some(createResponse(throttleTimeMs)), onComplete)
   }
 
   private def sendErrorResponseMaybeThrottle(request: RequestChannel.Request, error: Throwable) {
     val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request)
-    quotas.request.throttle(request, throttleTimeMs, sendActionOnlyResponse(request))
+    quotas.request.throttle(request, throttleTimeMs, sendResponse)
     sendErrorOrCloseConnection(request, error, throttleTimeMs)
   }
 
-  private def sendResponseExemptThrottle(request: RequestChannel.Request, response: AbstractResponse): Unit = {
+  private def sendResponseExemptThrottle(request: RequestChannel.Request,
+                                         response: AbstractResponse,
+                                         onComplete: Option[Send => Unit] = None): Unit = {
     quotas.request.maybeRecordExempt(request)
-    sendResponse(request, Some(response))
+    sendResponse(request, Some(response), onComplete)
   }
 
   private def sendErrorResponseExemptThrottle(request: RequestChannel.Request, error: Throwable): Unit = {
@@ -2285,38 +2305,41 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (response == null)
       closeConnection(request, requestBody.errorCounts(error))
     else
-      sendResponse(request, Some(response))
+      sendResponse(request, Some(response), None)
   }
 
   private def sendNoOpResponseExemptThrottle(request: RequestChannel.Request): Unit = {
     quotas.request.maybeRecordExempt(request)
-    sendResponse(request, None)
+    sendResponse(request, None, None)
   }
 
   private def closeConnection(request: RequestChannel.Request, errorCounts: java.util.Map[Errors, Integer]): Unit = {
     // This case is used when the request handler has encountered an error, but the client
     // does not expect a response (e.g. when produce request has acks set to 0)
     requestChannel.updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
-    sendActionOnlyResponse(request)(CloseConnectionAction)
+    requestChannel.sendResponse(new RequestChannel.CloseConnectionResponse(request))
   }
 
-  private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
+  private def sendResponse(request: RequestChannel.Request,
+                           responseOpt: Option[AbstractResponse],
+                           onComplete: Option[Send => Unit]): Unit = {
     // Update error metrics for each error code in the response including Errors.NONE
     responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))
 
-    responseOpt match {
+    val response = responseOpt match {
       case Some(response) =>
         val responseSend = request.context.buildResponse(response)
         val responseString =
           if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
           else None
-        requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))
+        new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
       case None =>
-        sendActionOnlyResponse(request)(NoOpAction)
+        new RequestChannel.NoOpResponse(request)
     }
+    sendResponse(response)
   }
 
-  private def sendActionOnlyResponse(request: RequestChannel.Request)(responseAction: ResponseAction): Unit = {
-    requestChannel.sendResponse(new RequestChannel.Response(request, None, responseAction, None))
+  private def sendResponse(response: RequestChannel.Response): Unit = {
+    requestChannel.sendResponse(response)
   }
 }
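
The response path in KafkaApis now threads an optional onComplete callback (Option[Send => Unit]) all the way into RequestChannel.SendResponse, presumably so that work tied to the finished network send, such as recording statistics for lazily down-converted fetch data, can run once the response has actually been written. A minimal, self-contained Scala sketch of that pattern follows; the types are simplified stand-ins, not the broker's actual classes.

    object ResponseCallbackSketch {

      trait Send { def destination: String }

      sealed trait Response
      case class SendResponse(send: Send, onComplete: Option[Send => Unit]) extends Response
      case object NoOpResponse extends Response

      // Stand-in for RequestChannel.sendResponse: once the send has been written out,
      // invoke the completion hook if one was supplied.
      def sendResponse(response: Response): Unit = response match {
        case SendResponse(send, onComplete) =>
          // ... write `send` to the socket here ...
          onComplete.foreach(callback => callback(send))
        case NoOpResponse =>
          () // nothing to write back to the client
      }

      def main(args: Array[String]): Unit = {
        val send = new Send { val destination = "client-1" }
        sendResponse(SendResponse(send, Some(s => println(s"send to ${s.destination} completed"))))
      }
    }
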
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index ba7203e..5a505c3 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -20,19 +20,18 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util
 
-import AbstractFetcherThread.ResultWithPartitions
-import kafka.cluster.BrokerEndPoint
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.requests.EpochEndOffset._
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchRequest => JFetchRequest}
-import ReplicaAlterLogDirsThread.FetchRequest
-import ReplicaAlterLogDirsThread.PartitionData
 import kafka.api.Request
+import kafka.cluster.BrokerEndPoint
+import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.ReplicaAlterLogDirsThread.{FetchRequest, PartitionData}
 import kafka.server.epoch.LeaderEpochCache
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
+import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchRequest => JFetchRequest}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq, Set, mutable}
@@ -58,7 +57,7 @@ class ReplicaAlterLogDirsThread(name: String,
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
 
   def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
-    var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData)] = null
+    var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData[Records])] = null
     val request = fetchRequest.underlying.build()
 
     def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
@@ -256,7 +255,7 @@ object ReplicaAlterLogDirsThread {
     override def toString = underlying.toString
   }
 
-  private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
+  private[server] class PartitionData(val underlying: FetchResponse.PartitionData[Records]) extends AbstractFetcherThread.PartitionData {
 
     def error = underlying.error
 
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 72b6616..cf8d829 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, ListOffsetRequest, ListOffsetResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse, FetchRequest => JFetchRequest}
 import org.apache.kafka.common.utils.{LogContext, Time}
 
@@ -224,7 +224,7 @@ class ReplicaFetcherThread(name: String,
   protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest.underlying)
-      val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
+      val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
       if (!fetchSessionHandler.handleResponse(fetchResponse)) {
         Nil
       } else {
@@ -389,7 +389,7 @@ object ReplicaFetcherThread {
     override def toString = underlying.toString
   }
 
-  private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
+  private[server] class PartitionData(val underlying: FetchResponse.PartitionData[Records]) extends AbstractFetcherThread.PartitionData {
 
     def error = underlying.error
 
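
FetchResponse and FetchResponse.PartitionData are now generic in the records type, so broker code like this fetcher can keep working against the abstract Records while a different records implementation (for example, a lazily down-converted one) can be plugged into the same response shape. A rough, hypothetical model of that generic shape, with simplified stand-in names:

    object GenericFetchResponseSketch {

      // Stand-ins for BaseRecords / MemoryRecords / a lazily converted variant.
      trait BaseRecords { def sizeInBytes: Int }
      case class EagerRecords(bytes: Array[Byte]) extends BaseRecords {
        def sizeInBytes: Int = bytes.length
      }
      case class LazyConvertedRecords(underlying: BaseRecords) extends BaseRecords {
        // Only an estimate; real conversion would happen while the response is written out.
        def sizeInBytes: Int = underlying.sizeInBytes
      }

      case class PartitionData[T <: BaseRecords](highWatermark: Long, records: T)
      case class FetchResponseLike[T <: BaseRecords](responseData: Map[String, PartitionData[T]])

      def main(args: Array[String]): Unit = {
        val eager = FetchResponseLike(Map("t-0" -> PartitionData(10L, EagerRecords(Array[Byte](1, 2, 3)))))
        // The same response shape can carry a lazily converted payload instead.
        val deferred = FetchResponseLike(Map("t-0" -> PartitionData(10L, LazyConvertedRecords(EagerRecords(Array[Byte](4, 5))))))
        println(eager.responseData("t-0").records.sizeInBytes)
        println(deferred.responseData("t-0").records.sizeInBytes)
      }
    }
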
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5dbe25b..24f3235 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -31,21 +31,20 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION
-import org.apache.kafka.common.protocol.Errors.KAFKA_STORAGE_ERROR
+import org.apache.kafka.common.protocol.Errors.{KAFKA_STORAGE_ERROR, UNKNOWN_TOPIC_OR_PARTITION}
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConverters._
 import scala.collection._
@@ -471,7 +470,7 @@ class ReplicaManager(val config: KafkaConfig,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     delayedProduceLock: Option[Lock] = None,
-                    processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
+                    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
@@ -485,7 +484,7 @@ class ReplicaManager(val config: KafkaConfig,
                   new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status
       }
 
-      processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))
+      recordConversionStatsCallback(localProduceResults.mapValues(_.info.recordConversionStats))
 
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
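
RecordsProcessingStats becomes RecordConversionStats here, and appendRecords now reports per-partition conversion statistics through recordConversionStatsCallback after the local append. A small, self-contained sketch of that callback shape; the field names used below are illustrative assumptions, not necessarily the real class:

    object ConversionStatsCallbackSketch {

      // Illustrative stand-ins; field names are assumptions, not the real RecordConversionStats.
      case class RecordConversionStats(numRecordsConverted: Int, conversionTimeNanos: Long)
      case class TopicPartition(topic: String, partition: Int)

      def appendRecords(entriesPerPartition: Map[TopicPartition, Array[Byte]],
                        recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
        // Pretend each partition's batch needed one record converted, taking 1 microsecond.
        val stats = entriesPerPartition.map { case (tp, _) => tp -> RecordConversionStats(1, 1000L) }
        recordConversionStatsCallback(stats)
      }

      def main(args: Array[String]): Unit =
        appendRecords(Map(TopicPartition("t", 0) -> Array[Byte](1, 2, 3)),
          stats => stats.foreach { case (tp, s) => println(s"$tp converted ${s.numRecordsConverted} record(s)") })
    }
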
diff --git a/core/src/main/scala/kafka/server/ThrottledChannel.scala b/core/src/main/scala/kafka/server/ThrottledChannel.scala
index 74357d5..8fe8649 100644
--- a/core/src/main/scala/kafka/server/ThrottledChannel.scala
+++ b/core/src/main/scala/kafka/server/ThrottledChannel.scala
@@ -19,28 +19,31 @@ package kafka.server
 
 import java.util.concurrent.{Delayed, TimeUnit}
 
-import kafka.network.RequestChannel.{EndThrottlingAction, ResponseAction, StartThrottlingAction}
+import kafka.network
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
 import org.apache.kafka.common.utils.Time
 
 
 /**
   * Represents a request whose response has been delayed.
-  * @param time @Time instance to use
-  * @param throttleTimeMs delay associated with this request
+  * @param request The request that has been delayed
+  * @param time Time instance to use
+  * @param throttleTimeMs Delay associated with this request
   * @param channelThrottlingCallback Callback for channel throttling
   */
-class ThrottledChannel(val time: Time, val throttleTimeMs: Int, channelThrottlingCallback: (ResponseAction) => Unit)
+class ThrottledChannel(val request: RequestChannel.Request, val time: Time, val throttleTimeMs: Int, channelThrottlingCallback: Response => Unit)
   extends Delayed with Logging {
   var endTime = time.milliseconds + throttleTimeMs
 
   // Notify the socket server that throttling has started for this channel.
-  channelThrottlingCallback(StartThrottlingAction)
+  channelThrottlingCallback(new RequestChannel.StartThrottlingResponse(request))
 
   // Notify the socket server that throttling is done for this channel.
   def notifyThrottlingDone(): Unit = {
     trace("Channel throttled for: " + throttleTimeMs + " ms")
-    channelThrottlingCallback(EndThrottlingAction)
+    channelThrottlingCallback(new network.RequestChannel.EndThrottlingResponse(request))
   }
 
   override def getDelay(unit: TimeUnit): Long = {
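
ThrottledChannel now carries the originating request, and the throttling callback receives Response objects (StartThrottlingResponse / EndThrottlingResponse) instead of bare ResponseAction values. A compact, self-contained sketch of that callback shape, using simplified stand-in types rather than the real RequestChannel classes:

    object ThrottlingCallbackSketch {

      case class Request(connectionId: String)

      sealed trait Response { def request: Request }
      case class StartThrottlingResponse(request: Request) extends Response
      case class EndThrottlingResponse(request: Request) extends Response

      class ThrottledChannel(val request: Request, val throttleTimeMs: Int, callback: Response => Unit) {
        callback(StartThrottlingResponse(request)) // tell the socket server to mute the channel
        def notifyThrottlingDone(): Unit =
          callback(EndThrottlingResponse(request)) // tell the socket server throttling has ended
      }

      def main(args: Array[String]): Unit = {
        val channel = new ThrottledChannel(Request("conn-1"), 100, {
          case StartThrottlingResponse(r) => println(s"start throttling ${r.connectionId}")
          case EndThrottlingResponse(r)   => println(s"end throttling ${r.connectionId}")
        })
        channel.notifyThrottlingDone()
      }
    }
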
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index da45be2..681497e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,8 +19,7 @@ import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 
 import kafka.admin.AdminClient
-import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
-import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
 import kafka.common.TopicAndPartition
 import kafka.log.LogConfig
 import kafka.network.SocketServer
@@ -28,23 +27,21 @@ import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.NewPartitions
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
-import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
 import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.{Node, TopicPartition, requests}
+import org.apache.kafka.common.{KafkaException, Node, TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
 
@@ -115,7 +112,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val requestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] =
     Map(ApiKeys.METADATA -> classOf[requests.MetadataResponse],
       ApiKeys.PRODUCE -> classOf[requests.ProduceResponse],
-      ApiKeys.FETCH -> classOf[requests.FetchResponse],
+      ApiKeys.FETCH -> classOf[requests.FetchResponse[Records]],
       ApiKeys.LIST_OFFSETS -> classOf[requests.ListOffsetResponse],
       ApiKeys.OFFSET_COMMIT -> classOf[requests.OffsetCommitResponse],
       ApiKeys.OFFSET_FETCH -> classOf[requests.OffsetFetchResponse],
@@ -153,7 +150,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
     ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error),
-    ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
+    ApiKeys.FETCH -> ((resp: requests.FetchResponse[Records]) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
     ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
     ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData.asScala.find(_._1 == tp).get._2),
     ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 0ecc3f5..d4dcd9f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -17,8 +17,8 @@
 
 package kafka.coordinator
 
-import java.util.{ Collections, Random }
-import java.util.concurrent.{ ConcurrentHashMap, Executors }
+import java.util.{Collections, Random}
+import java.util.concurrent.{ConcurrentHashMap, Executors}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.Lock
 
@@ -30,10 +30,10 @@ import kafka.utils.timer.MockTimer
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{ MemoryRecords, RecordBatch, RecordsProcessingStats }
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordConversionStats}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.EasyMock
-import org.junit.{ After, Before }
+import org.junit.{After, Before}
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -177,7 +177,7 @@ object AbstractCoordinatorConcurrencyTest {
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                                delayedProduceLock: Option[Lock] = None,
-                               processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
+                               processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
 
       if (entriesPerPartition.isEmpty)
         return
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index f68ff9e..2a367e0 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -63,7 +63,7 @@ class LogValidatorTest {
     assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 0, records,
+    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
       compressed = false)
   }
 
@@ -103,8 +103,8 @@ class LogValidatorTest {
       records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged)
 
-    val stats = validatedResults.recordsProcessingStats
-    verifyRecordsProcessingStats(stats, numConvertedRecords = 3, records, compressed = true)
+    val stats = validatedResults.recordConversionStats
+    verifyRecordConversionStats(stats, numConvertedRecords = 3, records, compressed = true)
   }
 
   @Test
@@ -145,7 +145,7 @@ class LogValidatorTest {
       records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 0, records,
+    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
       compressed = true)
   }
 
@@ -255,7 +255,7 @@ class LogValidatorTest {
     assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, numConvertedRecords = 0, records,
+    verifyRecordConversionStats(validatingResults.recordConversionStats, numConvertedRecords = 0, records,
       compressed = false)
   }
 
@@ -320,7 +320,7 @@ class LogValidatorTest {
     assertEquals("Offset of max timestamp should be 2", 2, validatingResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", validatingResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, numConvertedRecords = 3, records,
+    verifyRecordConversionStats(validatingResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = true)
   }
 
@@ -364,7 +364,7 @@ class LogValidatorTest {
       validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = true)
   }
 
@@ -405,7 +405,7 @@ class LogValidatorTest {
       validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = true)
   }
 
@@ -466,7 +466,7 @@ class LogValidatorTest {
       validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 0, records,
+    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
       compressed = true)
   }
 
@@ -697,7 +697,7 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       isFromClient = true)
     checkOffsets(validatedResults.validatedRecords, offset)
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = false)
   }
 
@@ -719,7 +719,7 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       isFromClient = true)
     checkOffsets(validatedResults.validatedRecords, offset)
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = false)
   }
 
@@ -741,7 +741,7 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       isFromClient = true)
     checkOffsets(validatedResults.validatedRecords, offset)
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = true)
   }
 
@@ -763,7 +763,7 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       isFromClient = true)
     checkOffsets(validatedResults.validatedRecords, offset)
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = true)
   }
 
@@ -1131,8 +1131,8 @@ class LogValidatorTest {
     }
   }
 
-  def verifyRecordsProcessingStats(stats: RecordsProcessingStats, numConvertedRecords: Int, records: MemoryRecords,
-                                   compressed: Boolean): Unit = {
+  def verifyRecordConversionStats(stats: RecordConversionStats, numConvertedRecords: Int, records: MemoryRecords,
+                                  compressed: Boolean): Unit = {
     assertNotNull("Records processing info is null", stats)
     assertEquals(numConvertedRecords, stats.numRecordsConverted)
     if (numConvertedRecords > 0) {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index dfa388b..c0e27cf 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -20,13 +20,12 @@ package kafka.network
 import java.io._
 import java.net._
 import java.nio.ByteBuffer
-import java.util.{HashMap, Random}
 import java.nio.channels.SocketChannel
-import javax.net.ssl._
+import java.util.{HashMap, Random}
 
 import com.yammer.metrics.core.{Gauge, Meter}
 import com.yammer.metrics.{Metrics => YammerMetrics}
-import kafka.network.RequestChannel.{NoOpAction, ResponseAction, SendAction}
+import javax.net.ssl._
 import kafka.security.CredentialProvider
 import kafka.server.{KafkaConfig, ThrottledChannel}
 import kafka.utils.TestUtils
@@ -42,7 +41,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
 import org.apache.kafka.common.utils.{LogContext, MockTime, Time}
 import org.apache.log4j.Level
-import org.junit.Assert.{assertEquals, _}
+import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 
@@ -132,7 +131,7 @@ class SocketServerTest extends JUnitSuite {
     byteBuffer.rewind()
 
     val send = new NetworkSend(request.context.connectionId, byteBuffer)
-    channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, Some(request.header.toString)))
+    channel.sendResponse(new RequestChannel.SendResponse(request, send, Some(request.header.toString), None))
   }
 
   def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
@@ -215,7 +214,7 @@ class SocketServerTest extends JUnitSuite {
     for (_ <- 0 until 10) {
       val request = receiveRequest(server.requestChannel)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction, None))
+      server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
     }
   }
 
@@ -229,7 +228,7 @@ class SocketServerTest extends JUnitSuite {
     for (_ <- 0 until 3) {
       val request = receiveRequest(server.requestChannel)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction, None))
+      server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
     }
   }
 
@@ -397,7 +396,7 @@ class SocketServerTest extends JUnitSuite {
 
   // Prepares test setup for throttled channel tests. throttlingInProgress controls whether throttling is still
   // in progress in the quota manager.
-  def throttledChannelTestSetUp(socket: Socket, serializedBytes: Array[Byte], action: RequestChannel.ResponseAction,
+  def throttledChannelTestSetUp(socket: Socket, serializedBytes: Array[Byte], noOpResponse: Boolean,
                                 throttlingInProgress: Boolean): RequestChannel.Request = {
     sendRequest(socket, serializedBytes)
 
@@ -406,16 +405,21 @@ class SocketServerTest extends JUnitSuite {
     val request = receiveRequest(server.requestChannel)
     val byteBuffer = request.body[AbstractRequest].serialize(request.header)
     val send = new NetworkSend(request.context.connectionId, byteBuffer)
-    def channelThrottlingCallback(responseAction: ResponseAction): Unit = {
-      server.requestChannel.sendResponse(new RequestChannel.Response(request, None, responseAction, None))
+    def channelThrottlingCallback(response: RequestChannel.Response): Unit = {
+      server.requestChannel.sendResponse(response)
     }
-    val throttledChannel = new ThrottledChannel(new MockTime(), 100, channelThrottlingCallback)
-    server.requestChannel.sendResponse(new RequestChannel.Response(request, Some(send), action,
-      Some(request.header.toString)))
+    val throttledChannel = new ThrottledChannel(request, new MockTime(), 100, channelThrottlingCallback)
+    val response =
+      if (!noOpResponse)
+        new RequestChannel.SendResponse(request, send, Some(request.header.toString), None)
+      else
+        new RequestChannel.NoOpResponse(request)
+    server.requestChannel.sendResponse(response)
 
     // Quota manager would call notifyThrottlingDone() on throttling completion. Simulate it if throttlingInProgress is
     // false.
-    if (!throttlingInProgress) throttledChannel.notifyThrottlingDone()
+    if (!throttlingInProgress)
+      throttledChannel.notifyThrottlingDone()
 
     request
   }
@@ -428,7 +432,7 @@ class SocketServerTest extends JUnitSuite {
     val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
     val serializedBytes = producerRequestBytes()
     // Send a response with throttling in progress
-    val request = throttledChannelTestSetUp(socket, serializedBytes, SendAction, true)
+    val request = throttledChannelTestSetUp(socket, serializedBytes, false, true)
 
     // receive response
     assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
@@ -442,7 +446,7 @@ class SocketServerTest extends JUnitSuite {
     val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
     val serializedBytes = producerRequestBytes()
     // Send a response with throttling already completed
-    val request = throttledChannelTestSetUp(socket, serializedBytes, SendAction, false)
+    val request = throttledChannelTestSetUp(socket, serializedBytes, false, false)
 
     // receive response
     assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
@@ -457,7 +461,7 @@ class SocketServerTest extends JUnitSuite {
     val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
     val serializedBytes = producerRequestBytes()
     // No-op response with throttling in progress
-    val request = throttledChannelTestSetUp(socket, serializedBytes, NoOpAction, true)
+    val request = throttledChannelTestSetUp(socket, serializedBytes, true, true)
 
     TestUtils.waitUntilTrue(() => openOrClosingChannel(request).exists(c => c.muteState() == ChannelMuteState.MUTED_AND_THROTTLED), "fail")
     // Channel should still be muted.
@@ -469,7 +473,7 @@ class SocketServerTest extends JUnitSuite {
     val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
     val serializedBytes = producerRequestBytes()
     // No-op response with throttling already completed
-    val request = throttledChannelTestSetUp(socket, serializedBytes, NoOpAction, false)
+    val request = throttledChannelTestSetUp(socket, serializedBytes, true, false)
 
     // Since throttling is already done, the channel can be unmuted.
     TestUtils.waitUntilTrue(() => openOrClosingChannel(request).exists(c => c.muteState() == ChannelMuteState.NOT_MUTED), "fail")
@@ -675,7 +679,7 @@ class SocketServerTest extends JUnitSuite {
       // detected. If the buffer is larger than 102400 bytes, a second write is attempted and it fails with an
       // IOException.
       val send = new NetworkSend(request.context.connectionId, ByteBuffer.allocate(550000))
-      channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, None))
+      channel.sendResponse(new RequestChannel.SendResponse(request, send, None, None))
       TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount,
         s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")
 
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index b99bac8..c5275c2 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -16,13 +16,22 @@
  */
 package kafka.server
 
+import java.net.InetAddress
+import java.util
 import java.util.Collections
 
-import kafka.network.RequestChannel.{EndThrottlingAction, ResponseAction, Session, StartThrottlingAction}
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.{EndThrottlingResponse, Session, StartThrottlingResponse}
 import kafka.server.QuotaType._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.{MockTime, Sanitizer}
+import org.easymock.EasyMock
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{Before, Test}
 
@@ -32,11 +41,11 @@ class ClientQuotaManagerTest {
   private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500)
 
   var numCallbacks: Int = 0
-  def callback (responseAction: ResponseAction) {
+  def callback (response: RequestChannel.Response) {
     // Count how many times this callback is called for notifyThrottlingDone().
-    responseAction match {
-      case StartThrottlingAction =>
-      case EndThrottlingAction => numCallbacks += 1
+    response match {
+      case _: StartThrottlingResponse =>
+      case _: EndThrottlingResponse => numCallbacks += 1
     }
   }
 
@@ -45,15 +54,30 @@ class ClientQuotaManagerTest {
     numCallbacks = 0
   }
 
+  private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
+                                                 listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
+
+    val request = builder.build()
+    val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
+    val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+
+    // read the header from the buffer first so that the body can be read next from the Request constructor
+    val header = RequestHeader.parse(buffer)
+    val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
+      listenerName, SecurityProtocol.PLAINTEXT)
+    (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos =  0, MemoryPool.NONE, buffer,
+      requestChannelMetrics))
+  }
+
   private def maybeRecord(quotaManager: ClientQuotaManager, user: String, clientId: String, value: Double): Int = {
     val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
-    quotaManager.maybeRecordAndGetThrottleTimeMs(Session(principal, null),clientId, value, time.milliseconds())
+    quotaManager.maybeRecordAndGetThrottleTimeMs(Session(principal, null), clientId, value, time.milliseconds())
   }
 
   private def throttle(quotaManager: ClientQuotaManager, user: String, clientId: String, throttleTimeMs: Int,
-                       channelThrottlingCallback: (ResponseAction) => Unit) {
-    val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
-    quotaManager.throttle(Session(principal, null),clientId, throttleTimeMs, channelThrottlingCallback)
+                       channelThrottlingCallback: (RequestChannel.Response) => Unit) {
+    val (_, request) = buildRequest(FetchRequest.Builder.forConsumer(0, 1000, new util.HashMap[TopicPartition, PartitionData]))
+    quotaManager.throttle(request, throttleTimeMs, channelThrottlingCallback)
   }
 
   private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient) {
@@ -364,6 +388,7 @@ class ClientQuotaManagerTest {
       // the sensor should get recreated
       val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:client1")
       assertTrue("Throttle time sensor should exist", throttleTimeSensor != null)
     } finally {
       clientMetrics.shutdown()
     }
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 03137e1..424b8c7 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -26,9 +26,8 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{Record, RecordBatch}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch, Records}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
 import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
 import org.junit.Assert._
 import org.junit.Test
@@ -64,7 +63,7 @@ class FetchRequestTest extends BaseRequestTest {
     partitionMap
   }
 
-  private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse = {
+  private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse[MemoryRecords] = {
     val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
     FetchResponse.parse(response, request.version)
   }
@@ -219,7 +218,7 @@ class FetchRequestTest extends BaseRequestTest {
     // batch is not complete, but sent when the producer is closed
     futures.foreach(_.get)
 
-    def fetch(version: Short, maxPartitionBytes: Int, closeAfterPartialResponse: Boolean): Option[FetchResponse] = {
+    def fetch(version: Short, maxPartitionBytes: Int, closeAfterPartialResponse: Boolean): Option[FetchResponse[MemoryRecords]] = {
       val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
         Seq(topicPartition))).build(version)
 
@@ -279,17 +278,32 @@ class FetchRequestTest extends BaseRequestTest {
     secondBatchFutures.foreach(_.get)
 
     def check(fetchOffset: Long, requestVersion: Short, expectedOffset: Long, expectedNumBatches: Int, expectedMagic: Byte): Unit = {
-      val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(Int.MaxValue,
-        Seq(topicPartition), Map(topicPartition -> fetchOffset))).build(requestVersion)
-      val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
-      val partitionData = fetchResponse.responseData.get(topicPartition)
-      assertEquals(Errors.NONE, partitionData.error)
-      assertTrue(partitionData.highWatermark > 0)
-      val batches = partitionData.records.batches.asScala.toBuffer
-      assertEquals(expectedNumBatches, batches.size)
-      val batch = batches.head
-      assertEquals(expectedMagic, batch.magic)
-      assertEquals(expectedOffset, batch.baseOffset)
+      var batchesReceived = 0
+      var currentFetchOffset = fetchOffset
+      var currentExpectedOffset = expectedOffset
+
+      // With KIP-283, we might not receive all batches in a single fetch request, so loop until we have consumed
+      // all the batches we are interested in.
+      while (batchesReceived < expectedNumBatches) {
+        val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(Int.MaxValue,
+          Seq(topicPartition), Map(topicPartition -> currentFetchOffset))).build(requestVersion)
+        val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+
+        // validate response
+        val partitionData = fetchResponse.responseData.get(topicPartition)
+        assertEquals(Errors.NONE, partitionData.error)
+        assertTrue(partitionData.highWatermark > 0)
+        val batches = partitionData.records.batches.asScala.toBuffer
+        val batch = batches.head
+        assertEquals(expectedMagic, batch.magic)
+        assertEquals(currentExpectedOffset, batch.baseOffset)
+
+        currentFetchOffset = batches.last.lastOffset + 1
+        currentExpectedOffset += (batches.last.lastOffset - batches.head.baseOffset + 1)
+        batchesReceived += batches.size
+      }
+
+      assertEquals(expectedNumBatches, batchesReceived)
     }
 
     // down conversion to message format 0, batches of 1 message are returned so we receive the exact offset we requested
@@ -317,9 +331,9 @@ class FetchRequestTest extends BaseRequestTest {
   }
 
   /**
-    * Test that when an incremental fetch session contains partitions with an error,
-    * those partitions are returned in all incremental fetch requests.
-    */
+   * Test that when an incremental fetch session contains partitions with an error,
+   * those partitions are returned in all incremental fetch requests.
+   */
   @Test
   def testCreateIncrementalFetchWithPartitionsInError(): Unit = {
     def createFetchRequest(topicPartitions: Seq[TopicPartition],
@@ -327,9 +341,9 @@ class FetchRequestTest extends BaseRequestTest {
                            toForget: Seq[TopicPartition]): FetchRequest =
       FetchRequest.Builder.forConsumer(Int.MaxValue, 0,
         createPartitionMap(Integer.MAX_VALUE, topicPartitions, Map.empty))
-          .toForget(toForget.asJava)
-          .metadata(metadata)
-          .build()
+        .toForget(toForget.asJava)
+        .metadata(metadata)
+        .build()
     val foo0 = new TopicPartition("foo", 0)
     val foo1 = new TopicPartition("foo", 1)
     createTopic("foo", Map(0 -> List(0, 1), 1 -> List(0, 2)))
@@ -370,11 +384,11 @@ class FetchRequestTest extends BaseRequestTest {
     assertFalse(resp4.responseData().containsKey(bar0))
   }
 
-  private def records(partitionData: FetchResponse.PartitionData): Seq[Record] = {
+  private def records(partitionData: FetchResponse.PartitionData[MemoryRecords]): Seq[Record] = {
     partitionData.records.records.asScala.toIndexedSeq
   }
 
-  private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], fetchResponse: FetchResponse,
+  private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], fetchResponse: FetchResponse[MemoryRecords],
                                  maxPartitionBytes: Int, maxResponseBytes: Int, numMessagesPerPartition: Int): Unit = {
     assertEquals(expectedPartitions, fetchResponse.responseData.keySet.asScala.toSeq)
     var emptyResponseSeen = false
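
The test above now loops because, with chunked down-conversion, a single fetch response may carry only part of the requested batches. For intuition only, here is a hedged sketch of converting data lazily in fixed-size chunks; it is not the broker's LazyDownConversionRecords, just the general idea:

    object ChunkedConversionSketch {

      // Convert batches on demand, one fixed-size chunk at a time, so the fully converted
      // payload never has to sit in memory all at once. `convert` is caller-supplied.
      def lazyConvert(batches: Seq[Array[Byte]], chunkSize: Int)
                     (convert: Seq[Array[Byte]] => Array[Byte]): Iterator[Array[Byte]] =
        batches.grouped(chunkSize).map(convert) // grouped returns an iterator; map on it is lazy

      def main(args: Array[String]): Unit = {
        val batches = Seq(Array[Byte](1), Array[Byte](2, 3), Array[Byte](4))
        val chunks = lazyConvert(batches, chunkSize = 2)(group => group.flatten.toArray)
        chunks.zipWithIndex.foreach { case (chunk, i) => println(s"chunk $i: ${chunk.mkString(",")}") }
      }
    }
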
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index 8264c1b..84efa6b 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -22,11 +22,12 @@ import java.util.Collections
 import kafka.utils.MockTime
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{AbstractRecords, Records}
 import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, INVALID_SESSION_ID}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
-import org.junit.{Rule, Test}
 import org.junit.Assert._
 import org.junit.rules.Timeout
+import org.junit.{Rule, Test}
 
 class FetchSessionTest {
   @Rule
@@ -152,7 +153,7 @@ class FetchSessionTest {
     })
     assertEquals(0, context2.getFetchOffset(new TopicPartition("foo", 0)).get)
     assertEquals(10, context2.getFetchOffset(new TopicPartition("foo", 1)).get)
-    val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+    val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
     respData2.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
       Errors.NONE, 100, 100, 100, null, null))
     respData2.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
@@ -211,7 +212,7 @@ class FetchSessionTest {
         new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData7, EMPTY_PART_LIST, false)
       assertEquals(classOf[SessionlessFetchContext], context7.getClass)
       assertEquals(0, cache.size())
-      val respData7 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+      val respData7 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
       respData7.put(new TopicPartition("bar", 0),
         new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
       respData7.put(new TopicPartition("bar", 1),
@@ -234,7 +235,7 @@ class FetchSessionTest {
     reqData1.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100))
     val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false)
     assertEquals(classOf[FullFetchContext], context1.getClass)
-    val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+    val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
     respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
       Errors.NONE, 100, 100, 100, null, null))
     respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
@@ -261,7 +262,7 @@ class FetchSessionTest {
     assertEquals(10, context2.getFetchOffset(new TopicPartition("foo", 1)).get)
     assertEquals(15, context2.getFetchOffset(new TopicPartition("bar", 0)).get)
     assertEquals(None, context2.getFetchOffset(new TopicPartition("bar", 2)))
-    val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+    val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
     respData2.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
       Errors.NONE, 10, 10, 10, null, null))
     respData2.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData(
@@ -284,7 +285,7 @@ class FetchSessionTest {
     reqData1.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100))
     val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false)
     assertEquals(classOf[FullFetchContext], context1.getClass)
-    val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+    val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
     respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
       Errors.NONE, 100, 100, 100, null, null))
     respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
@@ -303,7 +304,7 @@ class FetchSessionTest {
     val context2 = fetchManager.newContext(
       new JFetchMetadata(resp1.sessionId(), 1), reqData2, removed2, false)
     assertEquals(classOf[SessionlessFetchContext], context2.getClass)
-    val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+    val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
     val resp2 = context2.updateAndGenerateResponseData(respData2)
     assertEquals(INVALID_SESSION_ID, resp2.sessionId())
     assertTrue(resp2.responseData().isEmpty)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1d6092a..d880011 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -29,6 +29,7 @@ import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{Log, TimestampOffset}
 import kafka.network.RequestChannel
+import kafka.network.RequestChannel.SendResponse
 import kafka.security.auth.Authorizer
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{MockTime, TestUtils}
@@ -542,7 +543,10 @@ class KafkaApisTest {
   }
 
   private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {
-    val send = capturedResponse.getValue.responseSend.get
+    val response = capturedResponse.getValue
+    assertTrue(s"Unexpected response type: ${response.getClass}", response.isInstanceOf[SendResponse])
+    val sendResponse = response.asInstanceOf[SendResponse]
+    val send = sendResponse.responseSend
     val channel = new ByteBufferChannel(send.size)
     send.writeTo(channel)
     channel.close()
@@ -556,7 +560,7 @@ class KafkaApisTest {
     EasyMock.expect(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(EasyMock.anyObject[RequestChannel.Request]()))
       .andReturn(0)
     EasyMock.expect(clientRequestQuotaManager.throttle(EasyMock.anyObject[RequestChannel.Request](), EasyMock.eq(0),
-      EasyMock.anyObject[RequestChannel.ResponseAction => Unit]()))
+      EasyMock.anyObject[RequestChannel.Response => Unit]()))
 
     val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
     EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index dcbeb21..29a1c9f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -18,13 +18,13 @@ package kafka.server
 
 
 import kafka.api.Request
-import kafka.cluster.{BrokerEndPoint, Replica, Partition}
+import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.log.LogManager
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.epoch.LeaderEpochCache
-import org.apache.kafka.common.errors.{ReplicaNotAvailableException, KafkaStorageException}
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{KafkaStorageException, ReplicaNotAvailableException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.EpochEndOffset
 import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH_OFFSET, UNDEFINED_EPOCH}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index e62c9fd..3f2f66c 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -413,7 +413,7 @@ class RequestQuotaTest extends BaseRequestTest {
   private def responseThrottleTime(apiKey: ApiKeys, response: Struct): Int = {
     apiKey match {
       case ApiKeys.PRODUCE => new ProduceResponse(response).throttleTimeMs
-      case ApiKeys.FETCH => new FetchResponse(response).throttleTimeMs
+      case ApiKeys.FETCH => FetchResponse.parse(response).throttleTimeMs
       case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
       case ApiKeys.METADATA => new MetadataResponse(response).throttleTimeMs
       case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
index 8ba584c..ff781a2 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
@@ -18,12 +18,22 @@
 package kafka.server
 
 
+import java.net.InetAddress
+import java.util
 import java.util.Collections
 import java.util.concurrent.{DelayQueue, TimeUnit}
 
-import kafka.network.RequestChannel.{EndThrottlingAction, ResponseAction, StartThrottlingAction}
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.{EndThrottlingResponse, Response, StartThrottlingResponse}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.MetricConfig
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.MockTime
+import org.easymock.EasyMock
 import org.junit.{Assert, Before, Test}
 
 class ThrottledChannelExpirationTest {
@@ -33,11 +43,27 @@ class ThrottledChannelExpirationTest {
   private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(),
                                                                     Collections.emptyList(),
                                                                     time)
+  private val request = buildRequest(FetchRequest.Builder.forConsumer(0, 1000, new util.HashMap[TopicPartition, PartitionData]))._2
 
-  def callback(responseAction: ResponseAction): Unit = {
-    responseAction match {
-      case StartThrottlingAction => numCallbacksForStartThrottling += 1
-      case EndThrottlingAction => numCallbacksForEndThrottling += 1
+  private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
+                                                 listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
+
+    val request = builder.build()
+    val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
+    val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+
+    // read the header from the buffer first so that the body can be read next from the Request constructor
+    val header = RequestHeader.parse(buffer)
+    val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
+      listenerName, SecurityProtocol.PLAINTEXT)
+    (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos =  0, MemoryPool.NONE, buffer,
+      requestChannelMetrics))
+  }
+
+  def callback(response: Response): Unit = {
+    response match {
+      case _: StartThrottlingResponse => numCallbacksForStartThrottling += 1
+      case _: EndThrottlingResponse => numCallbacksForEndThrottling += 1
     }
   }
 
@@ -55,10 +81,10 @@ class ThrottledChannelExpirationTest {
     val reaper = new clientMetrics.ThrottledChannelReaper(delayQueue, "")
     try {
       // Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp.
-      val channel1 = new ThrottledChannel(time, 10, callback)
-      val channel2 = new ThrottledChannel(time, 30, callback)
-      val channel3 = new ThrottledChannel(time, 30, callback)
-      val channel4 = new ThrottledChannel(time, 20, callback)
+      val channel1 = new ThrottledChannel(request, time, 10, callback)
+      val channel2 = new ThrottledChannel(request, time, 30, callback)
+      val channel3 = new ThrottledChannel(request, time, 30, callback)
+      val channel4 = new ThrottledChannel(request, time, 20, callback)
       delayQueue.add(channel1)
       delayQueue.add(channel2)
       delayQueue.add(channel3)
@@ -82,9 +108,9 @@ class ThrottledChannelExpirationTest {
 
   @Test
   def testThrottledChannelDelay() {
-    val t1: ThrottledChannel = new ThrottledChannel(time, 10, callback)
-    val t2: ThrottledChannel = new ThrottledChannel(time, 20, callback)
-    val t3: ThrottledChannel = new ThrottledChannel(time, 20, callback)
+    val t1: ThrottledChannel = new ThrottledChannel(request, time, 10, callback)
+    val t2: ThrottledChannel = new ThrottledChannel(request, time, 20, callback)
+    val t3: ThrottledChannel = new ThrottledChannel(request, time, 20, callback)
     Assert.assertEquals(10, t1.throttleTimeMs)
     Assert.assertEquals(20, t2.throttleTimeMs)
     Assert.assertEquals(20, t3.throttleTimeMs)
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index 50a4d74..b7c037e 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -19,12 +19,12 @@ package kafka.server.epoch.util
 import kafka.cluster.BrokerEndPoint
 import kafka.server.BlockingSend
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient}
-import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.AbstractRequest.Builder
-import org.apache.kafka.common.requests.FetchResponse.PartitionData
 import org.apache.kafka.common.requests.{AbstractRequest, EpochEndOffset, FetchResponse, OffsetsForLeaderEpochResponse, FetchMetadata => JFetchMetadata}
 import org.apache.kafka.common.utils.{SystemTime, Time}
+import org.apache.kafka.common.{Node, TopicPartition}
 
 /**
   * Stub network client used for testing the ReplicaFetcher, wraps the MockClient used for consumer testing
@@ -66,7 +66,7 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
 
       case ApiKeys.FETCH =>
         fetchCount += 1
-        new FetchResponse(Errors.NONE, new java.util.LinkedHashMap[TopicPartition, PartitionData], 0,
+        new FetchResponse(Errors.NONE, new java.util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]], 0,
           JFetchMetadata.INVALID_SESSION_ID)
 
       case _ =>
