Posted to commits@kafka.apache.org by da...@apache.org on 2020/07/30 17:30:37 UTC

[kafka] branch trunk updated: KAFKA-9629 Use generated protocol for Fetch API (#9008)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4cd2396  KAFKA-9629 Use generated protocol for Fetch API (#9008)
4cd2396 is described below

commit 4cd2396db31418c90005c998d9107ad40df055b2
Author: David Arthur <mu...@gmail.com>
AuthorDate: Thu Jul 30 13:29:39 2020 -0400

    KAFKA-9629 Use generated protocol for Fetch API (#9008)
    
    Refactored FetchRequest and FetchResponse to use the generated message classes for serialization and deserialization. This allows us to bypass unnecessary Struct conversion in a few places. A new "records" type, which uses BaseRecords as the field type, was added to the message protocol. When sending, we can set a FileRecords instance on the message; when receiving, the message class will use MemoryRecords.
    
    Also included are a few JMH benchmarks, which indicate a small performance improvement for requests with high partition counts or small record sizes.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Boyang Chen <bo...@confluent.io>, David Jacot <dj...@confluent.io>, Lucas Bradstreet <lu...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Colin P. McCabe <cm...@apache.org>
---
 checkstyle/import-control.xml                      |   1 +
 checkstyle/suppressions.xml                        |   2 +
 .../kafka/clients/consumer/internals/Fetcher.java  |  42 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   6 +-
 .../kafka/common/protocol/RecordsReadable.java     |  92 ++++
 .../kafka/common/protocol/RecordsWritable.java     | 139 +++++
 .../kafka/common/requests/AbstractRequest.java     |   3 +-
 .../common/requests/AbstractRequestResponse.java   |   1 +
 .../kafka/common/requests/AbstractResponse.java    |   3 +-
 .../apache/kafka/common/requests/FetchRequest.java | 417 ++++-----------
 .../kafka/common/requests/FetchResponse.java       | 577 +++++++--------------
 .../kafka/common/requests/LeaderAndIsrRequest.java |   3 +-
 .../kafka/common/requests/StopReplicaRequest.java  |   3 +-
 .../common/requests/UpdateMetadataRequest.java     |   3 +-
 .../resources/common/message/FetchRequest.json     |  24 +-
 .../resources/common/message/FetchResponse.json    |  50 +-
 .../apache/kafka/common/message/MessageTest.java   |   4 +-
 .../kafka/common/protocol/RecordsWritableTest.java |  63 +++
 .../kafka/common/requests/RequestResponseTest.java |  75 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  29 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala |   3 +-
 .../java/org/apache/kafka/message/FieldSpec.java   |   2 +-
 .../java/org/apache/kafka/message/FieldType.java   |  34 ++
 .../apache/kafka/message/MessageDataGenerator.java |  67 ++-
 .../org/apache/kafka/message/MessageGenerator.java |   8 +
 .../org/apache/kafka/message/SchemaGenerator.java  |   2 +
 .../kafka/jmh/common/FetchRequestBenchmark.java    | 129 +++++
 .../kafka/jmh/common/FetchResponseBenchmark.java   | 104 ++++
 28 files changed, 1086 insertions(+), 800 deletions(-)
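
Since the commit message above describes moving FetchRequest onto the generated FetchRequestData class, the sketch below illustrates how a fetch request can now be assembled directly from the generated message and wrapped by the refactored FetchRequest constructor shown in the diff further down. This is an illustrative sketch, not code from the commit; the topic name, offsets, and byte limits are made-up example values, while the classes and fluent setters are the ones appearing in the diff.

    import java.util.Collections;

    import org.apache.kafka.common.message.FetchRequestData;
    import org.apache.kafka.common.requests.FetchRequest;

    public class FetchRequestDataExample {
        public static void main(String[] args) {
            // Populate the generated message; the fluent setters mirror the fields in FetchRequest.json.
            FetchRequestData data = new FetchRequestData()
                .setReplicaId(-1)              // -1 = consumer, not a follower replica
                .setMaxWaitMs(500)
                .setMinBytes(1)
                .setMaxBytes(1024 * 1024)
                .setTopics(Collections.singletonList(
                    new FetchRequestData.FetchTopic()
                        .setTopic("example-topic")   // hypothetical topic
                        .setPartitions(Collections.singletonList(
                            new FetchRequestData.FetchPartition()
                                .setPartition(0)
                                .setFetchOffset(42L)
                                .setPartitionMaxBytes(1024 * 1024)))));

            // FetchRequest is now a thin wrapper that derives its read-only views from the data object.
            FetchRequest request = new FetchRequest(data, (short) 11);
            System.out.println(request.maxBytes());
        }
    }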

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f45e362..d3fc70a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -119,6 +119,7 @@
     <subpackage name="protocol">
       <allow pkg="org.apache.kafka.common.errors" />
       <allow pkg="org.apache.kafka.common.message" />
+      <allow pkg="org.apache.kafka.common.network" />
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.protocol.types" />
       <allow pkg="org.apache.kafka.common.record" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7b5ae40..f6bafbf 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -171,6 +171,8 @@
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS)"
               files="streams[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
 
+    <suppress checks="ImportControl" files="FetchResponseData.java"/>
+
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
               files="StreamThreadTest.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 3251f4f..ed14c97 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -310,7 +310,7 @@ public class Fetcher<K, V> implements Closeable {
                                     log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
                                             isolationLevel, fetchOffset, partition, partitionData);
 
-                                    Iterator<? extends RecordBatch> batches = partitionData.records.batches().iterator();
+                                    Iterator<? extends RecordBatch> batches = partitionData.records().batches().iterator();
                                     short responseVersion = resp.requestHeader().apiVersion();
 
                                     completedFetches.add(new CompletedFetch(partition, partitionData,
@@ -616,7 +616,7 @@ public class Fetcher<K, V> implements Closeable {
                             // in cases such as the TopicAuthorizationException, and the second condition ensures that no
                             // potential data loss due to an exception in a following record.
                             FetchResponse.PartitionData partition = records.partitionData;
-                            if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
+                            if (fetched.isEmpty() && (partition.records() == null || partition.records().sizeInBytes() == 0)) {
                                 completedFetches.poll();
                             }
                             throw e;
@@ -1210,7 +1210,7 @@ public class Fetcher<K, V> implements Closeable {
         FetchResponse.PartitionData<Records> partition = nextCompletedFetch.partitionData;
         long fetchOffset = nextCompletedFetch.nextFetchOffset;
         CompletedFetch completedFetch = null;
-        Errors error = partition.error;
+        Errors error = partition.error();
 
         try {
             if (!subscriptions.hasValidPosition(tp)) {
@@ -1227,11 +1227,11 @@ public class Fetcher<K, V> implements Closeable {
                 }
 
                 log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
-                        partition.records.sizeInBytes(), tp, position);
-                Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
+                        partition.records().sizeInBytes(), tp, position);
+                Iterator<? extends RecordBatch> batches = partition.records().batches().iterator();
                 completedFetch = nextCompletedFetch;
 
-                if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
+                if (!batches.hasNext() && partition.records().sizeInBytes() > 0) {
                     if (completedFetch.responseVersion < 3) {
                         // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
                         Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
@@ -1249,26 +1249,26 @@ public class Fetcher<K, V> implements Closeable {
                     }
                 }
 
-                if (partition.highWatermark >= 0) {
-                    log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
-                    subscriptions.updateHighWatermark(tp, partition.highWatermark);
+                if (partition.highWatermark() >= 0) {
+                    log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark());
+                    subscriptions.updateHighWatermark(tp, partition.highWatermark());
                 }
 
-                if (partition.logStartOffset >= 0) {
-                    log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
-                    subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
+                if (partition.logStartOffset() >= 0) {
+                    log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset());
+                    subscriptions.updateLogStartOffset(tp, partition.logStartOffset());
                 }
 
-                if (partition.lastStableOffset >= 0) {
-                    log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
-                    subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
+                if (partition.lastStableOffset() >= 0) {
+                    log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset());
+                    subscriptions.updateLastStableOffset(tp, partition.lastStableOffset());
                 }
 
-                if (partition.preferredReadReplica.isPresent()) {
-                    subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica.get(), () -> {
+                if (partition.preferredReadReplica().isPresent()) {
+                    subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica().get(), () -> {
                         long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
                         log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
-                                tp, partition.preferredReadReplica.get(), expireTimeMs);
+                                tp, partition.preferredReadReplica().get(), expireTimeMs);
                         return expireTimeMs;
                     });
                 }
@@ -1630,13 +1630,13 @@ public class Fetcher<K, V> implements Closeable {
         }
 
         private PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions(FetchResponse.PartitionData<?> partition) {
-            if (partition.abortedTransactions == null || partition.abortedTransactions.isEmpty())
+            if (partition.abortedTransactions() == null || partition.abortedTransactions().isEmpty())
                 return null;
 
             PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = new PriorityQueue<>(
-                    partition.abortedTransactions.size(), Comparator.comparingLong(o -> o.firstOffset)
+                    partition.abortedTransactions().size(), Comparator.comparingLong(o -> o.firstOffset)
             );
-            abortedTransactions.addAll(partition.abortedTransactions);
+            abortedTransactions.addAll(partition.abortedTransactions());
             return abortedTransactions;
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 670de37..2a4ffa8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -67,6 +67,8 @@ import org.apache.kafka.common.message.EndTxnRequestData;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
 import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -114,8 +116,6 @@ import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.requests.FetchRequest;
-import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
@@ -137,7 +137,7 @@ import static org.apache.kafka.common.protocol.types.Type.RECORDS;
  */
 public enum ApiKeys {
     PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions()),
-    FETCH(1, "Fetch", FetchRequest.schemaVersions(), FetchResponse.schemaVersions()),
+    FETCH(1, "Fetch", FetchRequestData.SCHEMAS, FetchResponseData.SCHEMAS),
     LIST_OFFSETS(2, "ListOffsets", ListOffsetRequest.schemaVersions(), ListOffsetResponse.schemaVersions()),
     METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS),
     LEADER_AND_ISR(4, "LeaderAndIsr", true, LeaderAndIsrRequestData.SCHEMAS, LeaderAndIsrResponseData.SCHEMAS),
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/RecordsReadable.java b/clients/src/main/java/org/apache/kafka/common/protocol/RecordsReadable.java
new file mode 100644
index 0000000..5967731
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/RecordsReadable.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReadable implements Readable {
+    private final ByteBuffer buf;
+
+    public RecordsReadable(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public byte readByte() {
+        return buf.get();
+    }
+
+    @Override
+    public short readShort() {
+        return buf.getShort();
+    }
+
+    @Override
+    public int readInt() {
+        return buf.getInt();
+    }
+
+    @Override
+    public long readLong() {
+        return buf.getLong();
+    }
+
+    @Override
+    public double readDouble() {
+        return ByteUtils.readDouble(buf);
+    }
+
+    @Override
+    public void readArray(byte[] arr) {
+        buf.get(arr);
+    }
+
+    @Override
+    public int readUnsignedVarint() {
+        return ByteUtils.readUnsignedVarint(buf);
+    }
+
+    @Override
+    public ByteBuffer readByteBuffer(int length) {
+        ByteBuffer res = buf.slice();
+        res.limit(length);
+
+        buf.position(buf.position() + length);
+
+        return res;
+    }
+
+    public BaseRecords readRecords(int length) {
+        if (length < 0) {
+            // no records
+            return null;
+        } else {
+            ByteBuffer recordsBuffer = readByteBuffer(length);
+            return MemoryRecords.readableRecords(recordsBuffer);
+        }
+    }
+}
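
For reference, here is a small illustrative sketch of using the new RecordsReadable to decode a length-prefixed record set from a buffer. The payload layout (an int length followed by record bytes) is invented for the example, but the RecordsReadable API is the one added in the file above.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.protocol.RecordsReadable;
    import org.apache.kafka.common.record.BaseRecords;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.SimpleRecord;

    public class RecordsReadableExample {
        public static void main(String[] args) {
            // Build a buffer holding an int length prefix followed by a record set.
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                new SimpleRecord("value".getBytes()));
            ByteBuffer payload = ByteBuffer.allocate(4 + records.sizeInBytes());
            payload.putInt(records.sizeInBytes());
            payload.put(records.buffer().duplicate());
            payload.flip();

            RecordsReadable readable = new RecordsReadable(payload);
            int size = readable.readInt();                  // the length prefix
            BaseRecords read = readable.readRecords(size);  // MemoryRecords view over the next `size` bytes
            System.out.println(read.sizeInBytes());         // a negative length would have yielded null
        }
    }
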
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/RecordsWritable.java b/clients/src/main/java/org/apache/kafka/common/protocol/RecordsWritable.java
new file mode 100644
index 0000000..61f3ee1a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/RecordsWritable.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWritable implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteBuffer buffer;
+    private int mark;
+
+    public RecordsWritable(String dest, int messageSizeExcludingRecords, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.buffer = ByteBuffer.allocate(messageSizeExcludingRecords);
+        this.mark = 0;
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        buffer.put(val);
+    }
+
+    @Override
+    public void writeShort(short val) {
+        buffer.putShort(val);
+    }
+
+    @Override
+    public void writeInt(int val) {
+        buffer.putInt(val);
+    }
+
+    @Override
+    public void writeLong(long val) {
+        buffer.putLong(val);
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        ByteUtils.writeDouble(val, buffer);
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        buffer.put(arr);
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        ByteUtils.writeUnsignedVarint(i, buffer);
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        buffer.put(src);
+    }
+
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend
+     */
+    public void flush() {
+        int end = buffer.position();
+        int len = end - mark;
+
+        if (len > 0) {
+            int limit = buffer.limit();
+
+            // Set the desired absolute position and limit before slicing
+            buffer.position(mark);
+            buffer.limit(end);
+            ByteBuffer slice = buffer.slice();
+
+            // Restore absolute position and limit on original buffer
+            buffer.limit(limit);
+            buffer.position(end);
+
+            // Update the mark to the end of slice we just took
+            mark = end;
+
+            ByteBufferSend send = new ByteBufferSend(dest, slice);
+            sendConsumer.accept(send);
+        }
+    }
+}
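
And a matching illustrative sketch for the writing side, following the usage pattern described in the RecordsWritable Javadoc above. The destination name and record sets are placeholders (empty records stand in for real FileRecords), and the buffer size only needs to cover the non-record bytes.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.common.network.Send;
    import org.apache.kafka.common.protocol.RecordsWritable;
    import org.apache.kafka.common.record.MemoryRecords;

    public class RecordsWritableExample {
        public static void main(String[] args) {
            List<Send> sends = new ArrayList<>();
            // 12 bytes covers the three ints below; record bytes never touch this buffer.
            RecordsWritable writable = new RecordsWritable("broker-1", 12, sends::add);

            writable.writeInt(10);
            writable.writeRecords(MemoryRecords.EMPTY); // flushes the pending int, then queues the records' own Send
            writable.writeInt(20);
            writable.writeRecords(MemoryRecords.EMPTY);
            writable.writeInt(30);
            writable.flush();                           // flush the trailing bytes explicitly

            System.out.println(sends.size());           // 5 Sends, alternating buffered bytes and record transfers
        }
    }
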
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index f4085b8..b8266d0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -146,7 +147,7 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
             case PRODUCE:
                 return new ProduceRequest(struct, apiVersion);
             case FETCH:
-                return new FetchRequest(struct, apiVersion);
+                return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion);
             case LIST_OFFSETS:
                 return new ListOffsetRequest(struct, apiVersion);
             case METADATA:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
index b02659d..39698a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
@@ -17,4 +17,5 @@
 package org.apache.kafka.common.requests;
 
 public interface AbstractRequestResponse {
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 3258142..ed21a0e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -89,7 +90,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
             case PRODUCE:
                 return new ProduceResponse(struct);
             case FETCH:
-                return FetchResponse.parse(struct);
+                return new FetchResponse<>(new FetchResponseData(struct, version));
             case LIST_OFFSETS:
                 return new ListOffsetResponse(struct);
             case METADATA:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 29cbb60..01b5e1a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -18,221 +18,46 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-
-import static org.apache.kafka.common.protocol.CommonFields.CURRENT_LEADER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.requests.FetchMetadata.FINAL_EPOCH;
-import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+import java.util.stream.Collectors;
 
 public class FetchRequest extends AbstractRequest {
-    public static final int CONSUMER_REPLICA_ID = -1;
 
-    private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
-            "Topics to fetch in the order provided.");
-    private static final Field.ComplexArray FORGOTTEN_TOPICS = new Field.ComplexArray("forgotten_topics_data",
-            "Topics to remove from the fetch session.");
-    private static final Field.Int32 MAX_BYTES = new Field.Int32("max_bytes",
-            "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
-                    "if the first message in the first non-empty partition of the fetch is larger than this " +
-                    "value, the message will still be returned to ensure that progress can be made.");
-    private static final Field.Int8 ISOLATION_LEVEL = new Field.Int8("isolation_level",
-            "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
-                    "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
-                    "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
-                    "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
-                    "and enables the inclusion of the list of aborted transactions in the result, which allows " +
-                    "consumers to discard ABORTED transactional records");
-    private static final Field.Int32 SESSION_ID = new Field.Int32("session_id", "The fetch session ID");
-    private static final Field.Int32 SESSION_EPOCH = new Field.Int32("session_epoch", "The fetch session epoch");
-    private static final Field.Str RACK_ID = new Field.Str("rack_id", "The consumer's rack id");
-
-    // topic level fields
-    private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
-            "Partitions to fetch.");
-
-    // partition level fields
-    private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id",
-            "Broker id of the follower. For normal consumers, use -1.");
-    private static final Field.Int64 FETCH_OFFSET = new Field.Int64("fetch_offset", "Message offset.");
-    private static final Field.Int32 PARTITION_MAX_BYTES = new Field.Int32("partition_max_bytes",
-            "Maximum bytes to fetch.");
-    private static final Field.Int32 MAX_WAIT_TIME = new Field.Int32("max_wait_time",
-            "Maximum time in ms to wait for the response.");
-    private static final Field.Int32 MIN_BYTES = new Field.Int32("min_bytes",
-            "Minimum bytes to accumulate in the response.");
-    private static final Field.Int64 LOG_START_OFFSET = new Field.Int64("log_start_offset",
-            "Earliest available offset of the follower replica. " +
-                    "The field is only used when request is sent by follower. ");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            PARTITION_ID,
-            FETCH_OFFSET,
-            PARTITION_MAX_BYTES);
-
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-
-    private static final Schema FETCH_REQUEST_V0 = new Schema(
-            REPLICA_ID,
-            MAX_WAIT_TIME,
-            MIN_BYTES,
-            TOPICS_V0);
-
-    // The V1 Fetch Request body is the same as V0.
-    // Only the version number is incremented to indicate a newer client
-    private static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0;
-
-    // V2 bumped to indicate the client support message format V1 which uses relative offset and has timestamp.
-    private static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1;
-
-    // V3 added top level max_bytes field - the total size of partition data to accumulate in response.
-    // The partition ordering is now relevant - partitions will be processed in order they appear in request.
-    private static final Schema FETCH_REQUEST_V3 = new Schema(
-            REPLICA_ID,
-            MAX_WAIT_TIME,
-            MIN_BYTES,
-            MAX_BYTES,
-            TOPICS_V0);
-
-    // V4 adds the fetch isolation level and exposes magic v2 (via the response).
-
-    private static final Schema FETCH_REQUEST_V4 = new Schema(
-            REPLICA_ID,
-            MAX_WAIT_TIME,
-            MIN_BYTES,
-            MAX_BYTES,
-            ISOLATION_LEVEL,
-            TOPICS_V0);
-
-
-    // V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
-    private static final Field PARTITIONS_V5 = PARTITIONS.withFields(
-            PARTITION_ID,
-            FETCH_OFFSET,
-            LOG_START_OFFSET,
-            PARTITION_MAX_BYTES);
-
-    private static final Field TOPICS_V5 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V5);
-
-    private static final Schema FETCH_REQUEST_V5 = new Schema(
-            REPLICA_ID,
-            MAX_WAIT_TIME,
-            MIN_BYTES,
-            MAX_BYTES,
-            ISOLATION_LEVEL,
-            TOPICS_V5);
-
-    // V6 bumped up to indicate that the client supports KafkaStorageException. The KafkaStorageException will be
-    // translated to NotLeaderOrFollowerException in the response if version <= 5
-    private static final Schema FETCH_REQUEST_V6 = FETCH_REQUEST_V5;
-
-    // V7 added incremental fetch requests.
-    private static final Field.Array FORGOTTEN_PARTITIONS = new Field.Array("partitions", Type.INT32,
-            "Partitions to remove from the fetch session.");
-    private static final Field FORGOTTEN_TOPIC_DATA_V7 = FORGOTTEN_TOPICS.withFields(
-            TOPIC_NAME,
-            FORGOTTEN_PARTITIONS);
-
-    private static final Schema FETCH_REQUEST_V7 = new Schema(
-            REPLICA_ID,
-            MAX_WAIT_TIME,
-            MIN_BYTES,
-            MAX_BYTES,
-            ISOLATION_LEVEL,
-            SESSION_ID,
-            SESSION_EPOCH,
-            TOPICS_V5,
-            FORGOTTEN_TOPIC_DATA_V7);
-
-    // V8 bump used to indicate that on quota violation brokers send out responses before throttling.
-    private static final Schema FETCH_REQUEST_V8 = FETCH_REQUEST_V7;
-
-    // V9 adds the current leader epoch (see KIP-320)
-    private static final Field FETCH_REQUEST_PARTITION_V9 = PARTITIONS.withFields(
-            PARTITION_ID,
-            CURRENT_LEADER_EPOCH,
-            FETCH_OFFSET,
-            LOG_START_OFFSET,
-            PARTITION_MAX_BYTES);
-
-    private static final Field FETCH_REQUEST_TOPIC_V9 = TOPICS.withFields(
-            TOPIC_NAME,
-            FETCH_REQUEST_PARTITION_V9);
-
-    private static final Schema FETCH_REQUEST_V9 = new Schema(
-            REPLICA_ID,
-            MAX_WAIT_TIME,
-            MIN_BYTES,
-            MAX_BYTES,
-            ISOLATION_LEVEL,
-            SESSION_ID,
-            SESSION_EPOCH,
-            FETCH_REQUEST_TOPIC_V9,
-            FORGOTTEN_TOPIC_DATA_V7);
-
-    // V10 bumped up to indicate ZStandard capability. (see KIP-110)
-    private static final Schema FETCH_REQUEST_V10 = FETCH_REQUEST_V9;
-
-    // V11 added rack ID to support read from followers (KIP-392)
-    private static final Schema FETCH_REQUEST_V11 = new Schema(
-            REPLICA_ID,
-            MAX_WAIT_TIME,
-            MIN_BYTES,
-            MAX_BYTES,
-            ISOLATION_LEVEL,
-            SESSION_ID,
-            SESSION_EPOCH,
-            FETCH_REQUEST_TOPIC_V9,
-            FORGOTTEN_TOPIC_DATA_V7,
-            RACK_ID);
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4,
-            FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, FETCH_REQUEST_V8, FETCH_REQUEST_V9,
-            FETCH_REQUEST_V10, FETCH_REQUEST_V11};
-    }
+    public static final int CONSUMER_REPLICA_ID = -1;
 
     // default values for older versions where a request level limit did not exist
     public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE;
     public static final long INVALID_LOG_START_OFFSET = -1L;
 
-    private final int replicaId;
-    private final int maxWait;
-    private final int minBytes;
-    private final int maxBytes;
-    private final IsolationLevel isolationLevel;
+    private final FetchRequestData data;
 
-    // Note: the iteration order of this map is significant, since it determines the order
-    // in which partitions appear in the message.  For this reason, this map should have a
-    // deterministic iteration order, like LinkedHashMap or TreeMap (but unlike HashMap).
+    // These are immutable read-only structures derived from FetchRequestData
     private final Map<TopicPartition, PartitionData> fetchData;
-
     private final List<TopicPartition> toForget;
     private final FetchMetadata metadata;
-    private final String rackId;
+
+    public FetchRequestData data() {
+        return data;
+    }
 
     public static final class PartitionData {
         public final long fetchOffset;
@@ -273,6 +98,28 @@ public class FetchRequest extends AbstractRequest {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+        Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));
+        }));
+        return Collections.unmodifiableMap(result);
+    }
+
+    private List<TopicPartition> toForgottenTopicList(List<FetchRequestData.ForgottenTopic> forgottenTopics) {
+        List<TopicPartition> result = new ArrayList<>();
+        forgottenTopics.forEach(forgottenTopic ->
+            forgottenTopic.partitions().forEach(partitionId ->
+                result.add(new TopicPartition(forgottenTopic.topic(), partitionId))
+            )
+        );
+        return result;
+    }
+
     static final class TopicAndPartitionData<T> {
         public final String topic;
         public final LinkedHashMap<Integer, T> partitions;
@@ -366,8 +213,47 @@ public class FetchRequest extends AbstractRequest {
                 maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
             }
 
-            return new FetchRequest(version, replicaId, maxWait, minBytes, maxBytes, fetchData,
-                isolationLevel, toForget, metadata, rackId);
+            FetchRequestData fetchRequestData = new FetchRequestData();
+            fetchRequestData.setReplicaId(replicaId);
+            fetchRequestData.setMaxWaitMs(maxWait);
+            fetchRequestData.setMinBytes(minBytes);
+            fetchRequestData.setMaxBytes(maxBytes);
+            fetchRequestData.setIsolationLevel(isolationLevel.id());
+            fetchRequestData.setForgottenTopicsData(new ArrayList<>());
+            toForget.stream()
+                .collect(Collectors.groupingBy(TopicPartition::topic, LinkedHashMap::new, Collectors.toList()))
+                .forEach((topic, partitions) ->
+                    fetchRequestData.forgottenTopicsData().add(new FetchRequestData.ForgottenTopic()
+                        .setTopic(topic)
+                        .setPartitions(partitions.stream().map(TopicPartition::partition).collect(Collectors.toList())))
+                );
+            fetchRequestData.setTopics(new ArrayList<>());
+
+            // We collect the partitions in a single FetchTopic only if they appear sequentially in the fetchData
+            FetchRequestData.FetchTopic fetchTopic = null;
+            for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
+                if (fetchTopic == null || !entry.getKey().topic().equals(fetchTopic.topic())) {
+                    fetchTopic = new FetchRequestData.FetchTopic()
+                       .setTopic(entry.getKey().topic())
+                       .setPartitions(new ArrayList<>());
+                    fetchRequestData.topics().add(fetchTopic);
+                }
+
+                fetchTopic.partitions().add(
+                    new FetchRequestData.FetchPartition().setPartition(entry.getKey().partition())
+                        .setCurrentLeaderEpoch(entry.getValue().currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                        .setFetchOffset(entry.getValue().fetchOffset)
+                        .setLogStartOffset(entry.getValue().logStartOffset)
+                        .setPartitionMaxBytes(entry.getValue().maxBytes));
+            }
+
+            if (metadata != null) {
+                fetchRequestData.setSessionEpoch(metadata.epoch());
+                fetchRequestData.setSessionId(metadata.sessionId());
+            }
+            fetchRequestData.setRackId(rackId);
+
+            return new FetchRequest(fetchRequestData, version);
         }
 
         @Override
@@ -388,64 +274,12 @@ public class FetchRequest extends AbstractRequest {
         }
     }
 
-    private FetchRequest(short version, int replicaId, int maxWait, int minBytes, int maxBytes,
-                         Map<TopicPartition, PartitionData> fetchData, IsolationLevel isolationLevel,
-                         List<TopicPartition> toForget, FetchMetadata metadata, String rackId) {
-        super(ApiKeys.FETCH, version);
-        this.replicaId = replicaId;
-        this.maxWait = maxWait;
-        this.minBytes = minBytes;
-        this.maxBytes = maxBytes;
-        this.fetchData = fetchData;
-        this.isolationLevel = isolationLevel;
-        this.toForget = toForget;
-        this.metadata = metadata;
-        this.rackId = rackId;
-    }
-
-    public FetchRequest(Struct struct, short version) {
+    public FetchRequest(FetchRequestData fetchRequestData, short version) {
         super(ApiKeys.FETCH, version);
-        replicaId = struct.get(REPLICA_ID);
-        maxWait = struct.get(MAX_WAIT_TIME);
-        minBytes = struct.get(MIN_BYTES);
-        maxBytes = struct.getOrElse(MAX_BYTES, DEFAULT_RESPONSE_MAX_BYTES);
-
-        if (struct.hasField(ISOLATION_LEVEL))
-            isolationLevel = IsolationLevel.forId(struct.get(ISOLATION_LEVEL));
-        else
-            isolationLevel = IsolationLevel.READ_UNCOMMITTED;
-        toForget = new ArrayList<>(0);
-        if (struct.hasField(FORGOTTEN_TOPICS)) {
-            for (Object forgottenTopicObj : struct.get(FORGOTTEN_TOPICS)) {
-                Struct forgottenTopic = (Struct) forgottenTopicObj;
-                String topicName = forgottenTopic.get(TOPIC_NAME);
-                for (Object partObj : forgottenTopic.get(FORGOTTEN_PARTITIONS)) {
-                    Integer part = (Integer) partObj;
-                    toForget.add(new TopicPartition(topicName, part));
-                }
-            }
-        }
-        metadata = new FetchMetadata(struct.getOrElse(SESSION_ID, INVALID_SESSION_ID),
-            struct.getOrElse(SESSION_EPOCH, FINAL_EPOCH));
-
-        fetchData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.get(TOPICS)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                long offset = partitionResponse.get(FETCH_OFFSET);
-                int maxBytes = partitionResponse.get(PARTITION_MAX_BYTES);
-                long logStartOffset = partitionResponse.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-
-                // Current leader epoch added in v9
-                Optional<Integer> currentLeaderEpoch = RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH);
-                PartitionData partitionData = new PartitionData(offset, logStartOffset, maxBytes, currentLeaderEpoch);
-                fetchData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        rackId = struct.getOrElse(RACK_ID, "");
+        this.data = fetchRequestData;
+        this.fetchData = toPartitionDataMap(fetchRequestData.topics());
+        this.toForget = toForgottenTopicList(fetchRequestData.forgottenTopicsData());
+        this.metadata = new FetchMetadata(fetchRequestData.sessionId(), fetchRequestData.sessionEpoch());
     }
 
     @Override
@@ -464,23 +298,23 @@ public class FetchRequest extends AbstractRequest {
                 FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY);
             responseData.put(entry.getKey(), partitionResponse);
         }
-        return new FetchResponse<>(error, responseData, throttleTimeMs, metadata.sessionId());
+        return new FetchResponse<>(error, responseData, throttleTimeMs, data.sessionId());
     }
 
     public int replicaId() {
-        return replicaId;
+        return data.replicaId();
     }
 
     public int maxWait() {
-        return maxWait;
+        return data.maxWaitMs();
     }
 
     public int minBytes() {
-        return minBytes;
+        return data.minBytes();
     }
 
     public int maxBytes() {
-        return maxBytes;
+        return data.maxBytes();
     }
 
     public Map<TopicPartition, PartitionData> fetchData() {
@@ -492,11 +326,11 @@ public class FetchRequest extends AbstractRequest {
     }
 
     public boolean isFromFollower() {
-        return replicaId >= 0;
+        return replicaId() >= 0;
     }
 
     public IsolationLevel isolationLevel() {
-        return isolationLevel;
+        return IsolationLevel.forId(data.isolationLevel());
     }
 
     public FetchMetadata metadata() {
@@ -504,62 +338,37 @@ public class FetchRequest extends AbstractRequest {
     }
 
     public String rackId() {
-        return rackId;
+        return data.rackId();
     }
 
+    @Override
+    public ByteBuffer serialize(RequestHeader header) {
+        // Unlike the custom FetchResponse#toSend, we don't include the buffer size here. This buffer is passed
+        // to a NetworkSend which adds the length value in the eventual serialization
+
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        RequestHeaderData requestHeaderData = header.data();
+
+        int headerSize = requestHeaderData.size(cache, header.headerVersion());
+        int bodySize = data.size(cache, header.apiVersion());
+
+        ByteBuffer buffer = ByteBuffer.allocate(headerSize + bodySize);
+        ByteBufferAccessor writer = new ByteBufferAccessor(buffer);
+
+        requestHeaderData.write(writer, cache, header.headerVersion());
+        data.write(writer, cache, header.apiVersion());
+
+        buffer.rewind();
+        return buffer;
+    }
+
+    // For testing
     public static FetchRequest parse(ByteBuffer buffer, short version) {
-        return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+        return new FetchRequest(new FetchRequestData(ApiKeys.FETCH.parseRequest(version, buffer), version), version);
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.FETCH.requestSchema(version()));
-        List<TopicAndPartitionData<PartitionData>> topicsData =
-            TopicAndPartitionData.batchByTopic(fetchData.entrySet().iterator());
-
-        struct.set(REPLICA_ID, replicaId);
-        struct.set(MAX_WAIT_TIME, maxWait);
-        struct.set(MIN_BYTES, minBytes);
-        struct.setIfExists(MAX_BYTES, maxBytes);
-        struct.setIfExists(ISOLATION_LEVEL, isolationLevel.id());
-        struct.setIfExists(SESSION_ID, metadata.sessionId());
-        struct.setIfExists(SESSION_EPOCH, metadata.epoch());
-
-        List<Struct> topicArray = new ArrayList<>();
-        for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
-            Struct topicData = struct.instance(TOPICS);
-            topicData.set(TOPIC_NAME, topicEntry.topic);
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS);
-                partitionData.set(PARTITION_ID, partitionEntry.getKey());
-                partitionData.set(FETCH_OFFSET, fetchPartitionData.fetchOffset);
-                partitionData.set(PARTITION_MAX_BYTES, fetchPartitionData.maxBytes);
-                partitionData.setIfExists(LOG_START_OFFSET, fetchPartitionData.logStartOffset);
-                RequestUtils.setLeaderEpochIfExists(partitionData, CURRENT_LEADER_EPOCH, fetchPartitionData.currentLeaderEpoch);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS, topicArray.toArray());
-        if (struct.hasField(FORGOTTEN_TOPICS)) {
-            Map<String, List<Integer>> topicsToPartitions = new HashMap<>();
-            for (TopicPartition part : toForget) {
-                List<Integer> partitions = topicsToPartitions.computeIfAbsent(part.topic(), topic -> new ArrayList<>());
-                partitions.add(part.partition());
-            }
-            List<Struct> toForgetStructs = new ArrayList<>();
-            for (Map.Entry<String, List<Integer>> entry : topicsToPartitions.entrySet()) {
-                Struct toForgetStruct = struct.instance(FORGOTTEN_TOPICS);
-                toForgetStruct.set(TOPIC_NAME, entry.getKey());
-                toForgetStruct.set(FORGOTTEN_PARTITIONS, entry.getValue().toArray());
-                toForgetStructs.add(toForgetStruct);
-            }
-            struct.set(FORGOTTEN_TOPICS, toForgetStructs.toArray());
-        }
-        struct.setIfExists(RACK_ID, rackId);
-        return struct;
+        return data.toStruct(version());
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 984c50c..cb9a527 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -17,18 +17,19 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.ResponseHeaderData;
 import org.apache.kafka.common.network.ByteBufferSend;
 import org.apache.kafka.common.network.Send;
-import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.RecordsReadable;
+import org.apache.kafka.common.protocol.RecordsWritable;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.BaseRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MultiRecordsSend;
-import org.apache.kafka.common.record.RecordsSend;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -38,17 +39,9 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.Queue;
-import java.util.function.Predicate;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.RECORDS;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import java.util.stream.Collectors;
+
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 
 /**
@@ -73,168 +66,18 @@ import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
  */
 public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
 
-    private static final String RESPONSES_KEY_NAME = "responses";
-
-    // topic level field names
-    private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
-    // partition level fields
-    private static final Field.Int64 HIGH_WATERMARK = new Field.Int64("high_watermark",
-            "Last committed offset.");
-    private static final Field.Int64 LOG_START_OFFSET = new Field.Int64("log_start_offset",
-            "Earliest available offset.");
-    private static final Field.Int32 PREFERRED_READ_REPLICA = new Field.Int32("preferred_read_replica",
-            "The ID of the replica that the consumer should prefer.");
-
-    private static final String PARTITION_HEADER_KEY_NAME = "partition_header";
-    private static final String ABORTED_TRANSACTIONS_KEY_NAME = "aborted_transactions";
-    private static final String RECORD_SET_KEY_NAME = "record_set";
-
-    private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(
-            PARTITION_ID,
-            ERROR_CODE,
-            HIGH_WATERMARK);
-    private static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(
-            new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V0),
-            new Field(RECORD_SET_KEY_NAME, RECORDS));
-
-    private static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(
-            TOPIC_NAME,
-            new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
-
-    private static final Schema FETCH_RESPONSE_V0 = new Schema(
-            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
-
-    // V1 bumped for the addition of the throttle time
-    private static final Schema FETCH_RESPONSE_V1 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
-
-    // V2 bumped to indicate the client support message format V1 which uses relative offset and has timestamp.
-    private static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
-
-    // V3 bumped for addition of top-levl max_bytes field and to indicate that partition ordering is relevant
-    private static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
-
-    // V4 adds features for transactional consumption (the aborted transaction list and the
-    // last stable offset). It also exposes messages with magic v2 (along with older formats).
-    // aborted transaction field names
-    private static final Field.Int64 LAST_STABLE_OFFSET = new Field.Int64("last_stable_offset",
-            "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
-                    "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)");
-    private static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id",
-            "The producer id associated with the aborted transactions");
-    private static final Field.Int64 FIRST_OFFSET = new Field.Int64("first_offset",
-            "The first offset in the aborted transaction");
-
-    private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
-            PRODUCER_ID,
-            FIRST_OFFSET);
-
-    private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema(
-            PARTITION_ID,
-            ERROR_CODE,
-            HIGH_WATERMARK,
-            LAST_STABLE_OFFSET,
-            new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
-
-    // V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
-    private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V5 = new Schema(
-            PARTITION_ID,
-            ERROR_CODE,
-            HIGH_WATERMARK,
-            LAST_STABLE_OFFSET,
-            LOG_START_OFFSET,
-            new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
-
-    // Introduced in V11 to support read from followers (KIP-392)
-    private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V6 = new Schema(
-            PARTITION_ID,
-            ERROR_CODE,
-            HIGH_WATERMARK,
-            LAST_STABLE_OFFSET,
-            LOG_START_OFFSET,
-            new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)),
-            PREFERRED_READ_REPLICA);
-
-    private static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
-            new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V4),
-            new Field(RECORD_SET_KEY_NAME, RECORDS));
-
-    private static final Schema FETCH_RESPONSE_PARTITION_V5 = new Schema(
-            new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V5),
-            new Field(RECORD_SET_KEY_NAME, RECORDS));
-
-    private static final Schema FETCH_RESPONSE_PARTITION_V6 = new Schema(
-            new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V6),
-            new Field(RECORD_SET_KEY_NAME, RECORDS));
-
-    private static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
-            TOPIC_NAME,
-            new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
-
-    private static final Schema FETCH_RESPONSE_TOPIC_V5 = new Schema(
-            TOPIC_NAME,
-            new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
-
-    private static final Schema FETCH_RESPONSE_TOPIC_V6 = new Schema(
-            TOPIC_NAME,
-            new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V6)));
-
-    private static final Schema FETCH_RESPONSE_V4 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
-
-    private static final Schema FETCH_RESPONSE_V5 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
-
-    // V6 bumped up to indicate that the client supports KafkaStorageException. The KafkaStorageException will
-    // be translated to NotLeaderOrFollowerException in the response if version <= 5
-    private static final Schema FETCH_RESPONSE_V6 = FETCH_RESPONSE_V5;
-
-    // V7 added incremental fetch responses and a top-level error code.
-    private static final Field.Int32 SESSION_ID = new Field.Int32("session_id", "The fetch session ID");
-
-    private static final Schema FETCH_RESPONSE_V7 = new Schema(
-            THROTTLE_TIME_MS,
-            ERROR_CODE,
-            SESSION_ID,
-            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
-
-    // V8 bump used to indicate that on quota violation brokers send out responses before throttling.
-    private static final Schema FETCH_RESPONSE_V8 = FETCH_RESPONSE_V7;
-
-    // V9 adds the current leader epoch (see KIP-320)
-    private static final Schema FETCH_RESPONSE_V9 = FETCH_RESPONSE_V8;
-
-    // V10 bumped up to indicate ZStandard capability. (see KIP-110)
-    private static final Schema FETCH_RESPONSE_V10 = FETCH_RESPONSE_V9;
-
-    // V11 added preferred read replica for each partition response to support read from followers (KIP-392)
-    private static final Schema FETCH_RESPONSE_V11 = new Schema(
-            THROTTLE_TIME_MS,
-            ERROR_CODE,
-            SESSION_ID,
-            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V6)));
-
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2,
-            FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6,
-            FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9, FETCH_RESPONSE_V10,
-            FETCH_RESPONSE_V11};
-    }
-
     public static final long INVALID_HIGHWATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
     public static final int INVALID_PREFERRED_REPLICA_ID = -1;
 
-    private final int throttleTimeMs;
-    private final Errors error;
-    private final int sessionId;
-    private final LinkedHashMap<TopicPartition, PartitionData<T>> responseData;
+    private final FetchResponseData data;
+    private final LinkedHashMap<TopicPartition, PartitionData<T>> responseDataMap;
+
+    public FetchResponseData data() {
+        return data;
+    }
+
 
     public static final class AbortedTransaction {
         public final long producerId;
@@ -268,16 +111,38 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
         public String toString() {
             return "(producerId=" + producerId + ", firstOffset=" + firstOffset + ")";
         }
+
+        static AbortedTransaction fromMessage(FetchResponseData.AbortedTransaction abortedTransaction) {
+            return new AbortedTransaction(abortedTransaction.producerId(), abortedTransaction.firstOffset());
+        }
     }
 
     public static final class PartitionData<T extends BaseRecords> {
-        public final Errors error;
-        public final long highWatermark;
-        public final long lastStableOffset;
-        public final long logStartOffset;
-        public final Optional<Integer> preferredReadReplica;
-        public final List<AbortedTransaction> abortedTransactions;
-        public final T records;
+        private final FetchResponseData.FetchablePartitionResponse partitionResponse;
+
+        // Derived fields
+        private final Optional<Integer> preferredReplica;
+        private final List<AbortedTransaction> abortedTransactions;
+        private final Errors error;
+
+        private PartitionData(FetchResponseData.FetchablePartitionResponse partitionResponse) {
+            // We partially construct FetchablePartitionResponse since we don't know the partition ID at this point.
+            // When we convert the PartitionData (and other fields) into FetchResponseData down in toMessage, we
+            // set the partition IDs.
+            this.partitionResponse = partitionResponse;
+            this.preferredReplica = Optional.of(partitionResponse.partitionHeader().preferredReadReplica())
+                .filter(replicaId -> replicaId != INVALID_PREFERRED_REPLICA_ID);
+
+            if (partitionResponse.partitionHeader().abortedTransactions() == null) {
+                this.abortedTransactions = null;
+            } else {
+                this.abortedTransactions = partitionResponse.partitionHeader().abortedTransactions().stream()
+                    .map(AbortedTransaction::fromMessage)
+                    .collect(Collectors.toList());
+            }
+
+            this.error = Errors.forCode(partitionResponse.partitionHeader().errorCode());
+        }
 
         public PartitionData(Errors error,
                              long highWatermark,
@@ -286,13 +151,27 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                              Optional<Integer> preferredReadReplica,
                              List<AbortedTransaction> abortedTransactions,
                              T records) {
-            this.error = error;
-            this.highWatermark = highWatermark;
-            this.lastStableOffset = lastStableOffset;
-            this.logStartOffset = logStartOffset;
-            this.preferredReadReplica = preferredReadReplica;
+            this.preferredReplica = preferredReadReplica;
             this.abortedTransactions = abortedTransactions;
-            this.records = records;
+            this.error = error;
+            FetchResponseData.PartitionHeader partitionHeader = new FetchResponseData.PartitionHeader();
+            partitionHeader.setErrorCode(error.code())
+                .setHighWatermark(highWatermark)
+                .setLastStableOffset(lastStableOffset)
+                .setLogStartOffset(logStartOffset);
+            if (abortedTransactions != null) {
+                partitionHeader.setAbortedTransactions(abortedTransactions.stream().map(
+                    aborted -> new FetchResponseData.AbortedTransaction()
+                        .setProducerId(aborted.producerId)
+                        .setFirstOffset(aborted.firstOffset))
+                    .collect(Collectors.toList()));
+            } else {
+                partitionHeader.setAbortedTransactions(null);
+            }
+            partitionHeader.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID));
+            this.partitionResponse = new FetchResponseData.FetchablePartitionResponse()
+                .setPartitionHeader(partitionHeader)
+                .setRecordSet(records);
         }
 
         public PartitionData(Errors error,
@@ -301,13 +180,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                              long logStartOffset,
                              List<AbortedTransaction> abortedTransactions,
                              T records) {
-            this.error = error;
-            this.highWatermark = highWatermark;
-            this.lastStableOffset = lastStableOffset;
-            this.logStartOffset = logStartOffset;
-            this.preferredReadReplica = Optional.empty();
-            this.abortedTransactions = abortedTransactions;
-            this.records = records;
+            this(error, highWatermark, lastStableOffset, logStartOffset, Optional.empty(), abortedTransactions, records);
         }
 
         @Override
@@ -319,38 +192,53 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
 
             PartitionData that = (PartitionData) o;
 
-            return error == that.error &&
-                    highWatermark == that.highWatermark &&
-                    lastStableOffset == that.lastStableOffset &&
-                    logStartOffset == that.logStartOffset &&
-                    Objects.equals(preferredReadReplica, that.preferredReadReplica) &&
-                    Objects.equals(abortedTransactions, that.abortedTransactions) &&
-                    Objects.equals(records, that.records);
+            return this.partitionResponse.equals(that.partitionResponse);
         }
 
         @Override
         public int hashCode() {
-            int result = error != null ? error.hashCode() : 0;
-            result = 31 * result + Long.hashCode(highWatermark);
-            result = 31 * result + Long.hashCode(lastStableOffset);
-            result = 31 * result + Long.hashCode(logStartOffset);
-            result = 31 * result + Objects.hashCode(preferredReadReplica);
-            result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
-            result = 31 * result + (records != null ? records.hashCode() : 0);
-            return result;
+            return this.partitionResponse.hashCode();
         }
 
         @Override
         public String toString() {
-            return "(error=" + error +
-                    ", highWaterMark=" + highWatermark +
-                    ", lastStableOffset = " + lastStableOffset +
-                    ", logStartOffset = " + logStartOffset +
-                    ", preferredReadReplica = " + preferredReadReplica.map(Object::toString).orElse("absent") +
-                    ", abortedTransactions = " + abortedTransactions +
-                    ", recordsSizeInBytes=" + records.sizeInBytes() + ")";
+            return "(error=" + error() +
+                    ", highWaterMark=" + highWatermark() +
+                    ", lastStableOffset = " + lastStableOffset() +
+                    ", logStartOffset = " + logStartOffset() +
+                    ", preferredReadReplica = " + preferredReadReplica().map(Object::toString).orElse("absent") +
+                    ", abortedTransactions = " + abortedTransactions() +
+                    ", recordsSizeInBytes=" + records().sizeInBytes() + ")";
+        }
+
+        public Errors error() {
+            return error;
+        }
+
+        public long highWatermark() {
+            return partitionResponse.partitionHeader().highWatermark();
+        }
+
+        public long lastStableOffset() {
+            return partitionResponse.partitionHeader().lastStableOffset();
         }
 
+        public long logStartOffset() {
+            return partitionResponse.partitionHeader().logStartOffset();
+        }
+
+        public Optional<Integer> preferredReadReplica() {
+            return preferredReplica;
+        }
+
+        public List<AbortedTransaction> abortedTransactions() {
+            return abortedTransactions;
+        }
+
+        @SuppressWarnings("unchecked")
+        public T records() {
+            return (T) partitionResponse.recordSet();
+        }
     }
 
     /**
@@ -366,225 +254,128 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.data = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return data.toStruct(version);
     }
 
     @Override
-    protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
-
-        // write the total size and the response header
-        ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
-        buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
-        responseHeaderStruct.writeTo(buffer);
+    public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int totalRecordSize = data.responses().stream()
+                .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+                .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+                .sum();
+        int totalMessageSize = data.size(cache, apiVersion);
+
+        RecordsWritable writer = new RecordsWritable(dest, totalMessageSize - totalRecordSize, sends::add);
+        data.write(writer, cache, apiVersion);
+        writer.flush();
+
+        // Compute the total size of all the Sends and write it out along with the header in the first Send
+        ResponseHeaderData responseHeaderData = responseHeader.data();
+
+        int headerSize = responseHeaderData.size(cache, responseHeader.headerVersion());
+        int bodySize = Math.toIntExact(sends.stream().mapToLong(Send::size).sum());
+
+        ByteBuffer buffer = ByteBuffer.allocate(headerSize + 4);
+        ByteBufferAccessor headerWriter = new ByteBufferAccessor(buffer);
+
+        // Write out the size and header
+        buffer.putInt(headerSize + bodySize);
+        responseHeaderData.write(headerWriter, cache, responseHeader.headerVersion());
+
+        // Rewind the buffer and set this as the first Send in the MultiRecordsSend
         buffer.rewind();
+        sends.addFirst(new ByteBufferSend(dest, buffer));
 
-        Queue<Send> sends = new ArrayDeque<>();
-        sends.add(new ByteBufferSend(dest, buffer));
-        addResponseData(responseBodyStruct, throttleTimeMs, dest, sends);
         return new MultiRecordsSend(dest, sends);
     }
 
     public Errors error() {
-        return error;
+        return Errors.forCode(data.errorCode());
     }
 
     public LinkedHashMap<TopicPartition, PartitionData<T>> responseData() {
-        return responseData;
+        return responseDataMap;
     }
 
     @Override
     public int throttleTimeMs() {
-        return this.throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
     public int sessionId() {
-        return sessionId;
+        return data.sessionId();
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
-        responseData.values().forEach(response ->
-            updateErrorCounts(errorCounts, response.error)
+        responseDataMap.values().forEach(response ->
+            updateErrorCounts(errorCounts, response.error())
         );
         return errorCounts;
     }
 
     public static FetchResponse<MemoryRecords> parse(ByteBuffer buffer, short version) {
-        return parse(ApiKeys.FETCH.responseSchema(version).read(buffer));
-    }
-
-    private static void addResponseData(Struct struct, int throttleTimeMs, String dest, Queue<Send> sends) {
-        Object[] allTopicData = struct.getArray(RESPONSES_KEY_NAME);
-
-        if (struct.hasField(ERROR_CODE)) {
-            ByteBuffer buffer = ByteBuffer.allocate(14);
-            buffer.putInt(throttleTimeMs);
-            buffer.putShort(struct.get(ERROR_CODE));
-            buffer.putInt(struct.get(SESSION_ID));
-            buffer.putInt(allTopicData.length);
-            buffer.rewind();
-            sends.add(new ByteBufferSend(dest, buffer));
-        } else if (struct.hasField(THROTTLE_TIME_MS)) {
-            ByteBuffer buffer = ByteBuffer.allocate(8);
-            buffer.putInt(throttleTimeMs);
-            buffer.putInt(allTopicData.length);
-            buffer.rewind();
-            sends.add(new ByteBufferSend(dest, buffer));
-        } else {
-            ByteBuffer buffer = ByteBuffer.allocate(4);
-            buffer.putInt(allTopicData.length);
-            buffer.rewind();
-            sends.add(new ByteBufferSend(dest, buffer));
-        }
-
-        for (Object topicData : allTopicData)
-            addTopicData(dest, sends, (Struct) topicData);
+        FetchResponseData fetchResponseData = new FetchResponseData();
+        RecordsReadable reader = new RecordsReadable(buffer);
+        fetchResponseData.read(reader, version);
+        return new FetchResponse<>(fetchResponseData);
     }
 
-    private static void addTopicData(String dest, Queue<Send> sends, Struct topicData) {
-        String topic = topicData.get(TOPIC_NAME);
-        Object[] allPartitionData = topicData.getArray(PARTITIONS_KEY_NAME);
-
-        // include the topic header and the count for the number of partitions
-        ByteBuffer buffer = ByteBuffer.allocate(STRING.sizeOf(topic) + 4);
-        STRING.write(buffer, topic);
-        buffer.putInt(allPartitionData.length);
-        buffer.rewind();
-        sends.add(new ByteBufferSend(dest, buffer));
-
-        for (Object partitionData : allPartitionData)
-            addPartitionData(dest, sends, (Struct) partitionData);
+    @SuppressWarnings("unchecked")
+    private static <T extends BaseRecords> LinkedHashMap<TopicPartition, PartitionData<T>> toResponseDataMap(
+            FetchResponseData message) {
+        LinkedHashMap<TopicPartition, PartitionData<T>> responseMap = new LinkedHashMap<>();
+        message.responses().forEach(topicResponse -> {
+            topicResponse.partitionResponses().forEach(partitionResponse -> {
+                FetchResponseData.PartitionHeader partitionHeader = partitionResponse.partitionHeader();
+                TopicPartition tp = new TopicPartition(topicResponse.topic(), partitionHeader.partition());
+                PartitionData<T> partitionData = new PartitionData<>(partitionResponse);
+                responseMap.put(tp, partitionData);
+            });
+        });
+        return responseMap;
     }
 
-    private static void addPartitionData(String dest, Queue<Send> sends, Struct partitionData) {
-        Struct header = partitionData.getStruct(PARTITION_HEADER_KEY_NAME);
-        BaseRecords records = partitionData.getRecords(RECORD_SET_KEY_NAME);
+    private static <T extends BaseRecords> FetchResponseData toMessage(int throttleTimeMs, Errors error,
+                                                                       Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator,
+                                                                       int sessionId) {
+        FetchResponseData message = new FetchResponseData();
+        message.setThrottleTimeMs(throttleTimeMs);
+        message.setErrorCode(error.code());
+        message.setSessionId(sessionId);
 
-        // include the partition header and the size of the record set
-        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + 4);
-        header.writeTo(buffer);
-        buffer.putInt(records.sizeInBytes());
-        buffer.rewind();
-        sends.add(new ByteBufferSend(dest, buffer));
-
-        // finally the send for the record set itself
-        RecordsSend recordsSend = records.toSend(dest);
-        if (recordsSend.size() > 0)
-            sends.add(recordsSend);
-    }
-
-    private static <T extends BaseRecords> Struct toStruct(short version, int throttleTimeMs, Errors error,
-                                                       Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator,
-                                                       int sessionId) {
-        Struct struct = new Struct(ApiKeys.FETCH.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-        struct.setIfExists(ERROR_CODE, error.code());
-        struct.setIfExists(SESSION_ID, sessionId);
+        List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
         List<FetchRequest.TopicAndPartitionData<PartitionData<T>>> topicsData =
                 FetchRequest.TopicAndPartitionData.batchByTopic(partIterator);
-        List<Struct> topicArray = new ArrayList<>();
-        for (FetchRequest.TopicAndPartitionData<PartitionData<T>> topicEntry: topicsData) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_NAME, topicEntry.topic);
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionData<T>> partitionEntry : topicEntry.partitions.entrySet()) {
-                PartitionData<T> fetchPartitionData = partitionEntry.getValue();
-                short errorCode = fetchPartitionData.error.code();
-                // If consumer sends FetchRequest V5 or earlier, the client library is not guaranteed to recognize the error code
-                // for KafkaStorageException. In this case the client library will translate KafkaStorageException to
-                // UnknownServerException which is not retriable. We can ensure that consumer will update metadata and retry
-                // by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if FetchRequest version <= 5
-                if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 5)
-                    errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                Struct partitionDataHeader = partitionData.instance(PARTITION_HEADER_KEY_NAME);
-                partitionDataHeader.set(PARTITION_ID, partitionEntry.getKey());
-                partitionDataHeader.set(ERROR_CODE, errorCode);
-                partitionDataHeader.set(HIGH_WATERMARK, fetchPartitionData.highWatermark);
-
-                if (partitionDataHeader.hasField(LAST_STABLE_OFFSET)) {
-                    partitionDataHeader.set(LAST_STABLE_OFFSET, fetchPartitionData.lastStableOffset);
-
-                    if (fetchPartitionData.abortedTransactions == null) {
-                        partitionDataHeader.set(ABORTED_TRANSACTIONS_KEY_NAME, null);
-                    } else {
-                        List<Struct> abortedTransactionStructs = new ArrayList<>(fetchPartitionData.abortedTransactions.size());
-                        for (AbortedTransaction abortedTransaction : fetchPartitionData.abortedTransactions) {
-                            Struct abortedTransactionStruct = partitionDataHeader.instance(ABORTED_TRANSACTIONS_KEY_NAME);
-                            abortedTransactionStruct.set(PRODUCER_ID, abortedTransaction.producerId);
-                            abortedTransactionStruct.set(FIRST_OFFSET, abortedTransaction.firstOffset);
-                            abortedTransactionStructs.add(abortedTransactionStruct);
-                        }
-                        partitionDataHeader.set(ABORTED_TRANSACTIONS_KEY_NAME, abortedTransactionStructs.toArray());
-                    }
-                }
-                partitionDataHeader.setIfExists(LOG_START_OFFSET, fetchPartitionData.logStartOffset);
-                partitionDataHeader.setIfExists(PREFERRED_READ_REPLICA, fetchPartitionData.preferredReadReplica.orElse(-1));
-                partitionData.set(PARTITION_HEADER_KEY_NAME, partitionDataHeader);
-                partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.records);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        return struct;
+        topicsData.forEach(partitionDataTopicAndPartitionData -> {
+            List<FetchResponseData.FetchablePartitionResponse> partitionResponses = new ArrayList<>();
+            partitionDataTopicAndPartitionData.partitions.forEach((partitionId, partitionData) -> {
+                // Since PartitionData alone doesn't know the partition ID, we set it here
+                partitionData.partitionResponse.partitionHeader().setPartition(partitionId);
+                partitionResponses.add(partitionData.partitionResponse);
+            });
+            topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
+                .setTopic(partitionDataTopicAndPartitionData.topic)
+                .setPartitionResponses(partitionResponses));
+        });
+
+        message.setResponses(topicResponseList);
+        return message;
     }
 
     /**
@@ -598,7 +389,9 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                                                      Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
         // use arbitrary values here without affecting the result.
-        return 4 + toStruct(version, 0, Errors.NONE, partIterator, INVALID_SESSION_ID).sizeOf();
+        FetchResponseData data = toMessage(0, Errors.NONE, partIterator, INVALID_SESSION_ID);
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        return 4 + data.size(cache, version);
     }
 
     @Override
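
For illustration only (not part of the patch), a minimal sketch of how the refactored FetchResponse looks from a caller's point of view: the PartitionData-based constructor is unchanged, while the former public fields are now accessors backed by the generated FetchResponseData. Class and method names are taken from the diff above; the wrapper class exists only to make the sketch self-contained, and the topic, offsets and other literal values are hypothetical.

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Optional;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.requests.FetchResponse;

    // Hypothetical wrapper class, only to make the sketch self-contained.
    public class FetchResponseUsageSketch {
        public static void main(String[] args) {
            // Build the response the same way callers did before the refactor.
            LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
            responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(
                    Errors.NONE, 1000000L, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
                    Optional.empty(), Collections.emptyList(), records));
            FetchResponse<MemoryRecords> response = new FetchResponse<>(Errors.NONE, responseData, 25, 123);

            // The former public fields are read back through accessors that delegate to FetchResponseData.
            System.out.println(response.error());           // NONE
            System.out.println(response.throttleTimeMs());  // 25
            System.out.println(response.sessionId());       // 123
            System.out.println(response.responseData().get(new TopicPartition("test", 0)).highWatermark());
            // response.data() exposes the generated message directly for serialization.
        }
    }
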
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 270069a..2e6b956 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -175,8 +175,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
         return Collections.unmodifiableList(data.liveLeaders());
     }
 
-    // Visible for testing
-    LeaderAndIsrRequestData data() {
+    public LeaderAndIsrRequestData data() {
         return data;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 0f79f2d..8ab8cc3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -214,8 +214,7 @@ public class StopReplicaRequest extends AbstractControlRequest {
         return new StopReplicaRequest(ApiKeys.STOP_REPLICA.parseRequest(version, buffer), version);
     }
 
-    // Visible for testing
-    StopReplicaRequestData data() {
+    public StopReplicaRequestData data() {
         return data;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 49f962d..f17ea21 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -209,8 +209,7 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
         return data.toStruct(version());
     }
 
-    // Visible for testing
-    UpdateMetadataRequestData data() {
+    public UpdateMetadataRequestData data() {
         return data;
     }
 
diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json
index 727618c..364bd73 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -55,35 +55,35 @@
       "about": "The minimum bytes to accumulate in the response." },
     { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
       "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
-    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false,
+    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,
       "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABO [...]
-    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
+    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": true,
       "about": "The fetch session ID." },
-    { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": false,
-      "about": "The fetch session epoch, which is used for ordering requests in a session" },
+    { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
+      "about": "The fetch session epoch, which is used for ordering requests in a session." },
     { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
       "about": "The topics to fetch.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The name of the topic to fetch." },
-      { "name": "FetchPartitions", "type": "[]FetchPartition", "versions": "0+",
+      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
         "about": "The partitions to fetch.", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+        { "name": "Partition", "type": "int32", "versions": "0+",
           "about": "The partition index." },
         { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
           "about": "The current leader epoch of the partition." },
         { "name": "FetchOffset", "type": "int64", "versions": "0+",
           "about": "The message offset." },
-        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": false,
+        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
           "about": "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower."},
-        { "name": "MaxBytes", "type": "int32", "versions": "0+",
+        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
           "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." }
       ]}
     ]},
-    { "name": "Forgotten", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,
+    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,
       "about": "In an incremental fetch request, the partitions to remove.", "fields": [
-      { "name": "Name", "type": "string", "versions": "7+", "entityType": "topicName",
+      { "name": "Topic", "type": "string", "versions": "7+", "entityType": "topicName",
         "about": "The topic name." },
-      { "name": "ForgottenPartitionIndexes", "type": "[]int32", "versions": "7+",
+      { "name": "Partitions", "type": "[]int32", "versions": "7+",
         "about": "The partition indexes to forget." }
     ]},
     { "name": "RackId", "type":  "string", "versions": "11+", "default": "", "ignorable": true,
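
To make the FetchRequest.json renames above concrete, a hedged sketch of populating FetchRequestData with the new field names. FetchTopic, FetchPartition and the setForgottenTopicsData/setTopic setters come from the schema above and from the MessageTest change later in this patch; the remaining setter names assume the generator's usual FieldName to setFieldName convention, the wrapper class exists only for the sketch, and all values are hypothetical.

    import java.util.Collections;

    import org.apache.kafka.common.message.FetchRequestData;

    // Hypothetical wrapper class, only to make the sketch self-contained.
    public class FetchRequestDataSketch {
        public static void main(String[] args) {
            FetchRequestData request = new FetchRequestData()
                    .setMaxBytes(1048576)
                    .setSessionId(0)
                    .setSessionEpoch(-1)
                    .setTopics(Collections.singletonList(
                            new FetchRequestData.FetchTopic()
                                    .setTopic("my-topic")                              // was "Name"
                                    .setPartitions(Collections.singletonList(          // was "FetchPartitions"
                                            new FetchRequestData.FetchPartition()
                                                    .setPartition(0)                   // was "PartitionIndex"
                                                    .setFetchOffset(42L)
                                                    .setPartitionMaxBytes(1048576))))) // was "MaxBytes"
                    .setForgottenTopicsData(Collections.emptyList());                  // was "Forgotten"
            System.out.println(request);
        }
    }
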
diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json
index 0fe98ad..9f64c24 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -43,37 +43,39 @@
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
-    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
+    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": true,
       "about": "The top level response error code." },
     { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
       "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
-    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
       "about": "The response topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
-      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
+      { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", "versions": "0+",
         "about": "The topic partitions.", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
-          "about": "The partiiton index." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
-          "about": "The error code, or 0 if there was no fetch error." },
-        { "name": "HighWatermark", "type": "int64", "versions": "0+",
-          "about": "The current high water mark." },
-        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
-          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
-        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
-          "about": "The current log start offset." },
-        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
-          "about": "The aborted transactions.",  "fields": [
-          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
-            "about": "The producer id associated with the aborted transaction." },
-          { "name": "FirstOffset", "type": "int64", "versions": "4+",
-            "about": "The first offset in the aborted transaction." }
+        { "name":  "PartitionHeader", "type": "PartitionHeader", "versions": "0+",
+          "fields":  [
+          { "name": "Partition", "type": "int32", "versions": "0+",
+            "about": "The partition index." },
+          { "name": "ErrorCode", "type": "int16", "versions": "0+",
+            "about": "The error code, or 0 if there was no fetch error." },
+          { "name": "HighWatermark", "type": "int64", "versions": "0+",
+            "about": "The current high water mark." },
+          { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
+            "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset has been decided (ABORTED or COMMITTED)." },
+          { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
+            "about": "The current log start offset." },
+          { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true,
+            "about": "The aborted transactions.",  "fields": [
+            { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
+              "about": "The producer id associated with the aborted transaction." },
+            { "name": "FirstOffset", "type": "int64", "versions": "4+",
+              "about": "The first offset in the aborted transaction." }
+          ]},
+          { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false,
+            "about": "The preferred read replica for the consumer to use on its next fetch request."}
         ]},
-        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
-          "about": "The preferred read replica for the consumer to use on its next fetch request"},
-        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
-          "about": "The record data." }
+        { "name": "RecordSet", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
       ]}
     ]}
   ]
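
Correspondingly for FetchResponse.json, a sketch of the new nesting: each partition response now wraps a PartitionHeader plus a RecordSet of the new "records" type. All setters shown here appear in the FetchResponse.java diff above; the wrapper class exists only for the sketch and the values are hypothetical.

    import java.util.Collections;

    import org.apache.kafka.common.message.FetchResponseData;
    import org.apache.kafka.common.record.MemoryRecords;

    // Hypothetical wrapper class, only to make the sketch self-contained.
    public class FetchResponseDataSketch {
        public static void main(String[] args) {
            FetchResponseData.PartitionHeader header = new FetchResponseData.PartitionHeader()
                    .setPartition(0)
                    .setErrorCode((short) 0)
                    .setHighWatermark(100L)
                    .setLastStableOffset(-1L)
                    .setLogStartOffset(0L)
                    .setPreferredReadReplica(-1);
            FetchResponseData response = new FetchResponseData()
                    .setThrottleTimeMs(0)
                    .setErrorCode((short) 0)
                    .setSessionId(0)
                    .setResponses(Collections.singletonList(
                            new FetchResponseData.FetchableTopicResponse()
                                    .setTopic("my-topic")
                                    .setPartitionResponses(Collections.singletonList(
                                            new FetchResponseData.FetchablePartitionResponse()
                                                    .setPartitionHeader(header)
                                                    .setRecordSet(MemoryRecords.EMPTY)))));
            System.out.println(response);
        }
    }
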
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index bbff0ed..a3946bd 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -951,8 +951,8 @@ public final class MessageTest {
         verifyWriteSucceeds((short) 0,
             new OffsetCommitRequestData().setRetentionTimeMs(123));
         verifyWriteRaisesUve((short) 5, "forgotten",
-            new FetchRequestData().setForgotten(singletonList(
-                new FetchRequestData.ForgottenTopic().setName("foo"))));
+            new FetchRequestData().setForgottenTopicsData(singletonList(
+                new FetchRequestData.ForgottenTopic().setTopic("foo"))));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/RecordsWritableTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/RecordsWritableTest.java
new file mode 100644
index 0000000..937da4d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/RecordsWritableTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+public class RecordsWritableTest {
+    @Test
+    public void testBufferSlice() {
+        Queue<Send> sends = new ArrayDeque<>();
+        RecordsWritable writer = new RecordsWritable("dest", 10000 /* enough for tests */, sends::add);
+        for (int i = 0; i < 4; i++) {
+            writer.writeInt(i);
+        }
+        writer.flush();
+        Assert.assertEquals(sends.size(), 1);
+        ByteBufferSend send = (ByteBufferSend) sends.remove();
+        Assert.assertEquals(send.size(), 16);
+        Assert.assertEquals(send.remaining(), 16);
+
+        // No new data, flush shouldn't do anything
+        writer.flush();
+        Assert.assertEquals(sends.size(), 0);
+
+        // Cause the buffer to expand a few times
+        for (int i = 0; i < 100; i++) {
+            writer.writeInt(i);
+        }
+        writer.flush();
+        Assert.assertEquals(sends.size(), 1);
+        send = (ByteBufferSend) sends.remove();
+        Assert.assertEquals(send.size(), 400);
+        Assert.assertEquals(send.remaining(), 400);
+
+        writer.writeByte((byte) 5);
+        writer.flush();
+        Assert.assertEquals(sends.size(), 1);
+        send = (ByteBufferSend) sends.remove();
+        Assert.assertEquals(send.size(), 1);
+        Assert.assertEquals(send.remaining(), 1);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 750fb26..4af683d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -93,6 +93,7 @@ import org.apache.kafka.common.message.EndTxnRequestData;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
 import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -144,7 +145,9 @@ import org.apache.kafka.common.message.UpdateMetadataResponseData;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -216,7 +219,7 @@ public class RequestResponseTest {
         checkErrorResponse(createControlledShutdownRequest(), unknownServerException, true);
         checkErrorResponse(createControlledShutdownRequest(0), unknownServerException, true);
         checkRequest(createFetchRequest(4), true);
-        checkResponse(createFetchResponse(), 4, true);
+        checkResponse(createFetchResponse(true), 4, true);
         List<TopicPartition> toForgetTopics = new ArrayList<>();
         toForgetTopics.add(new TopicPartition("foo", 0));
         toForgetTopics.add(new TopicPartition("foo", 2));
@@ -224,7 +227,7 @@ public class RequestResponseTest {
         checkRequest(createFetchRequest(7, new FetchMetadata(123, 456), toForgetTopics), true);
         checkResponse(createFetchResponse(123), 7, true);
         checkResponse(createFetchResponse(Errors.FETCH_SESSION_ID_NOT_FOUND, 123), 7, true);
-        checkErrorResponse(createFetchRequest(4), unknownServerException, true);
+        checkErrorResponse(createFetchRequest(7), unknownServerException, true);
         checkRequest(createHeartBeatRequest(), true);
         checkErrorResponse(createHeartBeatRequest(), unknownServerException, true);
         checkResponse(createHeartBeatResponse(), 0, true);
@@ -501,9 +504,11 @@ public class RequestResponseTest {
     private void checkOlderFetchVersions() throws Exception {
         int latestVersion = FETCH.latestVersion();
         for (int i = 0; i < latestVersion; ++i) {
-            checkErrorResponse(createFetchRequest(i), unknownServerException, true);
+            if (i > 7) {
+                checkErrorResponse(createFetchRequest(i), unknownServerException, true);
+            }
             checkRequest(createFetchRequest(i), true);
-            checkResponse(createFetchResponse(), i, true);
+            checkResponse(createFetchResponse(i >= 4), i, true);
         }
     }
 
@@ -668,7 +673,7 @@ public class RequestResponseTest {
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(
                 Errors.NONE, 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                0L, Optional.empty(), null, records));
+                0L, Optional.empty(), Collections.emptyList(), records));
 
         FetchResponse<MemoryRecords> v0Response = new FetchResponse<>(Errors.NONE, responseData, 0, INVALID_SESSION_ID);
         FetchResponse<MemoryRecords> v1Response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
@@ -709,7 +714,7 @@ public class RequestResponseTest {
         verifyFetchResponseFullWrite(FETCH.latestVersion(),
             createFetchResponse(Errors.FETCH_SESSION_ID_NOT_FOUND, 123));
         for (short version = 0; version <= FETCH.latestVersion(); version++) {
-            verifyFetchResponseFullWrite(version, createFetchResponse());
+            verifyFetchResponseFullWrite(version, createFetchResponse(version >= 4));
         }
     }
 
@@ -781,7 +786,7 @@ public class RequestResponseTest {
     public void testFetchRequestMaxBytesOldVersions() throws Exception {
         final short version = 1;
         FetchRequest fr = createFetchRequest(version);
-        FetchRequest fr2 = new FetchRequest(fr.toStruct(), version);
+        FetchRequest fr2 = new FetchRequest(new FetchRequestData(fr.toStruct(), version), version);
         assertEquals(fr2.maxBytes(), fr.maxBytes());
     }
 
@@ -812,6 +817,24 @@ public class RequestResponseTest {
     }
 
     @Test
+    public void testFetchRequestCompat() {
+        Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<>();
+        fetchData.put(new TopicPartition("test", 0), new FetchRequest.PartitionData(100, 2, 100, Optional.of(42)));
+        FetchRequest req = FetchRequest.Builder
+                .forConsumer(100, 100, fetchData)
+                .metadata(new FetchMetadata(10, 20))
+                .isolationLevel(IsolationLevel.READ_COMMITTED)
+                .build((short) 2);
+
+        FetchRequestData data = req.data();
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int size = data.size(cache, (short) 2);
+
+        ByteBufferAccessor writer = new ByteBufferAccessor(ByteBuffer.allocate(size));
+        data.write(writer, cache, (short) 2);
+    }
+
+    @Test
     public void testJoinGroupRequestVersion0RebalanceTimeout() {
         final short version = 0;
         JoinGroupRequest jgr = createJoinGroupRequest(version);
@@ -960,30 +983,30 @@ public class RequestResponseTest {
 
     private FetchRequest createFetchRequest(int version, FetchMetadata metadata, List<TopicPartition> toForget) {
         LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
-        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 0L,
-                1000000, Optional.of(15)));
-        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 0L,
-                1000000, Optional.of(25)));
+        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, -1L,
+                1000000, Optional.empty()));
+        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, -1L,
+                1000000, Optional.empty()));
         return FetchRequest.Builder.forConsumer(100, 100000, fetchData).
             metadata(metadata).setMaxBytes(1000).toForget(toForget).build((short) version);
     }
 
     private FetchRequest createFetchRequest(int version, IsolationLevel isolationLevel) {
         LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
-        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 0L,
-                1000000, Optional.of(15)));
-        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 0L,
-                1000000, Optional.of(25)));
+        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, -1L,
+                1000000, Optional.empty()));
+        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, -1L,
+                1000000, Optional.empty()));
         return FetchRequest.Builder.forConsumer(100, 100000, fetchData).
             isolationLevel(isolationLevel).setMaxBytes(1000).build((short) version);
     }
 
     private FetchRequest createFetchRequest(int version) {
         LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
-        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 0L,
-                1000000, Optional.of(15)));
-        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 0L,
-                1000000, Optional.of(25)));
+        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, -1L,
+                1000000, Optional.empty()));
+        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, -1L,
+                1000000, Optional.empty()));
         return FetchRequest.Builder.forConsumer(100, 100000, fetchData).setMaxBytes(1000).build((short) version);
     }
 
@@ -995,7 +1018,7 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), null, records));
+            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), Collections.emptyList(), records));
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
             new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
@@ -1003,14 +1026,18 @@ public class RequestResponseTest {
         return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
     }
 
-    private FetchResponse<MemoryRecords> createFetchResponse() {
+    private FetchResponse<MemoryRecords> createFetchResponse(boolean includeAborted) {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
+
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
-                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), null, records));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), Collections.emptyList(), records));
 
-        List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
-                new FetchResponse.AbortedTransaction(234L, 999L));
+        List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.emptyList();
+        if (includeAborted) {
+            abortedTransactions = Collections.singletonList(
+                    new FetchResponse.AbortedTransaction(234L, 999L));
+        }
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
                 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 65e4a87..f291edd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -674,6 +674,18 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
+    def maybeDownConvertStorageError(error: Errors, version: Short): Errors = {
+      // If the consumer sends FetchRequest V5 or earlier, the client library is not guaranteed to recognize the
+      // error code for KafkaStorageException. In that case the client library translates KafkaStorageException into
+      // UnknownServerException, which is not retriable. We can ensure that the consumer refreshes its metadata and
+      // retries by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if the
+      // FetchRequest version is <= 5.
+      if (error == Errors.KAFKA_STORAGE_ERROR && versionId <= 5) {
+        Errors.NOT_LEADER_OR_FOLLOWER
+      } else {
+        error
+      }
+    }
+
     def maybeConvertFetchedData(tp: TopicPartition,
                                 partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = {
       val logConfig = replicaManager.getLogConfig(tp)
@@ -713,7 +725,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
                 // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
                 // client.
-                new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
+                val error = maybeDownConvertStorageError(partitionData.error, versionId)
+                new FetchResponse.PartitionData[BaseRecords](error, partitionData.highWatermark,
                   partitionData.lastStableOffset, partitionData.logStartOffset,
                   partitionData.preferredReadReplica, partitionData.abortedTransactions,
                   new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
@@ -723,10 +736,13 @@ class KafkaApis(val requestChannel: RequestChannel,
                   errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
               }
             }
-          case None => new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
-            partitionData.lastStableOffset, partitionData.logStartOffset,
-            partitionData.preferredReadReplica, partitionData.abortedTransactions,
-            unconvertedRecords)
+          case None => {
+            val error = maybeDownConvertStorageError(partitionData.error, versionId)
+            new FetchResponse.PartitionData[BaseRecords](error, partitionData.highWatermark,
+              partitionData.lastStableOffset, partitionData.logStartOffset,
+              partitionData.preferredReadReplica, partitionData.abortedTransactions,
+              unconvertedRecords)
+          }
         }
       }
     }
@@ -740,7 +756,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
         if (data.isReassignmentFetch)
           reassigningPartitions.add(tp)
-        partitions.put(tp, new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
+        val error = maybeDownConvertStorageError(data.error, versionId)
+        partitions.put(tp, new FetchResponse.PartitionData(error, data.highWatermark, lastStableOffset,
           data.logStartOffset, data.preferredReadReplica.map(int2Integer).asJava,
           abortedTransactions, data.records))
       }
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 6561a48..33a63f0 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -569,7 +569,8 @@ class FetchRequestTest extends BaseRequestTest {
     // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error.
     val req0 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
       createPartitionMap(300, Seq(topicPartition), Map.empty))
-      .setMaxBytes(800).build()
+      .setMaxBytes(800)
+      .build()
 
     val res0 = sendFetchRequest(leaderId, req0)
     val data0 = res0.responseData.get(topicPartition)
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
index d2afc17..c0b280b 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
@@ -56,7 +56,7 @@ public final class FieldSpec {
 
     private final Optional<Integer> tag;
 
-    private boolean zeroCopy;
+    private final boolean zeroCopy;
 
     @JsonCreator
     public FieldSpec(@JsonProperty("name") String name,
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldType.java b/generator/src/main/java/org/apache/kafka/message/FieldType.java
index 5e0a4c7..aa4523e 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldType.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldType.java
@@ -182,6 +182,31 @@ public interface FieldType {
         }
     }
 
+    final class RecordsFieldType implements FieldType {
+        static final RecordsFieldType INSTANCE = new RecordsFieldType();
+        private static final String NAME = "records";
+
+        @Override
+        public boolean serializationIsDifferentInFlexibleVersions() {
+            return true;
+        }
+
+        @Override
+        public boolean isRecords() {
+            return true;
+        }
+
+        @Override
+        public boolean canBeNullable() {
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return NAME;
+        }
+    }
+
     final class StructType implements FieldType {
         private final String type;
 
@@ -267,6 +292,8 @@ public interface FieldType {
                 return StringFieldType.INSTANCE;
             case BytesFieldType.NAME:
                 return BytesFieldType.INSTANCE;
+            case RecordsFieldType.NAME:
+                return RecordsFieldType.INSTANCE;
             default:
                 if (string.startsWith(ARRAY_PREFIX)) {
                     String elementTypeString = string.substring(ARRAY_PREFIX.length());
@@ -324,6 +351,13 @@ public interface FieldType {
     }
 
     /**
+     * Returns true if this is a records type.
+     */
+    default boolean isRecords() {
+        return false;
+    }
+
+    /**
      * Returns true if this is a floating point type.
      */
     default boolean isFloat() {
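(A package-local sketch, illustrative only: the new RecordsFieldType overrides the interface defaults added
above, so generator code can branch on isRecords() while every pre-existing type keeps the default of false.)

    // Assumes execution inside the org.apache.kafka.message package, where the INSTANCE fields are visible.
    FieldType recordsType = FieldType.RecordsFieldType.INSTANCE;
    assert recordsType.isRecords();                         // overridden to true
    assert recordsType.canBeNullable();                     // records fields may be null on the wire
    assert !FieldType.BytesFieldType.INSTANCE.isRecords();  // other types fall back to the interface default
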
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index c51dcc9..137e86b 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -390,6 +390,9 @@ public final class MessageDataGenerator {
             } else {
                 return "byte[]";
             }
+        } else if (field.type() instanceof FieldType.RecordsFieldType) {
+            headerGenerator.addImport(MessageGenerator.BASE_RECORDS_CLASS);
+            return "BaseRecords";
         } else if (field.type().isStruct()) {
             return MessageGenerator.capitalizeFirst(field.typeString());
         } else if (field.type().isArray()) {
@@ -643,7 +646,7 @@ public final class MessageDataGenerator {
             ifNotMember(__ -> {
                 if (type.isString()) {
                     buffer.printf("%s = _readable.readShort();%n", lengthVar);
-                } else if (type.isBytes() || type.isArray()) {
+                } else if (type.isBytes() || type.isArray() || type.isRecords()) {
                     buffer.printf("%s = _readable.readInt();%n", lengthVar);
                 } else {
                     throw new RuntimeException("Can't handle variable length type " + type);
@@ -682,6 +685,17 @@ public final class MessageDataGenerator {
                 buffer.printf("_readable.readArray(newBytes);%n");
                 buffer.printf("%snewBytes%s", assignmentPrefix, assignmentSuffix);
             }
+        } else if (type.isRecords()) {
+            headerGenerator.addImport(MessageGenerator.RECORDS_READABLE_CLASS);
+            buffer.printf("if (_readable instanceof RecordsReadable) {%n");
+            buffer.incrementIndent();
+            buffer.printf("%s((RecordsReadable) _readable).readRecords(%s)%s", assignmentPrefix, lengthVar, assignmentSuffix);
+            buffer.decrementIndent();
+            buffer.printf("} else {%n");
+            buffer.incrementIndent();
+            buffer.printf("throw new RuntimeException(\"Cannot read records from reader of class: \" + _readable.getClass().getSimpleName());%n");
+            buffer.decrementIndent();
+            buffer.printf("}%n");
         } else if (type.isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) type;
             if (isStructArrayWithKeys) {
@@ -964,6 +978,12 @@ public final class MessageDataGenerator {
                     String.format("MessageUtil.jsonNodeToBinary(%s, \"%s\")",
                         target.sourceVariable(), target.humanReadableName())));
             }
+        } else if (target.field().type().isRecords()) {
+            headerGenerator.addImport(MessageGenerator.BYTE_BUFFER_CLASS);
+            headerGenerator.addImport(MessageGenerator.MEMORY_RECORDS_CLASS);
+            buffer.printf("%s;%n", target.assignmentStatement(
+                    String.format("MemoryRecords.readableRecords(ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\")))",
+                            target.sourceVariable(), target.humanReadableName())));
         } else if (target.field().type().isArray()) {
             buffer.printf("if (!%s.isArray()) {%n", target.sourceVariable());
             buffer.incrementIndent();
@@ -1109,6 +1129,9 @@ public final class MessageDataGenerator {
                     String.format("new BinaryNode(Arrays.copyOf(%s, %s.length))",
                         target.sourceVariable(), target.sourceVariable())));
             }
+        } else if (target.field().type().isRecords()) {
+            headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
+            buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})"));
         } else if (target.field().type().isArray()) {
             headerGenerator.addImport(MessageGenerator.ARRAY_NODE_CLASS);
             headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS);
@@ -1209,6 +1232,8 @@ public final class MessageDataGenerator {
             } else {
                 return String.format("struct.getByteArray(\"%s\")", name);
             }
+        } else if (type.isRecords()) {
+            return String.format("struct.getRecords(\"%s\")", name);
         } else if (type.isStruct()) {
             return String.format("new %s((Struct) struct.get(\"%s\"), _version)",
                     type.toString(), name);
@@ -1467,6 +1492,8 @@ public final class MessageDataGenerator {
                     } else {
                         lengthExpression = String.format("%s.length", name);
                     }
+                } else if (type.isRecords()) {
+                    lengthExpression = String.format("%s.sizeInBytes()", name);
                 } else if (type.isArray()) {
                     lengthExpression = String.format("%s.size()", name);
                 } else {
@@ -1499,6 +1526,17 @@ public final class MessageDataGenerator {
                     } else {
                         buffer.printf("_writable.writeByteArray(%s);%n", name);
                     }
+                } else if (type.isRecords()) {
+                    headerGenerator.addImport(MessageGenerator.RECORDS_WRITABLE_CLASS);
+                    buffer.printf("if (_writable instanceof RecordsWritable) {%n");
+                    buffer.incrementIndent();
+                    buffer.printf("((RecordsWritable) _writable).writeRecords(%s);%n", name);
+                    buffer.decrementIndent();
+                    buffer.printf("} else {%n");
+                    buffer.incrementIndent();
+                    buffer.printf("throw new RuntimeException(\"Cannot write records to writer of class: \" + _writable.getClass().getSimpleName());%n");
+                    buffer.decrementIndent();
+                    buffer.printf("}%n");
                 } else if (type.isArray()) {
                     FieldType.ArrayType arrayType = (FieldType.ArrayType) type;
                     FieldType elementType = arrayType.elementType();
@@ -1658,6 +1696,9 @@ public final class MessageDataGenerator {
                 buffer.printf("struct.setByteArray(\"%s\", this.%s);%n",
                     field.snakeCaseName(), field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            buffer.printf("struct.set(\"%s\", this.%s);%n",
+                    field.snakeCaseName(), field.camelCaseName());
         } else if (field.type().isArray()) {
             IsNullConditional.forField(field).
                 possibleVersions(versions).
@@ -1997,6 +2038,8 @@ public final class MessageDataGenerator {
                     } else {
                         buffer.printf("_size += _bytesSize;%n");
                     }
+                } else if (field.type().isRecords()) {
+                    buffer.printf("_size += %s.sizeInBytes() + 4;%n", field.camelCaseName());
                 } else if (field.type().isStruct()) {
                     buffer.printf("int size = this.%s.size(_cache, _version);%n", field.camelCaseName());
                     if (tagged) {
@@ -2078,6 +2121,10 @@ public final class MessageDataGenerator {
                 buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
                     field.camelCaseName(), field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            headerGenerator.addImport(MessageGenerator.OBJECTS_CLASS);
+            buffer.printf("if (!Objects.equals(this.%s, other.%s)) return false;%n",
+                    field.camelCaseName(), field.camelCaseName());
         } else {
             buffer.printf("if (%s != other.%s) return false;%n",
                 field.camelCaseName(), field.camelCaseName());
@@ -2121,12 +2168,16 @@ public final class MessageDataGenerator {
             if (field.zeroCopy()) {
                 headerGenerator.addImport(MessageGenerator.OBJECTS_CLASS);
                 buffer.printf("hashCode = 31 * hashCode + Objects.hashCode(%s);%n",
-                    field.camelCaseName());
+                        field.camelCaseName());
             } else {
                 headerGenerator.addImport(MessageGenerator.ARRAYS_CLASS);
                 buffer.printf("hashCode = 31 * hashCode + Arrays.hashCode(%s);%n",
                     field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            headerGenerator.addImport(MessageGenerator.OBJECTS_CLASS);
+            buffer.printf("hashCode = 31 * hashCode + Objects.hashCode(%s);%n",
+                    field.camelCaseName());
         } else if (field.type().isStruct()
                    || field.type().isArray()
                    || field.type().isString()) {
@@ -2180,6 +2231,13 @@ public final class MessageDataGenerator {
                                 target.sourceVariable())));
                     });
                 }
+            } else if (field.type().isRecords()) {
+                cond.ifShouldNotBeNull(() -> {
+                    headerGenerator.addImport(MessageGenerator.MEMORY_RECORDS_CLASS);
+                    buffer.printf("%s;%n", target.assignmentStatement(
+                        String.format("MemoryRecords.readableRecords(((MemoryRecords) %s).buffer().duplicate())",
+                            target.sourceVariable())));
+                });
             } else if (field.type().isStruct()) {
                 cond.ifShouldNotBeNull(() ->
                     buffer.printf("%s;%n", target.assignmentStatement(
@@ -2254,6 +2312,9 @@ public final class MessageDataGenerator {
                 buffer.printf("+ \"%s%s=\" + Arrays.toString(%s)%n",
                     prefix, field.camelCaseName(), field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            buffer.printf("+ \"%s%s=\" + %s%n",
+                    prefix, field.camelCaseName(), field.camelCaseName());
         } else if (field.type().isStruct() ||
             field.type() instanceof FieldType.UUIDFieldType) {
         } else if (field.type().isStruct()) {
@@ -2397,6 +2458,8 @@ public final class MessageDataGenerator {
                 headerGenerator.addImport(MessageGenerator.BYTES_CLASS);
                 return "Bytes.EMPTY";
             }
+        } else if (field.type().isRecords()) {
+            return "null";
         } else if (field.type().isStruct()) {
             if (!field.defaultString().isEmpty()) {
                 throw new RuntimeException("Invalid default for struct field " +
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
index c84b60b..74d85b0 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
@@ -51,8 +51,12 @@ public final class MessageGenerator {
 
     static final String READABLE_CLASS = "org.apache.kafka.common.protocol.Readable";
 
+    static final String RECORDS_READABLE_CLASS = "org.apache.kafka.common.protocol.RecordsReadable";
+
     static final String WRITABLE_CLASS = "org.apache.kafka.common.protocol.Writable";
 
+    static final String RECORDS_WRITABLE_CLASS = "org.apache.kafka.common.protocol.RecordsWritable";
+
     static final String ARRAYS_CLASS = "java.util.Arrays";
 
     static final String OBJECTS_CLASS = "java.util.Objects";
@@ -88,6 +92,10 @@ public final class MessageGenerator {
 
     static final String UUID_CLASS = "java.util.UUID";
 
+    static final String BASE_RECORDS_CLASS = "org.apache.kafka.common.record.BaseRecords";
+
+    static final String MEMORY_RECORDS_CLASS = "org.apache.kafka.common.record.MemoryRecords";
+
     static final String REQUEST_SUFFIX = "Request";
 
     static final String RESPONSE_SUFFIX = "Response";
diff --git a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
index d433b0b..40075cf 100644
--- a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
@@ -280,6 +280,8 @@ final class SchemaGenerator {
             } else {
                 return nullable ? "Type.NULLABLE_BYTES" : "Type.BYTES";
             }
+        } else if (type.isRecords()) {
+            return "Type.RECORDS";
         } else if (type.isArray()) {
             if (fieldFlexibleVersions.contains(version)) {
                 headerGenerator.addImport(MessageGenerator.COMPACT_ARRAYOF_CLASS);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
new file mode 100644
index 0000000..5f68a89
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+
+    @Param({"10", "500", "1000"})
+    private int topicCount;
+
+    @Param({"3", "10", "20"})
+    private int partitionCount;
+
+    Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+    RequestHeader header;
+
+    FetchRequest consumerRequest;
+
+    FetchRequest replicaRequest;
+
+    Struct requestStruct;
+
+    @Setup(Level.Trial)
+    public void setup() {
+        this.fetchData = new HashMap<>();
+        for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+            String topic = UUID.randomUUID().toString();
+            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+                FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                    0, 0, 4096, Optional.empty());
+                fetchData.put(new TopicPartition(topic, partitionId), partitionData);
+            }
+        }
+
+        this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+        this.consumerRequest = FetchRequest.Builder.forConsumer(0, 0, fetchData)
+            .build(ApiKeys.FETCH.latestVersion());
+        this.replicaRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion(), 1, 0, 0, fetchData)
+            .build(ApiKeys.FETCH.latestVersion());
+        this.requestStruct = this.consumerRequest.data().toStruct(ApiKeys.FETCH.latestVersion());
+    }
+
+    @Benchmark
+    public short testFetchRequestFromStruct() {
+        AbstractRequest request = AbstractRequest.parseRequest(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), requestStruct);
+        return request.version();
+    }
+
+    @Benchmark
+    public int testFetchRequestForConsumer() {
+        FetchRequest fetchRequest = FetchRequest.Builder.forConsumer(0, 0, fetchData)
+            .build(ApiKeys.FETCH.latestVersion());
+        return fetchRequest.fetchData().size();
+    }
+
+    @Benchmark
+    public int testFetchRequestForReplica() {
+        FetchRequest fetchRequest = FetchRequest.Builder.forReplica(
+            ApiKeys.FETCH.latestVersion(), 1, 0, 0, fetchData)
+                .build(ApiKeys.FETCH.latestVersion());
+        return fetchRequest.fetchData().size();
+    }
+
+    @Benchmark
+    public int testSerializeFetchRequestForConsumer() throws IOException {
+        Send send = consumerRequest.toSend("dest", header);
+        ByteBufferChannel channel = new ByteBufferChannel(send.size());
+        send.writeTo(channel);
+        return channel.buffer().limit();
+    }
+
+    @Benchmark
+    public int testSerializeFetchRequestForReplica() throws IOException {
+        Send send = replicaRequest.toSend("dest", header);
+        ByteBufferChannel channel = new ByteBufferChannel(send.size());
+        send.writeTo(channel);
+        return channel.buffer().limit();
+    }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
new file mode 100644
index 0000000..c742d67
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchResponseBenchmark {
+    @Param({"10", "500", "1000"})
+    private int topicCount;
+
+    @Param({"3", "10", "20"})
+    private int partitionCount;
+
+    LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData;
+
+    ResponseHeader header;
+
+    FetchResponse<MemoryRecords> fetchResponse;
+
+    @Setup(Level.Trial)
+    public void setup() {
+        MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
+                new SimpleRecord(1000, "key1".getBytes(StandardCharsets.UTF_8), "value1".getBytes(StandardCharsets.UTF_8)),
+                new SimpleRecord(1001, "key2".getBytes(StandardCharsets.UTF_8), "value2".getBytes(StandardCharsets.UTF_8)),
+                new SimpleRecord(1002, "key3".getBytes(StandardCharsets.UTF_8), "value3".getBytes(StandardCharsets.UTF_8)));
+
+        this.responseData = new LinkedHashMap<>();
+        for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+            String topic = UUID.randomUUID().toString();
+            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+                FetchResponse.PartitionData<MemoryRecords> partitionData = new FetchResponse.PartitionData<>(
+                    Errors.NONE, 0, 0, 0, Optional.empty(), Collections.emptyList(), records);
+                responseData.put(new TopicPartition(topic, partitionId), partitionData);
+            }
+        }
+
+        this.header = new ResponseHeader(100, ApiKeys.FETCH.responseHeaderVersion(ApiKeys.FETCH.latestVersion()));
+        this.fetchResponse = new FetchResponse<>(Errors.NONE, responseData, 0, 0);
+    }
+
+    @Benchmark
+    public int testConstructFetchResponse() {
+        FetchResponse<MemoryRecords> fetchResponse = new FetchResponse<>(Errors.NONE, responseData, 0, 0);
+        return fetchResponse.responseData().size();
+    }
+
+    @Benchmark
+    public int testSerializeFetchResponse() throws IOException {
+        Send send = fetchResponse.toSend("dest", header, ApiKeys.FETCH.latestVersion());
+        ByteBufferChannel channel = new ByteBufferChannel(send.size());
+        send.writeTo(channel);
+        return channel.buffer().limit();
+    }
+}