You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/10/30 18:07:48 UTC

kafka git commit: KAFKA-2903; FileRecords.read doesn't handle size > sizeInBytes when start is not zero

Repository: kafka
Updated Branches:
  refs/heads/trunk 6118ecb59 -> 8e4b3dca7


KAFKA-2903; FileRecords.read doesn't handle size > sizeInBytes when start is not zero

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #4158 from ijuma/kafka-2903-file-records-read-slice-size-greater


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e4b3dca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e4b3dca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e4b3dca

Branch: refs/heads/trunk
Commit: 8e4b3dca7b1bcef0e4875fea4fa79377998616c3
Parents: 6118ecb
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Oct 30 11:06:11 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Oct 30 11:06:11 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/record/FileRecords.java | 10 ++---
 .../kafka/common/record/FileRecordsTest.java    | 41 ++++++++++++++++----
 2 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8e4b3dca/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index ae99db3..e907abc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -136,12 +136,10 @@ public class FileRecords extends AbstractRecords implements Closeable {
         if (size < 0)
             throw new IllegalArgumentException("Invalid size: " + size);
 
-        final int end;
-        // handle integer overflow
-        if (this.start + position + size < 0)
-            end = sizeInBytes();
-        else
-            end = Math.min(this.start + position + size, sizeInBytes());
+        int end = this.start + position + size;
+        // handle integer overflow or if end is beyond the end of the file
+        if (end < 0 || end >= start + sizeInBytes())
+            end = start + sizeInBytes();
         return new FileRecords(file, channel, this.start + position, end, true);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e4b3dca/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 6df7c2d..53ac200 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -118,17 +118,44 @@ public class FileRecordsTest {
     @Test
     public void testRead() throws IOException {
         FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
+        assertEquals(fileRecords.sizeInBytes(), read.sizeInBytes());
         TestUtils.checkEquals(fileRecords.batches(), read.batches());
 
         List<RecordBatch> items = batches(read);
+        RecordBatch first = items.get(0);
+
+        // read from second message until the end
+        read = fileRecords.read(first.sizeInBytes(), fileRecords.sizeInBytes() - first.sizeInBytes());
+        assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes());
+        assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read));
+
+        // read from second message and size is past the end of the file
+        read = fileRecords.read(first.sizeInBytes(), fileRecords.sizeInBytes());
+        assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes());
+        assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read));
+
+        // read from second message and position + size overflows
+        read = fileRecords.read(first.sizeInBytes(), Integer.MAX_VALUE);
+        assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes());
+        assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read));
+
+        // read from second message and size is past the end of the file on a view/slice
+        read = fileRecords.read(1, fileRecords.sizeInBytes() - 1)
+                .read(first.sizeInBytes() - 1, fileRecords.sizeInBytes());
+        assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes());
+        assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read));
+
+        // read from second message and position + size overflows on a view/slice
+        read = fileRecords.read(1, fileRecords.sizeInBytes() - 1)
+                .read(first.sizeInBytes() - 1, Integer.MAX_VALUE);
+        assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes());
+        assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read));
+
+        // read a single message starting from second message
         RecordBatch second = items.get(1);
-
-        read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes());
-        assertEquals("Try a read starting from the second message",
-                items.subList(1, 3), batches(read));
-
-        read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes());
-        assertEquals("Try a read of a single message starting from the second message",
+        read = fileRecords.read(first.sizeInBytes(), second.sizeInBytes());
+        assertEquals(second.sizeInBytes(), read.sizeInBytes());
+        assertEquals("Read a single message starting from the second message",
                 Collections.singletonList(second), batches(read));
     }