Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/31 21:50:58 UTC

[kafka] branch 2.3 updated: Revert "KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)"

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 0701a79  Revert "KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)"
0701a79 is described below

commit 0701a79bdb5dc1033bc0d264f1673696115c7ac4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri May 31 14:21:59 2019 -0700

    Revert "KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)"
    
    This reverts commit 6d5c7c3e375b38e54746af040335761b6f69f70e.
---
 core/src/main/scala/kafka/log/LogValidator.scala   | 163 +++++++++------------
 .../scala/unit/kafka/log/LogValidatorTest.scala    |  65 +-------
 2 files changed, 72 insertions(+), 156 deletions(-)
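
For context: this commit rolls back the single-batch requirement that KAFKA-8428 added to LogValidator. The sketch below is not part of the commit; it is adapted from the test helpers removed in the diff that follows (createTwoBatchedRecords and validateMessages in LogValidatorTest), and the object name TwoBatchRevertSketch is made up for illustration. It shows the kind of multi-batch input whose rejection is being reverted: with KAFKA-8428 in place this call threw InvalidRecordException, and after the revert such input is no longer refused merely for containing more than one batch.

package kafka.log
// Sketch adapted from the LogValidatorTest helpers removed by this commit.
// LogValidator is private[kafka], so this has to live under the kafka
// package, just as the removed test code did.

import java.nio.ByteBuffer

import kafka.api.KAFKA_2_3_IV1
import kafka.common.LongRef
import kafka.message.CompressionCodec
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, TimestampType}
import org.apache.kafka.common.utils.Time

object TwoBatchRevertSketch {

  // Two record batches written back to back into one buffer, exactly as the
  // removed createTwoBatchedRecords helper built them.
  def createTwoBatchedRecords(magic: Byte, codec: CompressionType): MemoryRecords = {
    val buf = ByteBuffer.allocate(2048)
    var builder = MemoryRecords.builder(buf, magic, codec, TimestampType.CREATE_TIME, 0L)
    builder.append(10L, "1".getBytes(), "a".getBytes())
    builder.close()
    builder = MemoryRecords.builder(buf, magic, codec, TimestampType.CREATE_TIME, 1L)
    builder.append(11L, "2".getBytes(), "b".getBytes())
    builder.append(12L, "3".getBytes(), "c".getBytes())
    builder.close()
    buf.flip()
    MemoryRecords.readableRecords(buf.slice())
  }

  def main(args: Array[String]): Unit = {
    val codec = CompressionType.NONE
    val records = createTwoBatchedRecords(RecordBatch.MAGIC_VALUE_V2, codec)

    // With KAFKA-8428 in place, this call failed with InvalidRecordException
    // because the input holds more than one batch; after the revert, the
    // multi-batch check is gone and each batch is validated on its own.
    // Arguments mirror the removed validateMessages test helper.
    LogValidator.validateMessagesAndAssignOffsets(records,
      new LongRef(0L),
      Time.SYSTEM,
      now = 0L,
      CompressionCodec.getCompressionCodec(codec.name),
      CompressionCodec.getCompressionCodec(codec.name),
      compactedTopic = false,
      RecordBatch.MAGIC_VALUE_V2,
      TimestampType.CREATE_TIME,
      1000L,
      RecordBatch.NO_PRODUCER_EPOCH,
      isFromClient = true,
      KAFKA_2_3_IV1)
  }
}

The same shape of input is exercised by the compressed variants of the removed tests (GZIP instead of NONE); the reverted code paths for both cases appear in the diff below.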

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 77a4b2b..8bf215e 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,23 +74,6 @@ private[kafka] object LogValidator extends Logging {
     }
   }
 
-  private[kafka] def validateOneBatchRecords(records: MemoryRecords): RecordBatch = {
-    // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
-    val batchIterator = records.batches.iterator
-
-    if (!batchIterator.hasNext) {
-      throw new InvalidRecordException("Compressed outer record has no batches at all")
-    }
-
-    val batch = batchIterator.next()
-
-    if (batchIterator.hasNext) {
-      throw new InvalidRecordException("Compressed outer record has more than one batch")
-    }
-
-    batch
-  }
-
   private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
     if (isFromClient) {
       if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
@@ -198,10 +181,6 @@ private[kafka] object LogValidator extends Logging {
     val initialOffset = offsetCounter.value
 
     for (batch <- records.batches.asScala) {
-      if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-        validateOneBatchRecords(records)
-      }
-
       validateBatch(batch, isFromClient, magic)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
@@ -270,85 +249,83 @@ private[kafka] object LogValidator extends Logging {
                                                  partitionLeaderEpoch: Int,
                                                  isFromClient: Boolean,
                                                  interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
-
-    // No in place assignment situation 1 and 2
-    var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
-
-    var maxTimestamp = RecordBatch.NO_TIMESTAMP
-    val expectedInnerOffset = new LongRef(0)
-    val validatedRecords = new mutable.ArrayBuffer[Record]
-
-    var uncompressedSizeInBytes = 0
-
-    val batch = validateOneBatchRecords(records)
-
-    validateBatch(batch, isFromClient, toMagic)
-    uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
-
-    // Do not compress control records unless they are written compressed
-    if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
-      inPlaceAssignment = true
-
-    for (record <- batch.asScala) {
-      if (sourceCodec != NoCompressionCodec && record.isCompressed)
-        throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
-          s"compression attribute set: $record")
-      if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
-        throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
-      validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
-
-      uncompressedSizeInBytes += record.sizeInBytes()
-      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-        // Check if we need to overwrite offset
-        // No in place assignment situation 3
-        if (record.offset != expectedInnerOffset.getAndIncrement())
-          inPlaceAssignment = false
-        if (record.timestamp > maxTimestamp)
-          maxTimestamp = record.timestamp
+      // No in place assignment situation 1 and 2
+      var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
+
+      var maxTimestamp = RecordBatch.NO_TIMESTAMP
+      val expectedInnerOffset = new LongRef(0)
+      val validatedRecords = new mutable.ArrayBuffer[Record]
+
+      var uncompressedSizeInBytes = 0
+
+      for (batch <- records.batches.asScala) {
+        validateBatch(batch, isFromClient, toMagic)
+        uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
+
+        // Do not compress control records unless they are written compressed
+        if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
+          inPlaceAssignment = true
+
+        for (record <- batch.asScala) {
+          if (sourceCodec != NoCompressionCodec && record.isCompressed)
+            throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+              s"compression attribute set: $record")
+          if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+            throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
+          validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+
+          uncompressedSizeInBytes += record.sizeInBytes()
+          if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
+            // Check if we need to overwrite offset
+            // No in place assignment situation 3
+            if (record.offset != expectedInnerOffset.getAndIncrement())
+              inPlaceAssignment = false
+            if (record.timestamp > maxTimestamp)
+              maxTimestamp = record.timestamp
+          }
+
+          // No in place assignment situation 4
+          if (!record.hasMagic(toMagic))
+            inPlaceAssignment = false
+
+          validatedRecords += record
+        }
       }
 
-      // No in place assignment situation 4
-      if (!record.hasMagic(toMagic))
-        inPlaceAssignment = false
-
-      validatedRecords += record
-    }
-
-    if (!inPlaceAssignment) {
-      val (producerId, producerEpoch, sequence, isTransactional) = {
-        // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
-        // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
-        // with older magic versions, there will never be a producer id, etc.
-        val first = records.batches.asScala.head
-        (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
-      }
-      buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
-        validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
-        uncompressedSizeInBytes)
-    } else {
-      // we can update the batch only and write the compressed payload as is;
-      // again we assume only one record batch within the compressed set
-      val batch = records.batches.iterator.next()
-      val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
+      if (!inPlaceAssignment) {
+        val (producerId, producerEpoch, sequence, isTransactional) = {
+          // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
+          // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
+          // with older magic versions, there will never be a producer id, etc.
+          val first = records.batches.asScala.head
+          (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
+        }
+        buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
+          validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
+          uncompressedSizeInBytes)
+      } else {
+        // we can update the batch only and write the compressed payload as is
+        val batch = records.batches.iterator.next()
+        val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
 
-      batch.setLastOffset(lastOffset)
+        batch.setLastOffset(lastOffset)
 
-      if (timestampType == TimestampType.LOG_APPEND_TIME)
-        maxTimestamp = now
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+          maxTimestamp = now
 
-      if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
-        batch.setMaxTimestamp(timestampType, maxTimestamp)
+        if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
+          batch.setMaxTimestamp(timestampType, maxTimestamp)
 
-      if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
-        batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+        if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
+          batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
 
-      val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
-      ValidationAndOffsetAssignResult(validatedRecords = records,
-        maxTimestamp = maxTimestamp,
-        shallowOffsetOfMaxTimestamp = lastOffset,
-        messageSizeMaybeChanged = false,
-        recordConversionStats = recordConversionStats)
-    }
+        val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
+        ValidationAndOffsetAssignResult(validatedRecords = records,
+          maxTimestamp = maxTimestamp,
+          shallowOffsetOfMaxTimestamp = lastOffset,
+          messageSizeMaybeChanged = false,
+          recordConversionStats = recordConversionStats)
+      }
   }
 
   private def buildRecordsAndAssignOffsets(magic: Byte,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index d306b16..37553b9 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 import java.nio.ByteBuffer
 import java.util.concurrent.TimeUnit
 
-import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
+import kafka.api.{ApiVersion, KAFKA_2_0_IV1}
 import kafka.common.LongRef
 import kafka.message._
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
@@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
-import org.scalatest.Assertions.{assertThrows, intercept}
+import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
 
@@ -37,51 +37,6 @@ class LogValidatorTest {
   val time = Time.SYSTEM
 
   @Test
-  def testOnlyOneBatchCompressedV0(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
-  }
-
-  @Test
-  def testOnlyOneBatchCompressedV1(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
-  }
-
-  @Test
-  def testOnlyOneBatchCompressedV2(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP)
-  }
-
-  @Test
-  def testOnlyOneBatchUncompressedV2(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE)
-  }
-
-  private def checkOnlyOneBatchCompressed(magic: Byte, compressionType: CompressionType) {
-    validateMessages(createRecords(magic, 0L, compressionType), magic, compressionType)
-
-    assertThrows[InvalidRecordException] {
-      validateMessages(createTwoBatchedRecords(magic, 0L, compressionType), magic, compressionType)
-    }
-  }
-
-  private def validateMessages(records: MemoryRecords, magic: Byte, compressionType: CompressionType): Unit = {
-    LogValidator.validateMessagesAndAssignOffsets(records,
-      new LongRef(0L),
-      time,
-      now = 0L,
-      CompressionCodec.getCompressionCodec(compressionType.name),
-      CompressionCodec.getCompressionCodec(compressionType.name),
-      compactedTopic = false,
-      magic,
-      TimestampType.CREATE_TIME,
-      1000L,
-      RecordBatch.NO_PRODUCER_EPOCH,
-      isFromClient = true,
-      KAFKA_2_3_IV1
-    )
-  }
-
-  @Test
   def testLogAppendTimeNonCompressedV1() {
     checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
   }
@@ -1182,22 +1137,6 @@ class LogValidatorTest {
     builder.build()
   }
 
-  def createTwoBatchedRecords(magicValue: Byte,
-                              timestamp: Long = RecordBatch.NO_TIMESTAMP,
-                              codec: CompressionType): MemoryRecords = {
-    val buf = ByteBuffer.allocate(2048)
-    var builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
-    builder.append(10L, "1".getBytes(), "a".getBytes())
-    builder.close()
-    builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L)
-    builder.append(11L, "2".getBytes(), "b".getBytes())
-    builder.append(12L, "3".getBytes(), "c".getBytes())
-    builder.close()
-
-    buf.flip()
-    MemoryRecords.readableRecords(buf.slice())
-  }
-
   /* check that offsets are assigned consecutively from the given base offset */
   def checkOffsets(records: MemoryRecords, baseOffset: Long) {
     assertTrue("Message set should not be empty", records.records.asScala.nonEmpty)