You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/09/14 17:08:08 UTC

[kafka] branch 3.0 updated: KAFKA-13149; Fix NPE when handling malformed record data in produce requests (#11080)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new e89fa449 KAFKA-13149; Fix NPE when handling malformed record data in produce requests (#11080)
e89fa449 is described below

commit e89fa449dbbba80bb9e3adf121439361b137ae8f
Author: Cong Ding <co...@ccding.com>
AuthorDate: Tue Sep 14 11:47:54 2021 -0500

    KAFKA-13149; Fix NPE when handling malformed record data in produce requests (#11080)
    
    Raise `InvalidRecordException` from `DefaultRecordBatch.readFrom` instead of returning null if there are not enough bytes remaining to read the record. This ensures that the broker can raise a useful exception for malformed record batches.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/common/record/DefaultRecord.java     |  4 +++-
 .../java/org/apache/kafka/common/record/DefaultRecordTest.java | 10 ++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index b63773b..8772556 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -293,7 +293,9 @@ public class DefaultRecord implements Record {
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");
 
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
         return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index af154d3..49743d2 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -481,4 +481,14 @@ public class DefaultRecordTest {
         assertEquals(RecordBatch.NO_SEQUENCE, record.sequence());
     }
 
+    @Test
+    public void testInvalidSizeOfBodyInBytes() {
+        int sizeOfBodyInBytes = 10;
+        ByteBuffer buf = ByteBuffer.allocate(5);
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+
+        buf.flip();
+        assertThrows(InvalidRecordException.class,
+            () -> DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null));
+    }
 }