You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/09/22 01:23:07 UTC

[kafka] branch trunk updated: MINOR: use addExact to avoid overflow and some cleanup (#12660)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf7ddf73af1 MINOR: use addExact to avoid overflow and some cleanup (#12660)
bf7ddf73af1 is described below

commit bf7ddf73af1ee416388c09a8ad42bfa6cf295641
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu Sep 22 09:22:58 2022 +0800

    MINOR: use addExact to avoid overflow and some cleanup (#12660)
    
    What changes in this PR:
    1. Use addExact to avoid overflow in BatchBuilder#bytesNeeded. We already used addExact in the bytesNeededForRecords method, but forgot to use it when adding the batch header size to the returned result.
    2. javadoc improvement
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/raft/internals/BatchAccumulator.java    | 4 +++-
 .../main/java/org/apache/kafka/raft/internals/BatchBuilder.java   | 4 ++--
 .../java/org/apache/kafka/raft/internals/BatchMemoryPool.java     | 8 ++++----
 3 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 69732cd670b..b84a7d57b8a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -107,6 +107,7 @@ public class BatchAccumulator<T> implements Closeable {
      * @throws NotLeaderException if the epoch is less than the leader epoch
      * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
      * @throws BufferAllocationException if we failed to allocate memory for the records
+     * @throws IllegalStateException if we tried to append new records after the batch has been built
      */
     public long append(int epoch, List<T> records) {
         return append(epoch, records, false);
@@ -127,6 +128,7 @@ public class BatchAccumulator<T> implements Closeable {
      * @throws NotLeaderException if the epoch is less than the leader epoch
      * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
      * @throws BufferAllocationException if we failed to allocate memory for the records
+     * @throws IllegalStateException if we tried to append new records after the batch has been built
      */
     public long appendAtomic(int epoch, List<T> records) {
         return append(epoch, records, true);
@@ -260,7 +262,7 @@ public class BatchAccumulator<T> implements Closeable {
     /**
      * Append a {@link LeaderChangeMessage} record to the batch
      *
-     * @param LeaderChangeMessage The message to append
+     * @param leaderChangeMessage The message to append
      * @param currentTimestamp The current time in milliseconds
      * @throws IllegalStateException on failure to allocate a buffer for the record
      */
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
index 982040b84ee..92b63ec5526 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
@@ -142,7 +142,7 @@ public class BatchBuilder<T> {
         );
 
         if (!isOpenForAppends) {
-            return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
+            return OptionalInt.of(Math.addExact(batchHeaderSizeInBytes(), bytesNeeded));
         }
 
         int approxUnusedSizeInBytes = maxBytes - approximateSizeInBytes();
@@ -157,7 +157,7 @@ public class BatchBuilder<T> {
             }
         }
 
-        return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
+        return OptionalInt.of(Math.addExact(batchHeaderSizeInBytes(), bytesNeeded));
     }
 
     private int flushedSizeInBytes() {
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
index ae6cba81de6..5120d6928d3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
@@ -54,12 +54,12 @@ public class BatchMemoryPool implements MemoryPool {
     }
 
     /**
-     * Allocate a byte buffer in this pool.
+     * Allocate a byte buffer with {@code batchSize} in this pool.
      *
      * This method should always succeed and never return null. The sizeBytes parameter must be less than
      * the batchSize used in the constructor.
      *
-     * @param sizeBytes is not used to determine the size of the byte buffer
+     * @param sizeBytes is used to determine if the requested size is exceeding the batchSize
      * @throws IllegalArgumentException if sizeBytes is greater than batchSize
      */
     @Override
@@ -96,9 +96,9 @@ public class BatchMemoryPool implements MemoryPool {
         try {
             previouslyAllocated.clear();
 
-            if (previouslyAllocated.limit() != batchSize) {
+            if (previouslyAllocated.capacity() != batchSize) {
                 throw new IllegalArgumentException("Released buffer with unexpected size "
-                    + previouslyAllocated.limit());
+                    + previouslyAllocated.capacity());
             }
 
             // Free the buffer if the number of pooled buffers is already the maximum number of batches.