Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/08 18:32:00 UTC

[kafka] branch trunk updated: KAFKA-9835; Protect `FileRecords.slice` from concurrent write (#8451)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 778b1e3  KAFKA-9835; Protect `FileRecords.slice` from concurrent write (#8451)
778b1e3 is described below

commit 778b1e3f5435c5b98fb52a4e15fc32d5cfa2c92c
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Apr 8 11:31:27 2020 -0700

    KAFKA-9835; Protect `FileRecords.slice` from concurrent write (#8451)
    
    A read from the end of the log interleaved with a concurrent write can result in reading data above the expected read limit. In particular, this would allow a read above the high watermark. The root of the problem is consecutive calls to `sizeInBytes` in `FileRecords.slice` which do not account for an increase in size due to a concurrent write. This patch fixes the problem by using a single call to `sizeInBytes` and caching the result.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../apache/kafka/common/record/FileRecords.java    |  9 ++++--
 .../kafka/common/record/FileRecordsTest.java       | 33 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 3 deletions(-)
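
For context, the pattern the patch applies can be seen in a minimal standalone sketch below (not part of the commit; the class and method names are invented for illustration). The point is that the slice bound check and the final clamp must work from a single snapshot of the size: if the size is re-read after a concurrent append, the clamp can use a larger value than the bound check saw, letting the slice extend past the intended limit.

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Minimal standalone sketch (not part of the commit) of the fix's pattern:
 * snapshot a concurrently growing size once so the bound check and the clamp agree.
 * Class and method names are hypothetical, for illustration only.
 */
public class SliceBoundSketch {
    // Stand-in for the file size that a concurrent append keeps growing.
    static final AtomicInteger size = new AtomicInteger(100);

    // Buggy shape: two separate reads of the size can straddle a concurrent write,
    // so the clamp below may use a larger value than the bound check saw.
    static int sliceEndRacy(int start, int position, int requested) {
        if (position > size.get() - start)                 // first read of the size
            throw new IllegalArgumentException("Slice position exceeds end");
        int end = start + position + requested;
        if (end < 0 || end >= start + size.get())          // second read; may be larger
            end = start + size.get();
        return end;
    }

    // Fixed shape, mirroring the patch: take a single snapshot and reuse it everywhere.
    static int sliceEndSafe(int start, int position, int requested) {
        int currentSize = size.get();                      // single snapshot of the size
        if (position > currentSize - start)
            throw new IllegalArgumentException("Slice position exceeds end");
        int end = start + position + requested;
        if (end < 0 || end > start + currentSize)
            end = start + currentSize;
        return end;
    }

    public static void main(String[] args) {
        // With the snapshot, the returned end can never exceed start + currentSize,
        // even if another thread grows `size` while this method runs.
        System.out.println("slice end = " + sliceEndSafe(0, 0, 200)); // clamped to 100
    }
}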

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index c097f55..46ac481 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -135,17 +135,20 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @return A sliced wrapper on this message set limited based on the given position and size
      */
     public FileRecords slice(int position, int size) throws IOException {
+        // Cache current size in case concurrent write changes it
+        int currentSizeInBytes = sizeInBytes();
+
         if (position < 0)
             throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this);
-        if (position > sizeInBytes() - start)
+        if (position > currentSizeInBytes - start)
             throw new IllegalArgumentException("Slice from position " + position + " exceeds end position of " + this);
         if (size < 0)
             throw new IllegalArgumentException("Invalid size: " + size + " in read from " + this);
 
         int end = this.start + position + size;
         // handle integer overflow or if end is beyond the end of the file
-        if (end < 0 || end >= start + sizeInBytes())
-            end = start + sizeInBytes();
+        if (end < 0 || end > start + currentSizeInBytes)
+            end = start + currentSizeInBytes;
         return new FileRecords(file, channel, this.start + position, end, true);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index bf79987..08dbc57 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -36,6 +36,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.utf8;
@@ -118,6 +121,36 @@ public class FileRecordsTest {
         testPartialWrite(6, fileRecords);
     }
 
+    @Test
+    public void testSliceSizeLimitWithConcurrentWrite() throws Exception {
+        FileRecords log = FileRecords.open(tempFile());
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        int maxSizeInBytes = 16384;
+
+        try {
+            Future<Object> readerCompletion = executor.submit(() -> {
+                while (log.sizeInBytes() < maxSizeInBytes) {
+                    int currentSize = log.sizeInBytes();
+                    FileRecords slice = log.slice(0, currentSize);
+                    assertEquals(currentSize, slice.sizeInBytes());
+                }
+                return null;
+            });
+
+            Future<Object> writerCompletion = executor.submit(() -> {
+                while (log.sizeInBytes() < maxSizeInBytes) {
+                    append(log, values);
+                }
+                return null;
+            });
+
+            writerCompletion.get();
+            readerCompletion.get();
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
     private void testPartialWrite(int size, FileRecords fileRecords) throws IOException {
         ByteBuffer buffer = ByteBuffer.allocate(size);
         for (int i = 0; i < size; i++)