You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/10/30 18:07:48 UTC
kafka git commit: KAFKA-2903; FileRecords.read doesn't handle size > sizeInBytes when start is not zero
Repository: kafka
Updated Branches:
refs/heads/trunk 6118ecb59 -> 8e4b3dca7
KAFKA-2903; FileRecords.read doesn't handle size > sizeInBytes when start is not zero
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #4158 from ijuma/kafka-2903-file-records-read-slice-size-greater
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e4b3dca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e4b3dca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e4b3dca
Branch: refs/heads/trunk
Commit: 8e4b3dca7b1bcef0e4875fea4fa79377998616c3
Parents: 6118ecb
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Oct 30 11:06:11 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Oct 30 11:06:11 2017 -0700
----------------------------------------------------------------------
.../apache/kafka/common/record/FileRecords.java | 10 ++---
.../kafka/common/record/FileRecordsTest.java | 41 ++++++++++++++++----
2 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e4b3dca/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index ae99db3..e907abc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -136,12 +136,10 @@ public class FileRecords extends AbstractRecords implements Closeable {
if (size < 0)
throw new IllegalArgumentException("Invalid size: " + size);
- final int end;
- // handle integer overflow
- if (this.start + position + size < 0)
- end = sizeInBytes();
- else
- end = Math.min(this.start + position + size, sizeInBytes());
+ int end = this.start + position + size;
+ // handle integer overflow or if end is beyond the end of the file
+ if (end < 0 || end >= start + sizeInBytes())
+ end = start + sizeInBytes();
return new FileRecords(file, channel, this.start + position, end, true);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e4b3dca/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 6df7c2d..53ac200 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -118,17 +118,44 @@ public class FileRecordsTest {
@Test
public void testRead() throws IOException {
FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
+ assertEquals(fileRecords.sizeInBytes(), read.sizeInBytes());
TestUtils.checkEquals(fileRecords.batches(), read.batches());
List<RecordBatch> items = batches(read);
+ RecordBatch first = items.get(0);
+
+ // read from second message until the end
+ read = fileRecords.read(first.sizeInBytes(), fileRecords.sizeInBytes() - first.sizeInBytes());
+ assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes());
+ assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read));
+
+ // read from second message and size is past the end of the file
+ read = fileRecords.read(first.sizeInBytes(), fileRecords.sizeInBytes());
+ assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes());
+ assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read));
+
+ // read from second message and position + size overflows
+ read = fileRecords.read(first.sizeInBytes(), Integer.MAX_VALUE);
+ assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes());
+ assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read));
+
+ // read from second message and size is past the end of the file on a view/slice
+ read = fileRecords.read(1, fileRecords.sizeInBytes() - 1)
+ .read(first.sizeInBytes() - 1, fileRecords.sizeInBytes());
+ assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes());
+ assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read));
+
+ // read from second message and position + size overflows on a view/slice
+ read = fileRecords.read(1, fileRecords.sizeInBytes() - 1)
+ .read(first.sizeInBytes() - 1, Integer.MAX_VALUE);
+ assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes());
+ assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read));
+
+ // read a single message starting from second message
RecordBatch second = items.get(1);
-
- read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes());
- assertEquals("Try a read starting from the second message",
- items.subList(1, 3), batches(read));
-
- read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes());
- assertEquals("Try a read of a single message starting from the second message",
+ read = fileRecords.read(first.sizeInBytes(), second.sizeInBytes());
+ assertEquals(second.sizeInBytes(), read.sizeInBytes());
+ assertEquals("Read a single message starting from the second message",
Collections.singletonList(second), batches(read));
}