You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/29 04:38:09 UTC

[kafka] branch trunk updated: KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 592410f  KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)
592410f is described below

commit 592410fc585ad72934fef2cef8624b8146fedf3d
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue May 28 21:37:52 2019 -0700

    KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)
    
    I think it's better just to make single-batch as a universal requirement for all versions for compressed messages, and for V2 and beyond uncompressed messages as well.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/log/LogValidator.scala   | 163 ++++++++++++---------
 .../scala/unit/kafka/log/LogValidatorTest.scala    |  65 +++++++-
 2 files changed, 156 insertions(+), 72 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 8bf215e..77a4b2b 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,6 +74,23 @@ private[kafka] object LogValidator extends Logging {
     }
   }
 
+  private[kafka] def validateOneBatchRecords(records: MemoryRecords): RecordBatch = {
+    // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
+    val batchIterator = records.batches.iterator
+
+    if (!batchIterator.hasNext) {
+      throw new InvalidRecordException("Compressed outer record has no batches at all")
+    }
+
+    val batch = batchIterator.next()
+
+    if (batchIterator.hasNext) {
+      throw new InvalidRecordException("Compressed outer record has more than one batch")
+    }
+
+    batch
+  }
+
   private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
     if (isFromClient) {
       if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
@@ -181,6 +198,10 @@ private[kafka] object LogValidator extends Logging {
     val initialOffset = offsetCounter.value
 
     for (batch <- records.batches.asScala) {
+      if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
+        validateOneBatchRecords(records)
+      }
+
       validateBatch(batch, isFromClient, magic)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
@@ -249,83 +270,85 @@ private[kafka] object LogValidator extends Logging {
                                                  partitionLeaderEpoch: Int,
                                                  isFromClient: Boolean,
                                                  interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
-      // No in place assignment situation 1 and 2
-      var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
-
-      var maxTimestamp = RecordBatch.NO_TIMESTAMP
-      val expectedInnerOffset = new LongRef(0)
-      val validatedRecords = new mutable.ArrayBuffer[Record]
-
-      var uncompressedSizeInBytes = 0
-
-      for (batch <- records.batches.asScala) {
-        validateBatch(batch, isFromClient, toMagic)
-        uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
-
-        // Do not compress control records unless they are written compressed
-        if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
-          inPlaceAssignment = true
-
-        for (record <- batch.asScala) {
-          if (sourceCodec != NoCompressionCodec && record.isCompressed)
-            throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
-              s"compression attribute set: $record")
-          if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
-            throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
-          validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
-
-          uncompressedSizeInBytes += record.sizeInBytes()
-          if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-            // Check if we need to overwrite offset
-            // No in place assignment situation 3
-            if (record.offset != expectedInnerOffset.getAndIncrement())
-              inPlaceAssignment = false
-            if (record.timestamp > maxTimestamp)
-              maxTimestamp = record.timestamp
-          }
-
-          // No in place assignment situation 4
-          if (!record.hasMagic(toMagic))
-            inPlaceAssignment = false
-
-          validatedRecords += record
-        }
+
+    // No in place assignment situation 1 and 2
+    var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
+
+    var maxTimestamp = RecordBatch.NO_TIMESTAMP
+    val expectedInnerOffset = new LongRef(0)
+    val validatedRecords = new mutable.ArrayBuffer[Record]
+
+    var uncompressedSizeInBytes = 0
+
+    val batch = validateOneBatchRecords(records)
+
+    validateBatch(batch, isFromClient, toMagic)
+    uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
+
+    // Do not compress control records unless they are written compressed
+    if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
+      inPlaceAssignment = true
+
+    for (record <- batch.asScala) {
+      if (sourceCodec != NoCompressionCodec && record.isCompressed)
+        throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+          s"compression attribute set: $record")
+      if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+        throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
+      validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+
+      uncompressedSizeInBytes += record.sizeInBytes()
+      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
+        // Check if we need to overwrite offset
+        // No in place assignment situation 3
+        if (record.offset != expectedInnerOffset.getAndIncrement())
+          inPlaceAssignment = false
+        if (record.timestamp > maxTimestamp)
+          maxTimestamp = record.timestamp
       }
 
-      if (!inPlaceAssignment) {
-        val (producerId, producerEpoch, sequence, isTransactional) = {
-          // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
-          // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
-          // with older magic versions, there will never be a producer id, etc.
-          val first = records.batches.asScala.head
-          (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
-        }
-        buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
-          validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
-          uncompressedSizeInBytes)
-      } else {
-        // we can update the batch only and write the compressed payload as is
-        val batch = records.batches.iterator.next()
-        val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
+      // No in place assignment situation 4
+      if (!record.hasMagic(toMagic))
+        inPlaceAssignment = false
 
-        batch.setLastOffset(lastOffset)
+      validatedRecords += record
+    }
 
-        if (timestampType == TimestampType.LOG_APPEND_TIME)
-          maxTimestamp = now
+    if (!inPlaceAssignment) {
+      val (producerId, producerEpoch, sequence, isTransactional) = {
+        // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
+        // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
+        // with older magic versions, there will never be a producer id, etc.
+        val first = records.batches.asScala.head
+        (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
+      }
+      buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
+        validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
+        uncompressedSizeInBytes)
+    } else {
+      // we can update the batch only and write the compressed payload as is;
+      // again we assume only one record batch within the compressed set
+      val batch = records.batches.iterator.next()
+      val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
 
-        if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
-          batch.setMaxTimestamp(timestampType, maxTimestamp)
+      batch.setLastOffset(lastOffset)
 
-        if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
-          batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+      if (timestampType == TimestampType.LOG_APPEND_TIME)
+        maxTimestamp = now
 
-        val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
-        ValidationAndOffsetAssignResult(validatedRecords = records,
-          maxTimestamp = maxTimestamp,
-          shallowOffsetOfMaxTimestamp = lastOffset,
-          messageSizeMaybeChanged = false,
-          recordConversionStats = recordConversionStats)
-      }
+      if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
+        batch.setMaxTimestamp(timestampType, maxTimestamp)
+
+      if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
+        batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+
+      val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
+      ValidationAndOffsetAssignResult(validatedRecords = records,
+        maxTimestamp = maxTimestamp,
+        shallowOffsetOfMaxTimestamp = lastOffset,
+        messageSizeMaybeChanged = false,
+        recordConversionStats = recordConversionStats)
+    }
   }
 
   private def buildRecordsAndAssignOffsets(magic: Byte,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 37553b9..d306b16 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 import java.nio.ByteBuffer
 import java.util.concurrent.TimeUnit
 
-import kafka.api.{ApiVersion, KAFKA_2_0_IV1}
+import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
 import kafka.common.LongRef
 import kafka.message._
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
@@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
-import org.scalatest.Assertions.intercept
+import org.scalatest.Assertions.{assertThrows, intercept}
 
 import scala.collection.JavaConverters._
 
@@ -37,6 +37,51 @@ class LogValidatorTest {
   val time = Time.SYSTEM
 
   @Test
+  def testOnlyOneBatchCompressedV0(): Unit = {
+    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
+  }
+
+  @Test
+  def testOnlyOneBatchCompressedV1(): Unit = {
+    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
+  }
+
+  @Test
+  def testOnlyOneBatchCompressedV2(): Unit = {
+    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP)
+  }
+
+  @Test
+  def testOnlyOneBatchUncompressedV2(): Unit = {
+    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE)
+  }
+
+  private def checkOnlyOneBatchCompressed(magic: Byte, compressionType: CompressionType) {
+    validateMessages(createRecords(magic, 0L, compressionType), magic, compressionType)
+
+    assertThrows[InvalidRecordException] {
+      validateMessages(createTwoBatchedRecords(magic, 0L, compressionType), magic, compressionType)
+    }
+  }
+
+  private def validateMessages(records: MemoryRecords, magic: Byte, compressionType: CompressionType): Unit = {
+    LogValidator.validateMessagesAndAssignOffsets(records,
+      new LongRef(0L),
+      time,
+      now = 0L,
+      CompressionCodec.getCompressionCodec(compressionType.name),
+      CompressionCodec.getCompressionCodec(compressionType.name),
+      compactedTopic = false,
+      magic,
+      TimestampType.CREATE_TIME,
+      1000L,
+      RecordBatch.NO_PRODUCER_EPOCH,
+      isFromClient = true,
+      KAFKA_2_3_IV1
+    )
+  }
+
+  @Test
   def testLogAppendTimeNonCompressedV1() {
     checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
   }
@@ -1137,6 +1182,22 @@ class LogValidatorTest {
     builder.build()
   }
 
+  def createTwoBatchedRecords(magicValue: Byte,
+                              timestamp: Long = RecordBatch.NO_TIMESTAMP,
+                              codec: CompressionType): MemoryRecords = {
+    val buf = ByteBuffer.allocate(2048)
+    var builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
+    builder.append(10L, "1".getBytes(), "a".getBytes())
+    builder.close()
+    builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L)
+    builder.append(11L, "2".getBytes(), "b".getBytes())
+    builder.append(12L, "3".getBytes(), "c".getBytes())
+    builder.close()
+
+    buf.flip()
+    MemoryRecords.readableRecords(buf.slice())
+  }
+
   /* check that offsets are assigned consecutively from the given base offset */
   def checkOffsets(records: MemoryRecords, baseOffset: Long) {
     assertTrue("Message set should not be empty", records.records.asScala.nonEmpty)