You are viewing a plain-text version of this content; the link to its canonical version ("here") is not preserved in plain text.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/29 04:38:09 UTC
[kafka] branch trunk updated: KAFKA-8428: Always require a single
batch with un- / compressed messages (#6816)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 592410f KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)
592410f is described below
commit 592410fc585ad72934fef2cef8624b8146fedf3d
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue May 28 21:37:52 2019 -0700
KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)
I think it's better to make a single batch a universal requirement for compressed messages of all versions, and for uncompressed messages of V2 and beyond as well.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
core/src/main/scala/kafka/log/LogValidator.scala | 163 ++++++++++++---------
.../scala/unit/kafka/log/LogValidatorTest.scala | 65 +++++++-
2 files changed, 156 insertions(+), 72 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 8bf215e..77a4b2b 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,6 +74,23 @@ private[kafka] object LogValidator extends Logging {
}
}
+  /**
+   * Returns the only batch contained in `records`, rejecting record sets that do not consist of
+   * exactly one batch.
+   *
+   * The check itself does not look at the compression codec, so the error messages do not assume
+   * the input is compressed (it is also applied to uncompressed magic >= V2 batches).
+   *
+   * @param records the record set to inspect
+   * @return the single RecordBatch in `records`
+   * @throws InvalidRecordException if `records` contains no batch, or more than one batch
+   */
+  private[kafka] def validateOneBatchRecords(records: MemoryRecords): RecordBatch = {
+    val batchIterator = records.batches.iterator
+
+    if (!batchIterator.hasNext)
+      throw new InvalidRecordException("Expected exactly one record batch, but the record set has none")
+
+    val batch = batchIterator.next()
+
+    // Any further batch violates the single-batch requirement.
+    if (batchIterator.hasNext)
+      throw new InvalidRecordException("Expected exactly one record batch, but the record set has more than one")
+
+    batch
+  }
+
private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
if (isFromClient) {
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
@@ -181,6 +198,10 @@ private[kafka] object LogValidator extends Logging {
val initialOffset = offsetCounter.value
for (batch <- records.batches.asScala) {
+ if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
+ validateOneBatchRecords(records)
+ }
+
validateBatch(batch, isFromClient, magic)
var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
@@ -249,83 +270,85 @@ private[kafka] object LogValidator extends Logging {
partitionLeaderEpoch: Int,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
- // No in place assignment situation 1 and 2
- var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
-
- var maxTimestamp = RecordBatch.NO_TIMESTAMP
- val expectedInnerOffset = new LongRef(0)
- val validatedRecords = new mutable.ArrayBuffer[Record]
-
- var uncompressedSizeInBytes = 0
-
- for (batch <- records.batches.asScala) {
- validateBatch(batch, isFromClient, toMagic)
- uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
-
- // Do not compress control records unless they are written compressed
- if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
- inPlaceAssignment = true
-
- for (record <- batch.asScala) {
- if (sourceCodec != NoCompressionCodec && record.isCompressed)
- throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
- s"compression attribute set: $record")
- if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
- throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
- validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
-
- uncompressedSizeInBytes += record.sizeInBytes()
- if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
- // Check if we need to overwrite offset
- // No in place assignment situation 3
- if (record.offset != expectedInnerOffset.getAndIncrement())
- inPlaceAssignment = false
- if (record.timestamp > maxTimestamp)
- maxTimestamp = record.timestamp
- }
-
- // No in place assignment situation 4
- if (!record.hasMagic(toMagic))
- inPlaceAssignment = false
-
- validatedRecords += record
- }
+
+ // No in place assignment situation 1 and 2
+ var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
+
+ var maxTimestamp = RecordBatch.NO_TIMESTAMP
+ val expectedInnerOffset = new LongRef(0)
+ val validatedRecords = new mutable.ArrayBuffer[Record]
+
+ var uncompressedSizeInBytes = 0
+
+ val batch = validateOneBatchRecords(records)
+
+ validateBatch(batch, isFromClient, toMagic)
+ uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
+
+ // Do not compress control records unless they are written compressed
+ if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
+ inPlaceAssignment = true
+
+ for (record <- batch.asScala) {
+ if (sourceCodec != NoCompressionCodec && record.isCompressed)
+ throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+ s"compression attribute set: $record")
+ if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+ throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
+ validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+
+ uncompressedSizeInBytes += record.sizeInBytes()
+ if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
+ // Check if we need to overwrite offset
+ // No in place assignment situation 3
+ if (record.offset != expectedInnerOffset.getAndIncrement())
+ inPlaceAssignment = false
+ if (record.timestamp > maxTimestamp)
+ maxTimestamp = record.timestamp
}
- if (!inPlaceAssignment) {
- val (producerId, producerEpoch, sequence, isTransactional) = {
- // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
- // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
- // with older magic versions, there will never be a producer id, etc.
- val first = records.batches.asScala.head
- (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
- }
- buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
- validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
- uncompressedSizeInBytes)
- } else {
- // we can update the batch only and write the compressed payload as is
- val batch = records.batches.iterator.next()
- val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
+ // No in place assignment situation 4
+ if (!record.hasMagic(toMagic))
+ inPlaceAssignment = false
- batch.setLastOffset(lastOffset)
+ validatedRecords += record
+ }
- if (timestampType == TimestampType.LOG_APPEND_TIME)
- maxTimestamp = now
+ if (!inPlaceAssignment) {
+ val (producerId, producerEpoch, sequence, isTransactional) = {
+ // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
+ // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
+ // with older magic versions, there will never be a producer id, etc.
+ val first = records.batches.asScala.head
+ (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
+ }
+ buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
+ validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
+ uncompressedSizeInBytes)
+ } else {
+ // we can update the batch only and write the compressed payload as is;
+ // again we assume only one record batch within the compressed set
+ val batch = records.batches.iterator.next()
+ val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
- if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
- batch.setMaxTimestamp(timestampType, maxTimestamp)
+ batch.setLastOffset(lastOffset)
- if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
- batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ maxTimestamp = now
- val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
- ValidationAndOffsetAssignResult(validatedRecords = records,
- maxTimestamp = maxTimestamp,
- shallowOffsetOfMaxTimestamp = lastOffset,
- messageSizeMaybeChanged = false,
- recordConversionStats = recordConversionStats)
- }
+ if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
+ batch.setMaxTimestamp(timestampType, maxTimestamp)
+
+ if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
+ batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+
+ val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
+ ValidationAndOffsetAssignResult(validatedRecords = records,
+ maxTimestamp = maxTimestamp,
+ shallowOffsetOfMaxTimestamp = lastOffset,
+ messageSizeMaybeChanged = false,
+ recordConversionStats = recordConversionStats)
+ }
}
private def buildRecordsAndAssignOffsets(magic: Byte,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 37553b9..d306b16 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -19,7 +19,7 @@ package kafka.log
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
-import kafka.api.{ApiVersion, KAFKA_2_0_IV1}
+import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
import kafka.common.LongRef
import kafka.message._
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
@@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.test.TestUtils
import org.junit.Assert._
import org.junit.Test
-import org.scalatest.Assertions.intercept
+import org.scalatest.Assertions.{assertThrows, intercept}
import scala.collection.JavaConverters._
@@ -37,6 +37,51 @@ class LogValidatorTest {
val time = Time.SYSTEM
@Test
+  // GZIP-compressed, magic V0: a single batch must validate, two batches must be rejected.
+  def testOnlyOneBatchCompressedV0(): Unit =
+    checkOnlyOneBatchCompressed(magic = RecordBatch.MAGIC_VALUE_V0, compressionType = CompressionType.GZIP)
+
+ @Test
+  // GZIP-compressed, magic V1: a single batch must validate, two batches must be rejected.
+  def testOnlyOneBatchCompressedV1(): Unit =
+    checkOnlyOneBatchCompressed(magic = RecordBatch.MAGIC_VALUE_V1, compressionType = CompressionType.GZIP)
+
+ @Test
+  // GZIP-compressed, magic V2: a single batch must validate, two batches must be rejected.
+  def testOnlyOneBatchCompressedV2(): Unit =
+    checkOnlyOneBatchCompressed(magic = RecordBatch.MAGIC_VALUE_V2, compressionType = CompressionType.GZIP)
+
+ @Test
+  // Uncompressed (NONE), magic V2: the single-batch requirement applies here as well.
+  def testOnlyOneBatchUncompressedV2(): Unit =
+    checkOnlyOneBatchCompressed(magic = RecordBatch.MAGIC_VALUE_V2, compressionType = CompressionType.NONE)
+
+  /**
+   * Validates the record set built by `createRecords` (expected to succeed) and then a record set
+   * built by `createTwoBatchedRecords`, which must be rejected with an InvalidRecordException.
+   * Despite the name, this helper is also used with CompressionType.NONE.
+   *
+   * @param magic           record format version used both to build and to validate the records
+   * @param compressionType codec used both to build the records and as source/target codec
+   */
+  private def checkOnlyOneBatchCompressed(magic: Byte, compressionType: CompressionType): Unit = {
+    validateMessages(createRecords(magic, 0L, compressionType), magic, compressionType)
+
+    assertThrows[InvalidRecordException] {
+      validateMessages(createTwoBatchedRecords(magic, 0L, compressionType), magic, compressionType)
+    }
+  }
+
+  /**
+   * Runs LogValidator.validateMessagesAndAssignOffsets on `records` with a CREATE_TIME timestamp
+   * type, `now = 0L`, a 1000 ms timestamp tolerance and the same codec as source and target.
+   * The result is discarded: callers only care whether validation throws.
+   */
+  private def validateMessages(records: MemoryRecords, magic: Byte, compressionType: CompressionType): Unit = {
+    val codec = CompressionCodec.getCompressionCodec(compressionType.name)
+    LogValidator.validateMessagesAndAssignOffsets(records,
+      new LongRef(0L),
+      time,
+      now = 0L,
+      codec,
+      codec,
+      compactedTopic = false,
+      magic,
+      TimestampType.CREATE_TIME,
+      1000L,
+      // This argument is the partition leader epoch; use its own sentinel rather than the producer-epoch one.
+      RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true,
+      KAFKA_2_3_IV1
+    )
+  }
+
+ @Test
def testLogAppendTimeNonCompressedV1() {
checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
}
@@ -1137,6 +1182,22 @@ class LogValidatorTest {
builder.build()
}
+  /**
+   * Builds a MemoryRecords holding two consecutive batches written into one buffer: the first with
+   * a single record at offset 0, the second with two records starting at offset 1.
+   *
+   * @param magicValue record format version of both batches
+   * @param timestamp  timestamp applied to every record (previously ignored in favour of hard-coded values)
+   * @param codec      compression type of both batches
+   */
+  def createTwoBatchedRecords(magicValue: Byte,
+                              timestamp: Long = RecordBatch.NO_TIMESTAMP,
+                              codec: CompressionType): MemoryRecords = {
+    val buf = ByteBuffer.allocate(2048)
+
+    val firstBatch = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
+    firstBatch.append(timestamp, "1".getBytes(), "a".getBytes())
+    firstBatch.close()
+
+    val secondBatch = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L)
+    secondBatch.append(timestamp, "2".getBytes(), "b".getBytes())
+    secondBatch.append(timestamp, "3".getBytes(), "c".getBytes())
+    secondBatch.close()
+
+    // Expose only the bytes written by the two builders.
+    buf.flip()
+    MemoryRecords.readableRecords(buf.slice())
+  }
+
/* check that offsets are assigned consecutively from the given base offset */
def checkOffsets(records: MemoryRecords, baseOffset: Long) {
assertTrue("Message set should not be empty", records.records.asScala.nonEmpty)