You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not reproduced in this plain-text version.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/20 09:32:13 UTC

[kafka] branch trunk updated: KAFKA-6554; Missing lastOffsetDelta validation before log append (#4585)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1547cf6  KAFKA-6554; Missing lastOffsetDelta validation before log append (#4585)
1547cf6 is described below

commit 1547cf6de86bbb8f7bacc26ab9ab89d4c9ec5566
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Feb 20 01:32:09 2018 -0800

    KAFKA-6554; Missing lastOffsetDelta validation before log append (#4585)
    
    Add validation checks that the offset range is valid and aligned with the batch count prior to appending to the log. Several unit tests have been added to verify the various invalid cases.
---
 .../kafka/common/record/DefaultRecordBatch.java    |  4 +-
 core/src/main/scala/kafka/log/LogValidator.scala   | 18 +++++++--
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 45 ++++++++++++++++++++++
 3 files changed, 62 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index ff8d3b9..71e668e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -106,7 +106,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     static final int CRC_LENGTH = 4;
     static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
     static final int ATTRIBUTE_LENGTH = 2;
-    static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
     static final int LAST_OFFSET_DELTA_LENGTH = 4;
     static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
     static final int FIRST_TIMESTAMP_LENGTH = 8;
@@ -118,7 +118,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     static final int PRODUCER_EPOCH_LENGTH = 2;
     static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH;
     static final int BASE_SEQUENCE_LENGTH = 4;
-    static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
+    public static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
     static final int RECORDS_COUNT_LENGTH = 4;
     static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
     public static final int RECORD_BATCH_OVERHEAD = RECORDS_OFFSET;
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 15750e9..1beb2bd 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,15 +74,27 @@ private[kafka] object LogValidator extends Logging {
 
   private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
     if (isFromClient) {
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+        val countFromOffsets = batch.lastOffset - batch.baseOffset + 1
+        if (countFromOffsets <= 0)
+          throw new InvalidRecordException(s"Batch has an invalid offset range: [${batch.baseOffset}, ${batch.lastOffset}]")
+
+        // v2 and above messages always have a non-null count
+        val count = batch.countOrNull
+        if (count <= 0)
+          throw new InvalidRecordException(s"Invalid reported count for record batch: $count")
+
+        if (countFromOffsets != batch.countOrNull)
+          throw new InvalidRecordException(s"Inconsistent batch offset range [${batch.baseOffset}, ${batch.lastOffset}] " +
+            s"and count of records $count")
+      }
+
       if (batch.hasProducerId && batch.baseSequence < 0)
         throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " +
           s"with producerId ${batch.producerId}")
 
       if (batch.isControlBatch)
         throw new InvalidRecordException("Clients are not allowed to write control records")
-
-      if (Option(batch.countOrNull).contains(0))
-        throw new InvalidRecordException("Record batches must contain at least one record")
     }
 
     if (batch.isTransactional && toMagic < RecordBatch.MAGIC_VALUE_V2)
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 131152a..04e89528 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
+import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
 
@@ -148,6 +149,50 @@ class LogValidatorTest {
   }
 
   @Test
+  def testInvalidOffsetRangeAndRecordCount(): Unit = {
+    // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2
+    validateRecordBatchWithCountOverrides(lastOffsetDelta = 2, count = 3)
+
+    // Count and offset range are inconsistent or invalid
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 0, count = 3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 15, count = 3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = -3, count = 3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = -3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = 6)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = 0)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = -3, count = -2)
+
+    // Count and offset range are consistent, but do not match the actual number of records
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 5, count = 6)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 1, count = 2)
+  }
+
+  private def assertInvalidBatchCountOverrides(lastOffsetDelta: Int, count: Int): Unit = {
+    intercept[InvalidRecordException] {
+      validateRecordBatchWithCountOverrides(lastOffsetDelta, count)
+    }
+  }
+
+  private def validateRecordBatchWithCountOverrides(lastOffsetDelta: Int, count: Int) {
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = 1234L, codec = CompressionType.NONE)
+    records.buffer.putInt(DefaultRecordBatch.RECORDS_COUNT_OFFSET, count)
+    records.buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta)
+    LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(0),
+      time = time,
+      now = time.milliseconds(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true)
+  }
+
+  @Test
   def testLogAppendTimeWithoutRecompressionV2() {
     checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2)
   }

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.