You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/02/22 03:52:47 UTC

[kafka] branch trunk updated: MINOR: add size check for tagged fields (#13100)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 30d7d3b5ce4 MINOR: add size check for tagged fields (#13100)
30d7d3b5ce4 is described below

commit 30d7d3b5ce42ed7663a90193be47e30aab75537f
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Wed Feb 22 11:52:21 2023 +0800

    MINOR: add size check for tagged fields (#13100)
    
    Add size check for taggedFields of a tag, and add tests.
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Divij Vaidya <di...@amazon.com>
---
 .../kafka/common/protocol/types/TaggedFields.java  |  5 +++++
 .../apache/kafka/common/protocol/types/Type.java   |  1 +
 .../protocol/types/ProtocolSerializationTest.java  | 22 ++++++++++++++++++++++
 3 files changed, 28 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java
index 26fb65830e6..901bf3f85aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java
@@ -100,6 +100,11 @@ public class TaggedFields extends DocumentedType {
             }
             prevTag = tag;
             int size = ByteUtils.readUnsignedVarint(buffer);
+            if (size < 0)
+                throw new SchemaException("field size " + size + " cannot be negative");
+            if (size > buffer.remaining())
+                throw new SchemaException("Error reading field of size " + size + ", only " + buffer.remaining() + " bytes available");
+
             Field field = fields.get(tag);
             if (field == null) {
                 byte[] bytes = new byte[size];
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 4af74dbf4cc..bd4cb41c7e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -39,6 +39,7 @@ public abstract class Type {
 
     /**
      * Read the typed object from the buffer
+     * Please remember to do size validation before creating the container (ex: array) for the following data
      *
      * @throws SchemaException If the object is not valid for its type
      */
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 811bcf4f88a..f96112dd806 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -220,6 +220,28 @@ public class ProtocolSerializationTest {
         }
     }
 
+    @Test
+    public void testReadTaggedFieldsSizeTooLarge() {
+        int tag = 1;
+        Type type = TaggedFields.of(tag, new Field("field", Type.NULLABLE_STRING));
+        int size = 10;
+        ByteBuffer buffer = ByteBuffer.allocate(size);
+        int numTaggedFields = 1;
+        // write the number of tagged fields
+        ByteUtils.writeUnsignedVarint(numTaggedFields, buffer);
+        // write the tag of the first tagged fields
+        ByteUtils.writeUnsignedVarint(tag, buffer);
+        // write the size of tagged fields for this tag, using a large number for testing
+        ByteUtils.writeUnsignedVarint(Integer.MAX_VALUE, buffer);
+        int expectedRemaining = buffer.remaining();
+        buffer.rewind();
+
+        // should throw SchemaException while reading the buffer, instead of OOM
+        Throwable e = assertThrows(SchemaException.class, () -> type.read(buffer));
+        assertEquals("Error reading field of size " + Integer.MAX_VALUE + ", only " + expectedRemaining + " bytes available",
+            e.getMessage());
+    }
+
     @Test
     public void testReadNegativeArraySize() {
         Type type = new ArrayOf(Type.INT8);