Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/19 17:23:51 UTC

[GitHub] [kafka] ccding opened a new pull request #11080: [WIP] fix NPE when record==null in append

ccding opened a new pull request #11080:
URL: https://github.com/apache/kafka/pull/11080


   This code
   https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L294-L296
   returns a null record, which can subsequently cause a null pointer exception at
   https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/core/src/main/scala/kafka/log/LogValidator.scala#L191
   
   This PR makes the broker throw an InvalidRecordException and notify the client instead. The fix is similar to
   https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L340-L358
   where we throw an InvalidRecordException when the record's integrity check fails.
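   For illustration, here is a minimal, self-contained sketch of the proposed behavior (the `readBody` helper and the use of `IllegalArgumentException` are hypothetical stand-ins for `DefaultRecord.readFrom` and Kafka's `InvalidRecordException`): when the declared body size exceeds the bytes remaining in the buffer, the method throws instead of returning null:

   ```java
   import java.nio.ByteBuffer;

   public class UnderflowCheck {
       // Hypothetical stand-in for the size check in DefaultRecord.readFrom:
       // throw instead of returning null when the buffer is truncated.
       static String readBody(ByteBuffer buffer, int sizeOfBodyInBytes) {
           if (buffer.remaining() < sizeOfBodyInBytes)
               throw new IllegalArgumentException("Invalid record size: expected " + sizeOfBodyInBytes
                   + " bytes in record payload, but the buffer has only " + buffer.remaining()
                   + " remaining bytes.");
           byte[] body = new byte[sizeOfBodyInBytes];
           buffer.get(body);
           return new String(body);
       }

       public static void main(String[] args) {
           ByteBuffer truncated = ByteBuffer.wrap("abc".getBytes()); // only 3 bytes available
           try {
               readBody(truncated, 10); // declared size exceeds remaining bytes
           } catch (IllegalArgumentException e) {
               System.out.println("caught: " + e.getMessage());
           }
       }
   }
   ```

   With this shape, a malformed produce request surfaces as an exception the broker can map to an error response, rather than a null that is later dereferenced in LogValidator.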


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11080: fix NPE when record==null in append

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r673207693



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       Yes, I understand you're talking about the producer case. I am talking about the fetch case. As I said, I think we may not need that special logic anymore, but @hachikuji would know for sure.







[GitHub] [kafka] ccding commented on pull request #11080: fix NPE when record==null in append

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#issuecomment-882800476


   This PR is ready for review





[GitHub] [kafka] ccding commented on a change in pull request #11080: fix NPE when record==null in append

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r673196785



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       > the case where an incomplete record is returned by the broker
   
   I am referring to the produce API for the null pointer exception. The record is from a producer. The `InvalidRecordException` will trigger a response to the producer.
   
   If the fetch path requires a different return value, I guess the problem becomes more complicated.







[GitHub] [kafka] ccding commented on a change in pull request #11080: fix NPE when record==null in append

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r672758944



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       Are you referring to the case where we have not yet finished reading the request? I didn't see a retry path, but it will cause a null pointer exception at https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/core/src/main/scala/kafka/log/LogValidator.scala#L191
   
   What do you suggest I do here?







[GitHub] [kafka] hachikuji commented on a change in pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r707728626



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       Apologies for the delay here. I don't see a problem with the change. I believe that @ijuma is right that the fetch response may still return incomplete data, but I think this is handled in `ByteBufferLogInputStream`. We stop batch iteration early if there is incomplete data, so we would never reach the `readFrom` here which is called for each record in the batch. It's worth noting also that the only caller of this method (in `DefaultRecordBatch.uncompressedIterator`) has the following logic:
   
   ```java
   try {
     return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime);
   } catch (BufferUnderflowException e) {
     throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
   }
   ```
   
   So it already handles underflows in a similar way.
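   The early-stop behavior described above can be illustrated with a hypothetical sketch (this is not Kafka's actual `ByteBufferLogInputStream`; the 4-byte length prefix is an assumption for the example): iteration over length-prefixed batches stops at the first batch whose declared size exceeds the bytes remaining, so a truncated trailing batch never reaches the per-record reader:

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   public class BatchScan {
       // Hypothetical sketch: each batch is a 4-byte length prefix plus payload.
       // Iteration stops early at the first batch whose declared size exceeds
       // the bytes actually remaining, mirroring the early-stop idea above.
       static List<byte[]> readCompleteBatches(ByteBuffer buffer) {
           List<byte[]> batches = new ArrayList<>();
           while (buffer.remaining() >= 4) {
               buffer.mark();
               int size = buffer.getInt();
               if (buffer.remaining() < size) { // incomplete trailing batch
                   buffer.reset();              // leave it unconsumed and stop
                   break;
               }
               byte[] payload = new byte[size];
               buffer.get(payload);
               batches.add(payload);
           }
           return batches;
       }
   }
   ```

   A buffer holding one complete batch followed by a truncated one yields only the complete batch, so the record-level size check is never reached for the truncated data.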









[GitHub] [kafka] ccding commented on a change in pull request #11080: fix NPE when record==null in append

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r676954916



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       @hachikuji do you have time to have a look at this?








[GitHub] [kafka] hachikuji commented on pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#issuecomment-918658119


   @ccding I kicked off a new build since it has been a while since the PR was submitted. Assuming tests are ok, I will merge shortly. Thanks for your patience.







[GitHub] [kafka] ijuma commented on a change in pull request #11080: fix NPE when record==null in append

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r672610426



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       Is this really an exceptional case? Don't we do reads where we don't know exactly where the read ends and hence will trigger this path?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       I think the intent here was to cover the case where an incomplete record is returned by the broker. However, we have broker logic to try and avoid this case since KIP-74:
   
   ```java
   } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
       // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
       // progress in such cases and don't need to report a `RecordTooLargeException`
       FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
   ```
   
   @hachikuji Do you remember if there is still a reason to return `null` here instead of the exception @ccding is proposing?








[GitHub] [kafka] hachikuji merged pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #11080:
URL: https://github.com/apache/kafka/pull/11080


   





[GitHub] [kafka] ijuma commented on a change in pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r707781491



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       Thanks for checking @hachikuji.









[GitHub] [kafka] hachikuji commented on a change in pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r707728626



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       Apologies for the delay here. I don't see a problem with the change. I believe that @ijuma is right that the fetch response may still return incomplete data, but I think this is handled in `ByteBufferLogInputStream`. We stop batch iteration early if there is incomplete data, so we would never reach the `readFrom` here which is called for each record in the batch. It's worth noting also that the only caller of this method (in `DefaultRecordBatch.uncompressedIterator`) has the following logic:
   
   ```java
                   try {
                       return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime);
                   } catch (BufferUnderflowException e) {
                       throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
                   }
   ```
   
   So it already handles underflows in a similar way.
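
   The pattern under discussion — throwing a descriptive `InvalidRecordException` on a truncated record instead of returning `null` — can be sketched roughly as follows. This is a simplified, hypothetical illustration, not Kafka's actual API: it uses a fixed 4-byte length prefix where Kafka uses a varint, and the `RecordReader` and `readBody` names are made up for the example.

   ```java
   import java.nio.ByteBuffer;

   // Hypothetical sketch: read a length-prefixed record body from a buffer,
   // throwing on truncation so callers never have to null-check.
   public class RecordReader {

       public static class InvalidRecordException extends RuntimeException {
           public InvalidRecordException(String message) {
               super(message);
           }
       }

       // Reads a record body declared by a 4-byte length prefix.
       public static byte[] readBody(ByteBuffer buffer) {
           int sizeOfBodyInBytes = buffer.getInt();
           if (buffer.remaining() < sizeOfBodyInBytes)
               throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes
                   + " bytes in record payload, but the buffer has only " + buffer.remaining()
                   + " remaining bytes.");
           byte[] body = new byte[sizeOfBodyInBytes];
           buffer.get(body);
           return body;
       }

       public static void main(String[] args) {
           // Complete record: length prefix 3 followed by 3 payload bytes.
           ByteBuffer ok = ByteBuffer.allocate(7).putInt(3).put(new byte[]{1, 2, 3});
           ok.flip();
           System.out.println(readBody(ok).length); // prints 3

           // Truncated record: declares 10 bytes but only 2 remain.
           ByteBuffer truncated = ByteBuffer.allocate(6).putInt(10).put(new byte[]{1, 2});
           truncated.flip();
           try {
               readBody(truncated);
           } catch (InvalidRecordException e) {
               System.out.println("caught: " + e.getMessage());
           }
       }
   }
   ```

   The design point is the same as in the PR: a `null` return forces every caller to remember a null-check, while an exception carries the diagnostic (declared size vs. remaining bytes) to whoever handles the error.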







[GitHub] [kafka] ccding commented on pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#issuecomment-919272706


   The build Jason kicked off failed two tests:
   ```
   Build / JDK 8 and Scala 2.12 / testDescribeTopicsWithIds() – kafka.api.PlaintextAdminIntegrationTest
   Build / JDK 11 and Scala 2.13 / shouldQueryStoresAfterAddingAndRemovingStreamThread – org.apache.kafka.streams.integration.StoreQueryIntegrationTest
   ```
   Both passed on my local run after merging trunk into this branch.
   
   Pushing the trunk merge to this branch and letting Jenkins run it again.





[GitHub] [kafka] ijuma commented on a change in pull request #11080: fix NPE when record==null in append

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r672784612



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       I think the intent here was to cover the case where an incomplete record is returned by the broker. However, we have broker logic to try and avoid this case since KIP-74:
   
   ```scala
   } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
               // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
               // progress in such cases and don't need to report a `RecordTooLargeException`
               FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
   ```
   
   @hachikuji Do you remember if there is still a reason to return `null` here instead of the exception @ccding is proposing?







[GitHub] [kafka] ccding commented on pull request #11080: fix NPE when record==null in append

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#issuecomment-882800476


   This PR is ready for review

