Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/17 11:49:04 UTC

kafka git commit: KAFKA-5031; Follow-up with small cleanups/improvements

Repository: kafka
Updated Branches:
  refs/heads/trunk 3fdbba1c7 -> 51fc50ed0


KAFKA-5031; Follow-up with small cleanups/improvements

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3363 from hachikuji/KAFKA-5031-FOLLOWUP


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51fc50ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51fc50ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51fc50ed

Branch: refs/heads/trunk
Commit: 51fc50ed0ba7aa4d0e618f9103a1531680489139
Parents: 3fdbba1
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat Jun 17 12:48:34 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Jun 17 12:48:38 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/record/DefaultRecord.java      | 25 +++---
 .../kafka/common/record/DefaultRecordBatch.java |  2 +-
 .../common/record/InvalidRecordException.java   |  4 +
 .../kafka/common/record/DefaultRecordTest.java  | 84 ++++++++++++++++++++
 4 files changed, 104 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 9b7f327..109528f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -293,7 +293,8 @@ public class DefaultRecord implements Record {
         ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
         input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
-        return readFrom(recordBuffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+        return readFrom(recordBuffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+                baseSequence, logAppendTime);
     }
 
     public static DefaultRecord readFrom(ByteBuffer buffer,
@@ -306,11 +307,13 @@ public class DefaultRecord implements Record {
             return null;
 
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
-        return readFrom(buffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+        return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+                baseSequence, logAppendTime);
     }
 
     private static DefaultRecord readFrom(ByteBuffer buffer,
                                           int sizeInBytes,
+                                          int sizeOfBodyInBytes,
                                           long baseOffset,
                                           long baseTimestamp,
                                           int baseSequence,
@@ -349,19 +352,24 @@ public class DefaultRecord implements Record {
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
 
+            final Header[] headers;
             if (numHeaders == 0)
-                return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, Record.EMPTY_HEADERS);
+                headers = Record.EMPTY_HEADERS;
+            else
+                headers = readHeaders(buffer, numHeaders);
 
-            Header[] headers = readHeaders(buffer, numHeaders, recordStart, sizeInBytes);
+            // validate that the full record body has been consumed (headers are the final field)
+            if (buffer.position() - recordStart != sizeOfBodyInBytes)
+                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
+                        " bytes in record payload, but instead read " + (buffer.position() - recordStart));
 
             return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
-            throw new InvalidRecordException("Invalid header data or number of headers declared for the record, reason for failure was "
-                    + e.getMessage());
+            throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    private static Header[] readHeaders(ByteBuffer buffer, int numHeaders, int recordStart, int sizeInBytes) {
+    private static Header[] readHeaders(ByteBuffer buffer, int numHeaders) {
         Header[] headers = new Header[numHeaders];
         for (int i = 0; i < numHeaders; i++) {
             int headerKeySize = ByteUtils.readVarint(buffer);
@@ -382,9 +390,6 @@ public class DefaultRecord implements Record {
             headers[i] = new RecordHeader(headerKey, headerValue);
         }
 
-        // validate whether we have read all header bytes in the current record
-        if (buffer.position() - recordStart != sizeInBytes - ByteUtils.sizeOfVarint(sizeInBytes))
-            throw new InvalidRecordException("Invalid header data or number of headers declared for the record");
         return headers;
     }
 

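Note on the DefaultRecord.java change: the end-of-record validation moves out of
readHeaders and into readFrom, where it compares the bytes actually consumed
against the declared sizeOfBodyInBytes. The removed check derived the expected
body size from the total via sizeInBytes - ByteUtils.sizeOfVarint(sizeInBytes),
i.e. it applied sizeOfVarint to the total rather than to the body size, so it
could reject a valid record whenever the two values need different varint
widths. It also only ran when the record had headers, since the zero-header
path returned early. A minimal, self-contained sketch of the boundary case,
with sizeOfVarint reimplemented locally under the assumption that ByteUtils
varints are zigzag-encoded:

    public class VarintBoundarySketch {
        // Local stand-in for ByteUtils.sizeOfVarint: the width in bytes of a
        // zigzag-encoded int (an assumption about the ByteUtils encoding).
        static int sizeOfVarint(int value) {
            int v = (value << 1) ^ (value >> 31); // zigzag-encode
            int bytes = 1;
            while ((v & 0xffffff80) != 0) {       // more than 7 bits remain
                bytes++;
                v >>>= 7;
            }
            return bytes;
        }

        public static void main(String[] args) {
            int sizeOfBodyInBytes = 63; // 1-byte varint; the total below needs 2
            int totalSizeInBytes = sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
            int derivedBodySize = totalSizeInBytes - sizeOfVarint(totalSizeInBytes);
            // Prints "63 vs 62": the old check would have flagged a valid record.
            System.out.println(sizeOfBodyInBytes + " vs " + derivedBodySize);
        }
    }

Passing sizeOfBodyInBytes down explicitly avoids the recomputation entirely and
validates every record, with or without headers.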
http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 5a7e27a..f933f41 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -259,7 +259,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                 try {
                     return inputStream.read() == -1;
                 } catch (IOException e) {
-                    return false;
+                    throw new KafkaException("Error checking for remaining bytes after reading batch", e);
                 }
             }
 

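Note on the DefaultRecordBatch.java change: returning false when the EOF probe
itself fails made an I/O error indistinguishable from "there really were bytes
left over after the last record", so the caller would report batch corruption
and the underlying cause would be lost. Throwing with the cause attached keeps
the two failure modes separate. A self-contained sketch of the pattern, with
RuntimeException standing in for KafkaException:

    import java.io.IOException;
    import java.io.InputStream;

    public class EnsureNoneRemainingSketch {
        // EOF (-1) means the batch was fully consumed; an I/O failure is
        // surfaced with its cause rather than folded into a boolean.
        static boolean noneRemaining(InputStream in) {
            try {
                return in.read() == -1;
            } catch (IOException e) {
                throw new RuntimeException(
                        "Error checking for remaining bytes after reading batch", e);
            }
        }

        public static void main(String[] args) {
            InputStream failing = new InputStream() {
                @Override
                public int read() throws IOException {
                    throw new IOException("disk error");
                }
            };
            try {
                noneRemaining(failing);
            } catch (RuntimeException e) {
                System.out.println(e.getCause().getMessage()); // prints "disk error"
            }
        }
    }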
http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
index ffd09a4..49f6166 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
@@ -26,4 +26,8 @@ public class InvalidRecordException extends CorruptRecordException {
         super(s);
     }
 
+    public InvalidRecordException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
 }

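Note on the InvalidRecordException.java change: the new (String, Throwable)
constructor is what lets readFrom above wrap BufferUnderflowException and
IllegalArgumentException while preserving the original stack trace, instead of
flattening e.getMessage() into the message string as before. A self-contained
illustration of what callers can now recover, with a local stub standing in
for the real exception class:

    import java.nio.BufferUnderflowException;
    import java.nio.ByteBuffer;

    public class CauseChainingSketch {
        // Local stub mirroring the new constructor; the real class delegates
        // to CorruptRecordException, which accepts a cause.
        static class InvalidRecordException extends RuntimeException {
            InvalidRecordException(String message, Throwable cause) {
                super(message, cause);
            }
        }

        public static void main(String[] args) {
            try {
                try {
                    ByteBuffer.allocate(0).get(); // forces an underflow
                } catch (BufferUnderflowException e) {
                    throw new InvalidRecordException("Found invalid record structure", e);
                }
            } catch (InvalidRecordException e) {
                // The low-level cause remains inspectable by callers and loggers.
                System.out.println(e.getCause().getClass().getSimpleName());
            }
        }
    }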
http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index b9c923d..3ff73c9 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.record;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
 import org.junit.Test;
 
 import java.io.DataOutputStream;
@@ -131,6 +132,89 @@ public class DefaultRecordTest {
                 record.headers()), logRecord.sizeInBytes());
     }
 
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidKeySize() {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        int keySize = 105; // use a key size larger than the full message
+
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(keySize, buf);
+        buf.position(buf.limit());
+
+        buf.flip();
+        DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidValueSize() throws IOException {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        int valueSize = 105; // use a value size larger than the full message
+
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(-1, buf); // null key
+        ByteUtils.writeVarint(valueSize, buf);
+        buf.position(buf.limit());
+
+        buf.flip();
+        DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testUnderflowReadingTimestamp() {
+        byte attributes = 0;
+        int sizeOfBodyInBytes = 1;
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+
+        buf.flip();
+        DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testUnderflowReadingVarlong() {
+        byte attributes = 0;
+        int sizeOfBodyInBytes = 2; // one byte for attributes, one byte for partial timestamp
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 1);
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(156, buf); // needs 2 bytes to represent
+        buf.position(buf.limit() - 1);
+
+        buf.flip();
+        DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidVarlong() {
+        byte attributes = 0;
+        int sizeOfBodyInBytes = 11; // one byte for attributes, 10 bytes for max timestamp
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 1);
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        int recordStartPosition = buf.position();
+
+        buf.put(attributes);
+        ByteUtils.writeVarlong(Long.MAX_VALUE, buf); // takes 10 bytes
+        buf.put(recordStartPosition + 10, Byte.MIN_VALUE); // use an invalid final byte
+
+        buf.flip();
+        DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+
     @Test
     public void testSerdeNoSequence() throws IOException {
         ByteBuffer key = ByteBuffer.wrap("hi".getBytes());
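
----------------------------------------------------------------------

A note on the new tests: they hand-craft record payloads with ByteUtils
varints, so the inline size comments (writeVarlong(156) "needs 2 bytes",
Long.MAX_VALUE "takes 10 bytes", Byte.MIN_VALUE as "an invalid final byte")
all follow from the varint wire format: zigzag-encode, then emit 7 bits per
byte with the high bit as a continuation flag. A standalone sketch, under the
assumption that this matches ByteUtils.writeVarlong:

    public class VarlongSizeSketch {
        // Width in bytes of a zigzag-encoded varlong (assumed ByteUtils encoding).
        static int varlongSize(long value) {
            long v = (value << 1) ^ (value >> 63); // zigzag-encode
            int bytes = 1;
            while ((v & 0xffffffffffffff80L) != 0) { // more than 7 bits remain
                bytes++;
                v >>>= 7;
            }
            return bytes;
        }

        public static void main(String[] args) {
            System.out.println(varlongSize(156));            // 2, as in testUnderflowReadingVarlong
            System.out.println(varlongSize(Long.MAX_VALUE)); // 10, as in testInvalidVarlong
            // testInvalidVarlong sets the continuation bit (0x80 == Byte.MIN_VALUE)
            // on the 10th byte; a varlong cannot extend past 10 bytes, so the
            // decoder must reject it rather than read an 11th byte.
        }
    }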