Posted to commits@kafka.apache.org by ma...@apache.org on 2022/09/02 08:52:30 UTC

[kafka] 01/06: MINOR: Add more validation during KRPC deserialization

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 14951a83e3fdead212156e5532359500d72f68bc
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri May 20 15:23:12 2022 -0700

    MINOR: Add more validation during KRPC deserialization
    
    When deserializing KRPC (the serialization format used for RPCs sent to Kafka, for Kafka
    metadata records, and for some other serialized data), check that at least N bytes remain
    in the input before allocating an array of size N, so that a corrupt or malicious length
    field cannot trigger an oversized allocation.
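    
    In outline, the added check (a simplified sketch of the ByteBufferAccessor change shown
    in the diff below) looks like this:
    
        public byte[] readArray(int size) {
            // Validate the requested size against the bytes actually available
            // before allocating, instead of trusting the encoded length field.
            int remaining = buf.remaining();
            if (size > remaining) {
                throw new RuntimeException("Error reading byte array of " + size +
                    " byte(s): only " + remaining + " byte(s) available");
            }
            byte[] arr = new byte[size];
            buf.get(arr);
            return arr;
        }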
    
    DataInputStreamReadable is hard to make aware of how many bytes are remaining, so its new
    remaining() method falls back to InputStream.available(). When reading an individual record
    in the Raft layer, we instead create a ByteBufferAccessor with a ByteBuffer containing just
    the bytes we're interested in, so bounds checks see only that record's data.
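    
    For example, bounding each read to a single record's bytes can look roughly like the
    following (a hypothetical sketch; buffer, recordStart, recordSize, and serde are
    illustrative names, not the exact code in this change):
    
        // Slice out just this record's payload so that remaining() on the
        // accessor reflects only the bytes that belong to this record.
        ByteBuffer slice = buffer.duplicate();
        slice.position(recordStart);
        slice.limit(recordStart + recordSize);
        Readable input = new ByteBufferAccessor(slice.slice());
        String value = serde.read(input, recordSize); // e.g. with StringSerde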
    
    Add SimpleArraysMessageTest and ByteBufferAccessorTest, plus additional tests in
    DefaultRecordTest, RequestContextTest, and RequestResponseTest.
    
    Reviewers: Tom Bentley <tb...@redhat.com>, Mickael Maison <mi...@gmail.com>, Colin McCabe <co...@cmccabe.xyz>
    
    Co-authored-by: Colin McCabe <co...@cmccabe.xyz>
    Co-authored-by: Manikumar Reddy <ma...@gmail.com>
    Co-authored-by: Mickael Maison <mi...@gmail.com>
---
 checkstyle/suppressions.xml                        |  4 +
 .../kafka/common/protocol/ByteBufferAccessor.java  | 14 +++-
 .../common/protocol/DataInputStreamReadable.java   | 18 ++++-
 .../org/apache/kafka/common/protocol/Readable.java |  9 +--
 .../apache/kafka/common/record/DefaultRecord.java  |  2 +
 .../common/message/SimpleArraysMessageTest.java    | 54 +++++++++++++
 .../common/protocol/ByteBufferAccessorTest.java    | 58 ++++++++++++++
 .../kafka/common/record/DefaultRecordTest.java     | 14 ++++
 .../kafka/common/requests/RequestContextTest.java  | 83 +++++++++++++++++++
 .../kafka/common/requests/RequestResponseTest.java | 93 ++++++++++++++++++++++
 .../common/message/SimpleArraysMessage.json        | 29 +++++++
 .../main/scala/kafka/tools/TestRaftServer.scala    |  6 +-
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    |  6 +-
 .../apache/kafka/message/MessageDataGenerator.java |  9 ++-
 .../apache/kafka/raft/internals/StringSerde.java   |  3 +-
 15 files changed, 378 insertions(+), 24 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 8163f78f85..8f91d98738 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -153,6 +153,10 @@
     <suppress checks="JavaNCSS"
               files="DistributedHerderTest.java"/>
 
+    <!-- Raft -->
+    <suppress checks="NPathComplexity"
+              files="RecordsIterator.java"/>
+
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
               files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
index 3c5c309731..712973e369 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
@@ -54,8 +54,15 @@ public class ByteBufferAccessor implements Readable, Writable {
     }
 
     @Override
-    public void readArray(byte[] arr) {
+    public byte[] readArray(int size) {
+        int remaining = buf.remaining();
+        if (size > remaining) {
+            throw new RuntimeException("Error reading byte array of " + size + " byte(s): only " + remaining +
+                " byte(s) available");
+        }
+        byte[] arr = new byte[size];
         buf.get(arr);
+        return arr;
     }
 
     @Override
@@ -133,6 +140,11 @@ public class ByteBufferAccessor implements Readable, Writable {
         return ByteUtils.readVarlong(buf);
     }
 
+    @Override
+    public int remaining() {
+        return buf.remaining();
+    }
+
     public void flip() {
         buf.flip();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java b/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
index 93c6c597d7..3f0b96757e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
@@ -76,10 +76,12 @@ public class DataInputStreamReadable implements Readable, Closeable {
     }
 
     @Override
-    public void readArray(byte[] arr) {
+    public byte[] readArray(final int length) {
         try {
+            byte[] arr = new byte[length];
             input.readFully(arr);
-        } catch (IOException e) {
+            return arr;
+        } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
@@ -95,8 +97,7 @@ public class DataInputStreamReadable implements Readable, Closeable {
 
     @Override
     public ByteBuffer readByteBuffer(int length) {
-        byte[] arr = new byte[length];
-        readArray(arr);
+        byte[] arr = readArray(length);
         return ByteBuffer.wrap(arr);
     }
 
@@ -118,6 +119,15 @@ public class DataInputStreamReadable implements Readable, Closeable {
         }
     }
 
+    @Override
+    public int remaining() {
+        try {
+            return input.available();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public void close() {
         try {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
index 46879cde53..f453d12e17 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
@@ -32,15 +32,15 @@ public interface Readable {
     int readInt();
     long readLong();
     double readDouble();
-    void readArray(byte[] arr);
+    byte[] readArray(int length);
     int readUnsignedVarint();
     ByteBuffer readByteBuffer(int length);
     int readVarint();
     long readVarlong();
+    int remaining();
 
     default String readString(int length) {
-        byte[] arr = new byte[length];
-        readArray(arr);
+        byte[] arr = readArray(length);
         return new String(arr, StandardCharsets.UTF_8);
     }
 
@@ -48,8 +48,7 @@ public interface Readable {
         if (unknowns == null) {
             unknowns = new ArrayList<>();
         }
-        byte[] data = new byte[size];
-        readArray(data);
+        byte[] data = readArray(size);
         unknowns.add(new RawTaggedField(tag, data));
         return unknowns;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index d85f1000bc..94896a7c75 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -355,6 +355,8 @@ public class DefaultRecord implements Record {
             int numHeaders = ByteUtils.readVarint(buffer);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
+            if (numHeaders > buffer.remaining())
+                throw new InvalidRecordException("Found invalid number of record headers. " + numHeaders + " is larger than the remaining size of the buffer");
 
             final Header[] headers;
             if (numHeaders == 0)
diff --git a/clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java
new file mode 100644
index 0000000000..1b78adbb96
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.message;
+
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class SimpleArraysMessageTest {
+    @Test
+    public void testArrayBoundsChecking() {
+        // SimpleArraysMessageData takes 2 arrays
+        final ByteBuffer buf = ByteBuffer.wrap(new byte[] {
+            (byte) 0x7f, // Set size of first array to 126 which is larger than the size of this buffer
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00
+        });
+        final SimpleArraysMessageData out = new SimpleArraysMessageData();
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        assertEquals("Tried to allocate a collection of size 126, but there are only 7 bytes remaining.",
+                assertThrows(RuntimeException.class, () -> out.read(accessor, (short) 2)).getMessage());
+    }
+
+    @Test
+    public void testArrayBoundsCheckingOtherArray() {
+        // SimpleArraysMessageData takes 2 arrays
+        final ByteBuffer buf = ByteBuffer.wrap(new byte[] {
+            (byte) 0x01, // Set size of first array to 0
+            (byte) 0x7e, // Set size of second array to 125 which is larger than the size of this buffer
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00
+        });
+        final SimpleArraysMessageData out = new SimpleArraysMessageData();
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        assertEquals("Tried to allocate a collection of size 125, but there are only 6 bytes remaining.",
+                assertThrows(RuntimeException.class, () -> out.read(accessor, (short) 2)).getMessage());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java
new file mode 100644
index 0000000000..6a0c6c2681
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ByteBufferAccessorTest {
+    @Test
+    public void testReadArray() {
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        final byte[] testArray = new byte[] {0x4b, 0x61, 0x46};
+        accessor.writeByteArray(testArray);
+        accessor.writeInt(12345);
+        accessor.flip();
+        final byte[] testArray2 = accessor.readArray(3);
+        assertArrayEquals(testArray, testArray2);
+        assertEquals(12345, accessor.readInt());
+        assertEquals("Error reading byte array of 3 byte(s): only 0 byte(s) available",
+            assertThrows(RuntimeException.class,
+                () -> accessor.readArray(3)).getMessage());
+    }
+
+    @Test
+    public void testReadString() {
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        String testString = "ABC";
+        final byte[] testArray = testString.getBytes(StandardCharsets.UTF_8);
+        accessor.writeByteArray(testArray);
+        accessor.flip();
+        assertEquals("ABC", accessor.readString(3));
+        assertEquals("Error reading byte array of 2 byte(s): only 0 byte(s) available",
+                assertThrows(RuntimeException.class,
+                        () -> accessor.readString(2)).getMessage());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index af154d321d..125c104c04 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -247,6 +247,20 @@ public class DefaultRecordTest {
         buf.flip();
         assertThrows(InvalidRecordException.class,
             () -> DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null));
+
+        ByteBuffer buf2 = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf2);
+        buf2.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf2);
+        ByteUtils.writeVarint(offsetDelta, buf2);
+        ByteUtils.writeVarint(-1, buf2); // null key
+        ByteUtils.writeVarint(-1, buf2); // null value
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf2); // more headers than remaining buffer size, not allowed
+        buf2.position(buf2.limit());
+
+        buf2.flip();
+        assertThrows(InvalidRecordException.class,
+                () -> DefaultRecord.readFrom(buf2, 0L, 0L, RecordBatch.NO_SEQUENCE, null));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
index 4415ff960a..254dea0430 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
@@ -16,22 +16,31 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ProduceRequestData;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RequestContextTest {
@@ -104,4 +113,78 @@ public class RequestContextTest {
         assertEquals(expectedResponse, parsedResponse.data());
     }
 
+    @Test
+    public void testInvalidRequestForImplicitHashCollection() throws UnknownHostException {
+        short version = (short) 5; // choose a version with fixed length encoding, for simplicity
+        ByteBuffer corruptBuffer = produceRequest(version);
+        // corrupt the length of the topics array
+        corruptBuffer.putInt(8, (Integer.MAX_VALUE - 1) / 2);
+
+        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE, version, "console-producer", 3);
+        RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(),
+                KafkaPrincipal.ANONYMOUS, new ListenerName("ssl"), SecurityProtocol.SASL_SSL,
+                ClientInformation.EMPTY, true);
+
+        String msg = assertThrows(InvalidRequestException.class,
+                () -> context.parseRequest(corruptBuffer)).getCause().getMessage();
+        assertEquals("Tried to allocate a collection of size 1073741823, but there are only 17 bytes remaining.", msg);
+    }
+
+    @Test
+    public void testInvalidRequestForArrayList() throws UnknownHostException {
+        short version = (short) 5; // choose a version with fixed length encoding, for simplicity
+        ByteBuffer corruptBuffer = produceRequest(version);
+        // corrupt the length of the partitions array
+        corruptBuffer.putInt(17, Integer.MAX_VALUE);
+
+        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE, version, "console-producer", 3);
+        RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(),
+                KafkaPrincipal.ANONYMOUS, new ListenerName("ssl"), SecurityProtocol.SASL_SSL,
+                ClientInformation.EMPTY, true);
+
+        String msg = assertThrows(InvalidRequestException.class,
+                () -> context.parseRequest(corruptBuffer)).getCause().getMessage();
+        assertEquals(
+                "Tried to allocate a collection of size 2147483647, but there are only 8 bytes remaining.", msg);
+    }
+
+    private ByteBuffer produceRequest(short version) {
+        ProduceRequestData data = new ProduceRequestData()
+                .setAcks((short) -1)
+                .setTimeoutMs(1);
+        data.topicData().add(
+                new ProduceRequestData.TopicProduceData()
+                        .setName("foo")
+                        .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
+                                .setIndex(42))));
+
+        return serialize(version, data);
+    }
+
+    private ByteBuffer serialize(short version, ApiMessage data) {
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        data.size(cache, version);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        data.write(new ByteBufferAccessor(buffer), cache, version);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Test
+    public void testInvalidRequestForByteArray() throws UnknownHostException {
+        short version = (short) 1; // choose a version with fixed length encoding, for simplicity
+        ByteBuffer corruptBuffer = serialize(version, new SaslAuthenticateRequestData().setAuthBytes(new byte[0]));
+        // corrupt the length of the bytes array
+        corruptBuffer.putInt(0, Integer.MAX_VALUE);
+
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_AUTHENTICATE, version, "console-producer", 1);
+        RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(),
+                KafkaPrincipal.ANONYMOUS, new ListenerName("ssl"), SecurityProtocol.SASL_SSL,
+                ClientInformation.EMPTY, true);
+
+        String msg = assertThrows(InvalidRequestException.class,
+                () -> context.parseRequest(corruptBuffer)).getCause().getMessage();
+        assertEquals("Error reading byte array of 2147483647 byte(s): only 0 byte(s) available", msg);
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9346665058..9a4f2e020d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -177,6 +177,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.types.RawTaggedField;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.common.quota.ClientQuotaFilter;
@@ -197,10 +198,12 @@ import org.apache.kafka.common.security.token.delegation.DelegationToken;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.SecurityUtils;
 import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -221,6 +224,7 @@ import static org.apache.kafka.common.protocol.ApiKeys.FETCH;
 import static org.apache.kafka.common.protocol.ApiKeys.JOIN_GROUP;
 import static org.apache.kafka.common.protocol.ApiKeys.LIST_GROUPS;
 import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS;
+import static org.apache.kafka.common.protocol.ApiKeys.SASL_AUTHENTICATE;
 import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -2759,4 +2763,93 @@ public class RequestResponseTest {
         assertEquals(Integer.valueOf(1), createUpdateMetadataResponse().errorCounts().get(Errors.NONE));
         assertEquals(Integer.valueOf(1), createWriteTxnMarkersResponse().errorCounts().get(Errors.NONE));
     }
+
+    @Test
+    public void testInvalidSaslHandShakeRequest() {
+        AbstractRequest request = new SaslHandshakeRequest.Builder(
+                new SaslHandshakeRequestData().setMechanism("PLAIN")).build();
+        ByteBuffer serializedBytes = request.serialize();
+        // corrupt the length of the sasl mechanism string
+        serializedBytes.putShort(0, Short.MAX_VALUE);
+
+        String msg = assertThrows(RuntimeException.class, () -> AbstractRequest
+            .parseRequest(request.apiKey(), request.version(), serializedBytes)).getMessage();
+        assertEquals("Error reading byte array of 32767 byte(s): only 5 byte(s) available", msg);
+    }
+
+    @Test
+    public void testInvalidSaslAuthenticateRequest() {
+        short version = (short) 1; // choose a version with fixed length encoding, for simplicity
+        byte[] b = new byte[] {
+            0x11, 0x1f, 0x15, 0x2c,
+            0x5e, 0x2a, 0x20, 0x26,
+            0x6c, 0x39, 0x45, 0x1f,
+            0x25, 0x1c, 0x2d, 0x25,
+            0x43, 0x2a, 0x11, 0x76
+        };
+        SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(b);
+        AbstractRequest request = new SaslAuthenticateRequest(data, version);
+        ByteBuffer serializedBytes = request.serialize();
+
+        // corrupt the length of the bytes array
+        serializedBytes.putInt(0, Integer.MAX_VALUE);
+
+        String msg = assertThrows(RuntimeException.class, () -> AbstractRequest
+                .parseRequest(request.apiKey(), request.version(), serializedBytes)).getMessage();
+        assertEquals("Error reading byte array of 2147483647 byte(s): only 20 byte(s) available", msg);
+    }
+
+    @Test
+    public void testValidTaggedFieldsWithSaslAuthenticateRequest() {
+        byte[] byteArray = new byte[11];
+        ByteBufferAccessor accessor = new ByteBufferAccessor(ByteBuffer.wrap(byteArray));
+
+        // construct a SASL_AUTHENTICATE request
+        byte[] authBytes = "test".getBytes(StandardCharsets.UTF_8);
+        accessor.writeUnsignedVarint(authBytes.length + 1);
+        accessor.writeByteArray(authBytes);
+
+        // write the total number of tagged fields
+        accessor.writeUnsignedVarint(1);
+
+        // write the first tagged field
+        RawTaggedField taggedField = new RawTaggedField(1, new byte[] {0x1, 0x2, 0x3});
+        accessor.writeUnsignedVarint(taggedField.tag());
+        accessor.writeUnsignedVarint(taggedField.size());
+        accessor.writeByteArray(taggedField.data());
+
+        accessor.flip();
+
+        SaslAuthenticateRequest saslAuthenticateRequest = (SaslAuthenticateRequest) AbstractRequest
+                .parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor.buffer()).request;
+        Assertions.assertArrayEquals(authBytes, saslAuthenticateRequest.data().authBytes());
+        assertEquals(1, saslAuthenticateRequest.data().unknownTaggedFields().size());
+        assertEquals(taggedField, saslAuthenticateRequest.data().unknownTaggedFields().get(0));
+    }
+
+    @Test
+    public void testInvalidTaggedFieldsWithSaslAuthenticateRequest() {
+        byte[] byteArray = new byte[13];
+        ByteBufferAccessor accessor = new ByteBufferAccessor(ByteBuffer.wrap(byteArray));
+
+        // construct a SASL_AUTHENTICATE request
+        byte[] authBytes = "test".getBytes(StandardCharsets.UTF_8);
+        accessor.writeUnsignedVarint(authBytes.length + 1);
+        accessor.writeByteArray(authBytes);
+
+        // write the total number of tagged fields
+        accessor.writeUnsignedVarint(1);
+
+        // write the first tagged field
+        RawTaggedField taggedField = new RawTaggedField(1, new byte[] {0x1, 0x2, 0x3});
+        accessor.writeUnsignedVarint(taggedField.tag());
+        accessor.writeUnsignedVarint(Short.MAX_VALUE); // set wrong size for tagged field
+        accessor.writeByteArray(taggedField.data());
+
+        accessor.flip();
+
+        String msg = assertThrows(RuntimeException.class, () -> AbstractRequest
+                .parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor.buffer())).getMessage();
+        assertEquals("Error reading byte array of 32767 byte(s): only 3 byte(s) available", msg);
+    }
 }
diff --git a/clients/src/test/resources/common/message/SimpleArraysMessage.json b/clients/src/test/resources/common/message/SimpleArraysMessage.json
new file mode 100644
index 0000000000..76dc283b6a
--- /dev/null
+++ b/clients/src/test/resources/common/message/SimpleArraysMessage.json
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+{
+  "name": "SimpleArraysMessage",
+  "type": "header",
+  "validVersions": "0-2",
+  "flexibleVersions": "1+",
+  "fields": [
+    { "name": "Goats", "type": "[]StructArray", "versions": "1+",
+      "fields": [
+        { "name": "Color", "type": "int8", "versions": "1+"},
+        { "name": "Name", "type": "string", "versions": "2+"}
+      ]
+    },
+    { "name": "Sheep", "type": "[]int32", "versions": "0+" }
+  ]
+}
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index c25e551ad9..175a7ad0f9 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -280,11 +280,7 @@ object TestRaftServer extends Logging {
       out.writeByteArray(data)
     }
 
-    override def read(input: protocol.Readable, size: Int): Array[Byte] = {
-      val data = new Array[Byte](size)
-      input.readArray(data)
-      data
-    }
+    override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size)
   }
 
   private class LatencyHistogram(
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 08e7ea481e..6777d9469b 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -350,11 +350,7 @@ object KafkaMetadataLogTest {
     override def write(data: Array[Byte], serializationCache: ObjectSerializationCache, out: Writable): Unit = {
       out.writeByteArray(data)
     }
-    override def read(input: protocol.Readable, size: Int): Array[Byte] = {
-      val array = new Array[Byte](size)
-      input.readArray(array)
-      array
-    }
+    override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size)
   }
 
   def buildMetadataLogAndDir(
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index 5e591c5214..838c79fb07 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -610,8 +610,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
                 buffer.printf("%s_readable.readByteBuffer(%s)%s",
                     assignmentPrefix, lengthVar, assignmentSuffix);
             } else {
-                buffer.printf("byte[] newBytes = new byte[%s];%n", lengthVar);
-                buffer.printf("_readable.readArray(newBytes);%n");
+                buffer.printf("byte[] newBytes = _readable.readArray(%s);%n", lengthVar);
                 buffer.printf("%snewBytes%s", assignmentPrefix, assignmentSuffix);
             }
         } else if (type.isRecords()) {
@@ -619,6 +618,12 @@ public final class MessageDataGenerator implements MessageClassGenerator {
                 assignmentPrefix, lengthVar, assignmentSuffix);
         } else if (type.isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) type;
+            buffer.printf("if (%s > _readable.remaining()) {%n", lengthVar);
+            buffer.incrementIndent();
+            buffer.printf("throw new RuntimeException(\"Tried to allocate a collection of size \" + %s + \", but " +
+                    "there are only \" + _readable.remaining() + \" bytes remaining.\");%n", lengthVar);
+            buffer.decrementIndent();
+            buffer.printf("}%n");
             if (isStructArrayWithKeys) {
                 headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
                 buffer.printf("%s newCollection = new %s(%s);%n",
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java b/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java
index cf096dfe69..14f5bd63fd 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java
@@ -40,8 +40,7 @@ public class StringSerde implements RecordSerde<String> {
 
     @Override
     public String read(Readable input, int size) {
-        byte[] data = new byte[size];
-        input.readArray(data);
+        byte[] data = input.readArray(size);
         return Utils.utf8(data);
     }