You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/08 18:32:00 UTC
[kafka] branch trunk updated: KAFKA-9835; Protect `FileRecords.slice` from concurrent write (#8451)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 778b1e3 KAFKA-9835; Protect `FileRecords.slice` from concurrent write (#8451)
778b1e3 is described below
commit 778b1e3f5435c5b98fb52a4e15fc32d5cfa2c92c
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Apr 8 11:31:27 2020 -0700
KAFKA-9835; Protect `FileRecords.slice` from concurrent write (#8451)
A read from the end of the log interleaved with a concurrent write can result in reading data above the expected read limit. In particular, this would allow a read above the high watermark. The root of the problem is consecutive calls to `sizeInBytes` in `FileRecords.slice` which do not account for an increase in size due to a concurrent write. This patch fixes the problem by using a single call to `sizeInBytes` and caching the result.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../apache/kafka/common/record/FileRecords.java | 9 ++++--
.../kafka/common/record/FileRecordsTest.java | 33 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index c097f55..46ac481 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -135,17 +135,20 @@ public class FileRecords extends AbstractRecords implements Closeable {
* @return A sliced wrapper on this message set limited based on the given position and size
*/
public FileRecords slice(int position, int size) throws IOException {
+ // Cache current size in case concurrent write changes it
+ int currentSizeInBytes = sizeInBytes();
+
if (position < 0)
throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this);
- if (position > sizeInBytes() - start)
+ if (position > currentSizeInBytes - start)
throw new IllegalArgumentException("Slice from position " + position + " exceeds end position of " + this);
if (size < 0)
throw new IllegalArgumentException("Invalid size: " + size + " in read from " + this);
int end = this.start + position + size;
// handle integer overflow or if end is beyond the end of the file
- if (end < 0 || end >= start + sizeInBytes())
- end = start + sizeInBytes();
+ if (end < 0 || end > start + currentSizeInBytes)
+ end = start + currentSizeInBytes;
return new FileRecords(file, channel, this.start + position, end, true);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index bf79987..08dbc57 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -36,6 +36,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.utf8;
@@ -118,6 +121,36 @@ public class FileRecordsTest {
testPartialWrite(6, fileRecords);
}
+ @Test
+ public void testSliceSizeLimitWithConcurrentWrite() throws Exception {
+ FileRecords log = FileRecords.open(tempFile());
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ int maxSizeInBytes = 16384;
+
+ try {
+ Future<Object> readerCompletion = executor.submit(() -> {
+ while (log.sizeInBytes() < maxSizeInBytes) {
+ int currentSize = log.sizeInBytes();
+ FileRecords slice = log.slice(0, currentSize);
+ assertEquals(currentSize, slice.sizeInBytes());
+ }
+ return null;
+ });
+
+ Future<Object> writerCompletion = executor.submit(() -> {
+ while (log.sizeInBytes() < maxSizeInBytes) {
+ append(log, values);
+ }
+ return null;
+ });
+
+ writerCompletion.get();
+ readerCompletion.get();
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
private void testPartialWrite(int size, FileRecords fileRecords) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(size);
for (int i = 0; i < size; i++)