You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/17 11:49:04 UTC
kafka git commit: KAFKA-5031;
Follow-up with small cleanups/improvements
Repository: kafka
Updated Branches:
refs/heads/trunk 3fdbba1c7 -> 51fc50ed0
KAFKA-5031; Follow-up with small cleanups/improvements
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3363 from hachikuji/KAFKA-5031-FOLLOWUP
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51fc50ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51fc50ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51fc50ed
Branch: refs/heads/trunk
Commit: 51fc50ed0ba7aa4d0e618f9103a1531680489139
Parents: 3fdbba1
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat Jun 17 12:48:34 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Jun 17 12:48:38 2017 +0100
----------------------------------------------------------------------
.../kafka/common/record/DefaultRecord.java | 25 +++---
.../kafka/common/record/DefaultRecordBatch.java | 2 +-
.../common/record/InvalidRecordException.java | 4 +
.../kafka/common/record/DefaultRecordTest.java | 84 ++++++++++++++++++++
4 files changed, 104 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 9b7f327..109528f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -293,7 +293,8 @@ public class DefaultRecord implements Record {
ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes);
int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
- return readFrom(recordBuffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+ return readFrom(recordBuffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+ baseSequence, logAppendTime);
}
public static DefaultRecord readFrom(ByteBuffer buffer,
@@ -306,11 +307,13 @@ public class DefaultRecord implements Record {
return null;
int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
- return readFrom(buffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+ return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+ baseSequence, logAppendTime);
}
private static DefaultRecord readFrom(ByteBuffer buffer,
int sizeInBytes,
+ int sizeOfBodyInBytes,
long baseOffset,
long baseTimestamp,
int baseSequence,
@@ -349,19 +352,24 @@ public class DefaultRecord implements Record {
if (numHeaders < 0)
throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
+ final Header[] headers;
if (numHeaders == 0)
- return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, Record.EMPTY_HEADERS);
+ headers = Record.EMPTY_HEADERS;
+ else
+ headers = readHeaders(buffer, numHeaders);
- Header[] headers = readHeaders(buffer, numHeaders, recordStart, sizeInBytes);
+ // validate whether we have read all header bytes in the current record
+ if (buffer.position() - recordStart != sizeOfBodyInBytes)
+ throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
+ " bytes in record payload, but instead read " + (buffer.position() - recordStart));
return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
} catch (BufferUnderflowException | IllegalArgumentException e) {
- throw new InvalidRecordException("Invalid header data or number of headers declared for the record, reason for failure was "
- + e.getMessage());
+ throw new InvalidRecordException("Found invalid record structure", e);
}
}
- private static Header[] readHeaders(ByteBuffer buffer, int numHeaders, int recordStart, int sizeInBytes) {
+ private static Header[] readHeaders(ByteBuffer buffer, int numHeaders) {
Header[] headers = new Header[numHeaders];
for (int i = 0; i < numHeaders; i++) {
int headerKeySize = ByteUtils.readVarint(buffer);
@@ -382,9 +390,6 @@ public class DefaultRecord implements Record {
headers[i] = new RecordHeader(headerKey, headerValue);
}
- // validate whether we have read all header bytes in the current record
- if (buffer.position() - recordStart != sizeInBytes - ByteUtils.sizeOfVarint(sizeInBytes))
- throw new InvalidRecordException("Invalid header data or number of headers declared for the record");
return headers;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 5a7e27a..f933f41 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -259,7 +259,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
try {
return inputStream.read() == -1;
} catch (IOException e) {
- return false;
+ throw new KafkaException("Error checking for remaining bytes after reading batch", e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
index ffd09a4..49f6166 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
@@ -26,4 +26,8 @@ public class InvalidRecordException extends CorruptRecordException {
super(s);
}
+ public InvalidRecordException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index b9c923d..3ff73c9 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
import org.junit.Test;
import java.io.DataOutputStream;
@@ -131,6 +132,89 @@ public class DefaultRecordTest {
record.headers()), logRecord.sizeInBytes());
}
+ @Test(expected = InvalidRecordException.class)
+ public void testInvalidKeySize() {
+ byte attributes = 0;
+ long timestampDelta = 2;
+ int offsetDelta = 1;
+ int sizeOfBodyInBytes = 100;
+ int keySize = 105; // use a key size larger than the full message
+
+ ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+ ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+ buf.put(attributes);
+ ByteUtils.writeVarlong(timestampDelta, buf);
+ ByteUtils.writeVarint(offsetDelta, buf);
+ ByteUtils.writeVarint(keySize, buf);
+ buf.position(buf.limit());
+
+ buf.flip();
+ DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testInvalidValueSize() throws IOException {
+ byte attributes = 0;
+ long timestampDelta = 2;
+ int offsetDelta = 1;
+ int sizeOfBodyInBytes = 100;
+ int valueSize = 105; // use a value size larger than the full message
+
+ ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+ ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+ buf.put(attributes);
+ ByteUtils.writeVarlong(timestampDelta, buf);
+ ByteUtils.writeVarint(offsetDelta, buf);
+ ByteUtils.writeVarint(-1, buf); // null key
+ ByteUtils.writeVarint(valueSize, buf);
+ buf.position(buf.limit());
+
+ buf.flip();
+ DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testUnderflowReadingTimestamp() {
+ byte attributes = 0;
+ int sizeOfBodyInBytes = 1;
+ ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+ ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+ buf.put(attributes);
+
+ buf.flip();
+ DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testUnderflowReadingVarlong() {
+ byte attributes = 0;
+ int sizeOfBodyInBytes = 2; // one byte for attributes, one byte for partial timestamp
+ ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 1);
+ ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+ buf.put(attributes);
+ ByteUtils.writeVarlong(156, buf); // needs 2 bytes to represent
+ buf.position(buf.limit() - 1);
+
+ buf.flip();
+ DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testInvalidVarlong() {
+ byte attributes = 0;
+ int sizeOfBodyInBytes = 11; // one byte for attributes, 10 bytes for max timestamp
+ ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 1);
+ ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+ int recordStartPosition = buf.position();
+
+ buf.put(attributes);
+ ByteUtils.writeVarlong(Long.MAX_VALUE, buf); // takes 10 bytes
+ buf.put(recordStartPosition + 10, Byte.MIN_VALUE); // use an invalid final byte
+
+ buf.flip();
+ DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+ }
+
@Test
public void testSerdeNoSequence() throws IOException {
ByteBuffer key = ByteBuffer.wrap("hi".getBytes());