You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/09/22 01:23:07 UTC
[kafka] branch trunk updated: MINOR: use addExact to avoid overflow and some cleanup (#12660)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bf7ddf73af1 MINOR: use addExact to avoid overflow and some cleanup (#12660)
bf7ddf73af1 is described below
commit bf7ddf73af1ee416388c09a8ad42bfa6cf295641
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu Sep 22 09:22:58 2022 +0800
MINOR: use addExact to avoid overflow and some cleanup (#12660)
What changes in this PR:
1. Use addExact to avoid overflow in BatchAccumulator#bytesNeeded. We did use addExact in bytesNeededForRecords method, but forgot that when returning the result.
2. javadoc improvement
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../java/org/apache/kafka/raft/internals/BatchAccumulator.java | 4 +++-
.../main/java/org/apache/kafka/raft/internals/BatchBuilder.java | 4 ++--
.../java/org/apache/kafka/raft/internals/BatchMemoryPool.java | 8 ++++----
3 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 69732cd670b..b84a7d57b8a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -107,6 +107,7 @@ public class BatchAccumulator<T> implements Closeable {
* @throws NotLeaderException if the epoch is less than the leader epoch
* @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
* @throws BufferAllocationException if we failed to allocate memory for the records
+ * @throws IllegalStateException if we tried to append new records after the batch has been built
*/
public long append(int epoch, List<T> records) {
return append(epoch, records, false);
@@ -127,6 +128,7 @@ public class BatchAccumulator<T> implements Closeable {
* @throws NotLeaderException if the epoch is less than the leader epoch
* @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
* @throws BufferAllocationException if we failed to allocate memory for the records
+ * @throws IllegalStateException if we tried to append new records after the batch has been built
*/
public long appendAtomic(int epoch, List<T> records) {
return append(epoch, records, true);
@@ -260,7 +262,7 @@ public class BatchAccumulator<T> implements Closeable {
/**
* Append a {@link LeaderChangeMessage} record to the batch
*
- * @param LeaderChangeMessage The message to append
+ * @param leaderChangeMessage The message to append
* @param currentTimestamp The current time in milliseconds
* @throws IllegalStateException on failure to allocate a buffer for the record
*/
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
index 982040b84ee..92b63ec5526 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
@@ -142,7 +142,7 @@ public class BatchBuilder<T> {
);
if (!isOpenForAppends) {
- return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
+ return OptionalInt.of(Math.addExact(batchHeaderSizeInBytes(), bytesNeeded));
}
int approxUnusedSizeInBytes = maxBytes - approximateSizeInBytes();
@@ -157,7 +157,7 @@ public class BatchBuilder<T> {
}
}
- return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
+ return OptionalInt.of(Math.addExact(batchHeaderSizeInBytes(), bytesNeeded));
}
private int flushedSizeInBytes() {
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
index ae6cba81de6..5120d6928d3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
@@ -54,12 +54,12 @@ public class BatchMemoryPool implements MemoryPool {
}
/**
- * Allocate a byte buffer in this pool.
+ * Allocate a byte buffer with {@code batchSize} in this pool.
*
* This method should always succeed and never return null. The sizeBytes parameter must be less than
* the batchSize used in the constructor.
*
- * @param sizeBytes is not used to determine the size of the byte buffer
+ * @param sizeBytes is used to determine if the requested size is exceeding the batchSize
* @throws IllegalArgumentException if sizeBytes is greater than batchSize
*/
@Override
@@ -96,9 +96,9 @@ public class BatchMemoryPool implements MemoryPool {
try {
previouslyAllocated.clear();
- if (previouslyAllocated.limit() != batchSize) {
+ if (previouslyAllocated.capacity() != batchSize) {
throw new IllegalArgumentException("Released buffer with unexpected size "
- + previouslyAllocated.limit());
+ + previouslyAllocated.capacity());
}
// Free the buffer if the number of pooled buffers is already the maximum number of batches.