Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/03 23:56:44 UTC
[kafka] branch trunk updated: HOTFIX: Allow multi-batches for old format and no compression (#6871)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 573152d HOTFIX: Allow multi-batches for old format and no compression (#6871)
573152d is described below
commit 573152dfa8087e40ee69b27fb9fb8f45d3825eb6
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Jun 3 16:56:28 2019 -0700
HOTFIX: Allow multi-batches for old format and no compression (#6871)
Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/common/record/AbstractRecords.java     |  9 +++
 core/src/main/scala/kafka/log/LogValidator.scala | 77 ++++++++++++----------
 .../scala/unit/kafka/log/LogValidatorTest.scala  | 44 +++++++------
 3 files changed, 74 insertions(+), 56 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 1994a71..411e647 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -43,6 +43,15 @@ public abstract class AbstractRecords implements Records {
         return true;
     }
 
+    public boolean firstBatchHasCompatibleMagic(byte magic) {
+        Iterator<? extends RecordBatch> iterator = batches().iterator();
+
+        if (!iterator.hasNext())
+            return true;
+
+        return iterator.next().magic() <= magic;
+    }
+
     /**
      * Get an iterator over the deep records.
      * @return An iterator over the records
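
In effect, the new helper's contract is small: an empty record set is trivially compatible, and otherwise only the first batch's magic byte is inspected. A minimal Scala restatement of the same rule (the free-standing signature is illustrative, not the Java method above):

    import org.apache.kafka.common.record.RecordBatch

    // Compatible iff there are no batches at all, or the first batch's magic
    // is at or below the requested format version; later batches are ignored.
    def firstBatchHasCompatibleMagic(batches: Iterator[RecordBatch], magic: Byte): Boolean =
      !batches.hasNext || batches.next().magic <= magic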
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 77a4b2b..d10eed8 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,21 +74,18 @@ private[kafka] object LogValidator extends Logging {
     }
   }
 
-  private[kafka] def validateOneBatchRecords(records: MemoryRecords): RecordBatch = {
-    // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
+  private[kafka] def validateOneBatchRecords(records: MemoryRecords) {
     val batchIterator = records.batches.iterator
 
     if (!batchIterator.hasNext) {
       throw new InvalidRecordException("Compressed outer record has no batches at all")
     }
 
-    val batch = batchIterator.next()
+    batchIterator.next()
 
     if (batchIterator.hasNext) {
       throw new InvalidRecordException("Compressed outer record has more than one batch")
     }
-
-    batch
   }
 
   private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
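
With this change validateOneBatchRecords no longer returns the batch; it is a pure guard that throws unless the input holds exactly one batch. A hedged usage sketch (MemoryRecords.withRecords builds a single batch; calling the guard assumes code inside the kafka package, since the method is private[kafka]):

    import kafka.log.LogValidator
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

    // One GZIP batch containing one record: the guard passes silently.
    val oneBatch = MemoryRecords.withRecords(CompressionType.GZIP,
      new SimpleRecord("key".getBytes, "value".getBytes))
    LogValidator.validateOneBatchRecords(oneBatch)

    // A buffer holding two such batches would instead throw
    // InvalidRecordException("Compressed outer record has more than one batch").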
@@ -197,11 +194,12 @@ private[kafka] object LogValidator extends Logging {
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
 
-    for (batch <- records.batches.asScala) {
-      if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-        validateOneBatchRecords(records)
-      }
+    if (!records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) {
+      // for v2 and beyond, we should check there's only one batch.
+      validateOneBatchRecords(records)
+    }
 
+    for (batch <- records.batches.asScala) {
       validateBatch(batch, isFromClient, magic)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
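
The net effect on the uncompressed path: the batch-count rule is now decided once, up front, from the first batch's magic, and the main loop then treats any number of batches uniformly. Condensed (a sketch of the control flow above, using the names from the hunk, not the literal method body):

    // v2+ input: the single-batch rule applies, and 0 or >1 batches throw.
    if (!records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
      validateOneBatchRecords(records)

    // v0/v1 uncompressed input may carry many single-record batches;
    // each one is validated in turn.
    for (batch <- records.batches.asScala)
      validateBatch(batch, isFromClient, magic)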
@@ -280,38 +278,47 @@ private[kafka] object LogValidator extends Logging {
 
     var uncompressedSizeInBytes = 0
 
-    val batch = validateOneBatchRecords(records)
+    // Assume there's only one batch with compressed memory records; otherwise, throw InvalidRecordException.
+    // One exception: with a format older than v2, if sourceCodec is noCompression then each batch is actually
+    // a single record, so we need to handle it specially by creating a single wrapper batch that includes all the records.
+    if (sourceCodec != NoCompressionCodec || !records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) {
+      validateOneBatchRecords(records)
+    }
+
+    val batches = records.batches.asScala
 
-    validateBatch(batch, isFromClient, toMagic)
-    uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
+    for (batch <- batches) {
+      validateBatch(batch, isFromClient, toMagic)
+      uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
 
-    // Do not compress control records unless they are written compressed
-    if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
-      inPlaceAssignment = true
+      // Do not compress control records unless they are written compressed
+      if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
+        inPlaceAssignment = true
 
-    for (record <- batch.asScala) {
-      if (sourceCodec != NoCompressionCodec && record.isCompressed)
-        throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
-          s"compression attribute set: $record")
-      if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
-        throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
-      validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+      for (record <- batch.asScala) {
+        if (sourceCodec != NoCompressionCodec && record.isCompressed)
+          throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+            s"compression attribute set: $record")
+        if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+          throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
+        validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
 
-      uncompressedSizeInBytes += record.sizeInBytes()
-      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-        // Check if we need to overwrite offset
-        // No in place assignment situation 3
-        if (record.offset != expectedInnerOffset.getAndIncrement())
-          inPlaceAssignment = false
-        if (record.timestamp > maxTimestamp)
-          maxTimestamp = record.timestamp
-      }
+        uncompressedSizeInBytes += record.sizeInBytes()
+        if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
+          // Check if we need to overwrite offset
+          // No in place assignment situation 3
+          if (record.offset != expectedInnerOffset.getAndIncrement())
+            inPlaceAssignment = false
+          if (record.timestamp > maxTimestamp)
+            maxTimestamp = record.timestamp
+        }
 
-      // No in place assignment situation 4
-      if (!record.hasMagic(toMagic))
-        inPlaceAssignment = false
+        // No in place assignment situation 4
+        if (!record.hasMagic(toMagic))
+          inPlaceAssignment = false
 
-      validatedRecords += record
+        validatedRecords += record
+      }
     }
 
     if (!inPlaceAssignment) {
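
Taken together, the two call sites reduce to one acceptance rule for multi-batch input. A hedged summary as a standalone predicate (multiBatchAllowed is an illustrative name, not part of the commit):

    import kafka.message.{CompressionCodec, NoCompressionCodec}
    import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}

    // Multiple batches are only legal for old-format (v0/v1) input that
    // arrived uncompressed; in every other case exactly one batch is required.
    def multiBatchAllowed(records: MemoryRecords, sourceCodec: CompressionCodec): Boolean =
      sourceCodec == NoCompressionCodec &&
        records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)

This matches the test matrix below: checkAllowMultiBatch only covers v0/v1 with source compression NONE, while every other combination expects InvalidRecordException.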
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index d306b16..324314f 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -37,40 +37,42 @@ class LogValidatorTest {
   val time = Time.SYSTEM
 
   @Test
-  def testOnlyOneBatchCompressedV0(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
+  def testOnlyOneBatch(): Unit = {
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, CompressionType.GZIP)
   }
 
   @Test
-  def testOnlyOneBatchCompressedV1(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
+  def testAllowMultiBatch(): Unit = {
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, CompressionType.NONE)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, CompressionType.NONE)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, CompressionType.GZIP)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, CompressionType.GZIP)
   }
 
-  @Test
-  def testOnlyOneBatchCompressedV2(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP)
-  }
-
-  @Test
-  def testOnlyOneBatchUncompressedV2(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE)
-  }
-
-  private def checkOnlyOneBatchCompressed(magic: Byte, compressionType: CompressionType) {
-    validateMessages(createRecords(magic, 0L, compressionType), magic, compressionType)
-
+  private def checkOnlyOneBatch(magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType) {
     assertThrows[InvalidRecordException] {
-      validateMessages(createTwoBatchedRecords(magic, 0L, compressionType), magic, compressionType)
+      validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType)
     }
   }
 
-  private def validateMessages(records: MemoryRecords, magic: Byte, compressionType: CompressionType): Unit = {
+  private def checkAllowMultiBatch(magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType) {
+    validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType)
+  }
+
+  private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType): Unit = {
     LogValidator.validateMessagesAndAssignOffsets(records,
       new LongRef(0L),
       time,
       now = 0L,
-      CompressionCodec.getCompressionCodec(compressionType.name),
-      CompressionCodec.getCompressionCodec(compressionType.name),
+      CompressionCodec.getCompressionCodec(sourceCompressionType.name),
+      CompressionCodec.getCompressionCodec(targetCompressionType.name),
       compactedTopic = false,
       magic,
       TimestampType.CREATE_TIME,