You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/09/14 17:08:08 UTC
[kafka] branch 3.0 updated: KAFKA-13149;
Fix NPE when handling malformed record data in produce requests
(#11080)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new e89fa449 KAFKA-13149; Fix NPE when handling malformed record data in produce requests (#11080)
e89fa449 is described below
commit e89fa449dbbba80bb9e3adf121439361b137ae8f
Author: Cong Ding <co...@ccding.com>
AuthorDate: Tue Sep 14 11:47:54 2021 -0500
KAFKA-13149; Fix NPE when handling malformed record data in produce requests (#11080)
Raise `InvalidRecordException` from `DefaultRecordBatch.readFrom` instead of returning null if there are not enough bytes remaining to read the record. This ensures that the broker can raise a useful exception for malformed record batches.
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
.../java/org/apache/kafka/common/record/DefaultRecord.java | 4 +++-
.../java/org/apache/kafka/common/record/DefaultRecordTest.java | 10 ++++++++++
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index b63773b..8772556 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -293,7 +293,9 @@ public class DefaultRecord implements Record {
Long logAppendTime) {
int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
if (buffer.remaining() < sizeOfBodyInBytes)
- return null;
+ throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+ " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+ " remaining bytes.");
int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index af154d3..49743d2 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -481,4 +481,14 @@ public class DefaultRecordTest {
assertEquals(RecordBatch.NO_SEQUENCE, record.sequence());
}
+ @Test
+ public void testInvalidSizeOfBodyInBytes() {
+ int sizeOfBodyInBytes = 10;
+ ByteBuffer buf = ByteBuffer.allocate(5);
+ ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+
+ buf.flip();
+ assertThrows(InvalidRecordException.class,
+ () -> DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null));
+ }
}