You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/03 23:56:44 UTC

[kafka] branch trunk updated: HOTFIX: Allow multi-batches for old format and no compression (#6871)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 573152d  HOTFIX: Allow multi-batches for old format and no compression (#6871)
573152d is described below

commit 573152dfa8087e40ee69b27fb9fb8f45d3825eb6
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Jun 3 16:56:28 2019 -0700

    HOTFIX: Allow multi-batches for old format and no compression (#6871)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/common/record/AbstractRecords.java       |  9 +++
 core/src/main/scala/kafka/log/LogValidator.scala   | 77 ++++++++++++----------
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 44 +++++++------
 3 files changed, 74 insertions(+), 56 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 1994a71..411e647 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -43,6 +43,15 @@ public abstract class AbstractRecords implements Records {
         return true;
     }
 
+    public boolean firstBatchHasCompatibleMagic(byte magic) {
+        Iterator<? extends RecordBatch> iterator = batches().iterator();
+
+        if (!iterator.hasNext())
+            return true;
+
+        return iterator.next().magic() <= magic;
+    }
+
     /**
      * Get an iterator over the deep records.
      * @return An iterator over the records
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 77a4b2b..d10eed8 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,21 +74,18 @@ private[kafka] object LogValidator extends Logging {
     }
   }
 
-  private[kafka] def validateOneBatchRecords(records: MemoryRecords): RecordBatch = {
-    // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
+  private[kafka] def validateOneBatchRecords(records: MemoryRecords) {
     val batchIterator = records.batches.iterator
 
     if (!batchIterator.hasNext) {
       throw new InvalidRecordException("Compressed outer record has no batches at all")
     }
 
-    val batch = batchIterator.next()
+    batchIterator.next()
 
     if (batchIterator.hasNext) {
       throw new InvalidRecordException("Compressed outer record has more than one batch")
     }
-
-    batch
   }
 
   private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
@@ -197,11 +194,12 @@ private[kafka] object LogValidator extends Logging {
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
 
-    for (batch <- records.batches.asScala) {
-      if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-        validateOneBatchRecords(records)
-      }
+    if (!records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) {
+      // for v2 and beyond, we should check there's only one batch.
+      validateOneBatchRecords(records)
+    }
 
+    for (batch <- records.batches.asScala) {
       validateBatch(batch, isFromClient, magic)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
@@ -280,38 +278,47 @@ private[kafka] object LogValidator extends Logging {
 
     var uncompressedSizeInBytes = 0
 
-    val batch = validateOneBatchRecords(records)
+    // Assume there's only one batch with compressed memory records; otherwise, throw InvalidRecordException.
+    // One exception is that with a format older than v2, if sourceCodec is NoCompressionCodec, each batch is actually
+    // a single record, so we need to handle it specially by creating a single wrapper batch that includes all the records
+    if (sourceCodec != NoCompressionCodec || !records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) {
+      validateOneBatchRecords(records)
+    }
+
+    val batches = records.batches.asScala
 
-    validateBatch(batch, isFromClient, toMagic)
-    uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
+    for (batch <- batches) {
+      validateBatch(batch, isFromClient, toMagic)
+      uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
 
-    // Do not compress control records unless they are written compressed
-    if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
-      inPlaceAssignment = true
+      // Do not compress control records unless they are written compressed
+      if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
+        inPlaceAssignment = true
 
-    for (record <- batch.asScala) {
-      if (sourceCodec != NoCompressionCodec && record.isCompressed)
-        throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
-          s"compression attribute set: $record")
-      if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
-        throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
-      validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+      for (record <- batch.asScala) {
+        if (sourceCodec != NoCompressionCodec && record.isCompressed)
+          throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+            s"compression attribute set: $record")
+        if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+          throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
+        validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
 
-      uncompressedSizeInBytes += record.sizeInBytes()
-      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-        // Check if we need to overwrite offset
-        // No in place assignment situation 3
-        if (record.offset != expectedInnerOffset.getAndIncrement())
-          inPlaceAssignment = false
-        if (record.timestamp > maxTimestamp)
-          maxTimestamp = record.timestamp
-      }
+        uncompressedSizeInBytes += record.sizeInBytes()
+        if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
+          // Check if we need to overwrite offset
+          // No in place assignment situation 3
+          if (record.offset != expectedInnerOffset.getAndIncrement())
+            inPlaceAssignment = false
+          if (record.timestamp > maxTimestamp)
+            maxTimestamp = record.timestamp
+        }
 
-      // No in place assignment situation 4
-      if (!record.hasMagic(toMagic))
-        inPlaceAssignment = false
+        // No in place assignment situation 4
+        if (!record.hasMagic(toMagic))
+          inPlaceAssignment = false
 
-      validatedRecords += record
+        validatedRecords += record
+      }
     }
 
     if (!inPlaceAssignment) {
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index d306b16..324314f 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -37,40 +37,42 @@ class LogValidatorTest {
   val time = Time.SYSTEM
 
   @Test
-  def testOnlyOneBatchCompressedV0(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
+  def testOnlyOneBatch(): Unit = {
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, CompressionType.GZIP)
   }
 
   @Test
-  def testOnlyOneBatchCompressedV1(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
+  def testAllowMultiBatch(): Unit = {
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, CompressionType.NONE)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, CompressionType.NONE)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, CompressionType.GZIP)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, CompressionType.GZIP)
   }
 
-  @Test
-  def testOnlyOneBatchCompressedV2(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP)
-  }
-
-  @Test
-  def testOnlyOneBatchUncompressedV2(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE)
-  }
-
-  private def checkOnlyOneBatchCompressed(magic: Byte, compressionType: CompressionType) {
-    validateMessages(createRecords(magic, 0L, compressionType), magic, compressionType)
-
+  private def checkOnlyOneBatch(magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType) {
     assertThrows[InvalidRecordException] {
-      validateMessages(createTwoBatchedRecords(magic, 0L, compressionType), magic, compressionType)
+      validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType)
     }
   }
 
-  private def validateMessages(records: MemoryRecords, magic: Byte, compressionType: CompressionType): Unit = {
+  private def checkAllowMultiBatch(magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType) {
+    validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType)
+  }
+
+  private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType): Unit = {
     LogValidator.validateMessagesAndAssignOffsets(records,
       new LongRef(0L),
       time,
       now = 0L,
-      CompressionCodec.getCompressionCodec(compressionType.name),
-      CompressionCodec.getCompressionCodec(compressionType.name),
+      CompressionCodec.getCompressionCodec(sourceCompressionType.name),
+      CompressionCodec.getCompressionCodec(targetCompressionType.name),
       compactedTopic = false,
       magic,
       TimestampType.CREATE_TIME,