You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/31 21:50:58 UTC
[kafka] branch 2.3 updated: Revert "KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)"
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 0701a79 Revert "KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)"
0701a79 is described below
commit 0701a79bdb5dc1033bc0d264f1673696115c7ac4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri May 31 14:21:59 2019 -0700
Revert "KAFKA-8428: Always require a single batch with un- / compressed messages (#6816)"
This reverts commit 6d5c7c3e375b38e54746af040335761b6f69f70e.
---
core/src/main/scala/kafka/log/LogValidator.scala | 163 +++++++++------------
.../scala/unit/kafka/log/LogValidatorTest.scala | 65 +-------
2 files changed, 72 insertions(+), 156 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 77a4b2b..8bf215e 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,23 +74,6 @@ private[kafka] object LogValidator extends Logging {
}
}
- private[kafka] def validateOneBatchRecords(records: MemoryRecords): RecordBatch = {
- // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
- val batchIterator = records.batches.iterator
-
- if (!batchIterator.hasNext) {
- throw new InvalidRecordException("Compressed outer record has no batches at all")
- }
-
- val batch = batchIterator.next()
-
- if (batchIterator.hasNext) {
- throw new InvalidRecordException("Compressed outer record has more than one batch")
- }
-
- batch
- }
-
private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
if (isFromClient) {
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
@@ -198,10 +181,6 @@ private[kafka] object LogValidator extends Logging {
val initialOffset = offsetCounter.value
for (batch <- records.batches.asScala) {
- if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
- validateOneBatchRecords(records)
- }
-
validateBatch(batch, isFromClient, magic)
var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
@@ -270,85 +249,83 @@ private[kafka] object LogValidator extends Logging {
partitionLeaderEpoch: Int,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
-
- // No in place assignment situation 1 and 2
- var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
-
- var maxTimestamp = RecordBatch.NO_TIMESTAMP
- val expectedInnerOffset = new LongRef(0)
- val validatedRecords = new mutable.ArrayBuffer[Record]
-
- var uncompressedSizeInBytes = 0
-
- val batch = validateOneBatchRecords(records)
-
- validateBatch(batch, isFromClient, toMagic)
- uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
-
- // Do not compress control records unless they are written compressed
- if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
- inPlaceAssignment = true
-
- for (record <- batch.asScala) {
- if (sourceCodec != NoCompressionCodec && record.isCompressed)
- throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
- s"compression attribute set: $record")
- if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
- throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
- validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
-
- uncompressedSizeInBytes += record.sizeInBytes()
- if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
- // Check if we need to overwrite offset
- // No in place assignment situation 3
- if (record.offset != expectedInnerOffset.getAndIncrement())
- inPlaceAssignment = false
- if (record.timestamp > maxTimestamp)
- maxTimestamp = record.timestamp
+ // No in place assignment situation 1 and 2
+ var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
+
+ var maxTimestamp = RecordBatch.NO_TIMESTAMP
+ val expectedInnerOffset = new LongRef(0)
+ val validatedRecords = new mutable.ArrayBuffer[Record]
+
+ var uncompressedSizeInBytes = 0
+
+ for (batch <- records.batches.asScala) {
+ validateBatch(batch, isFromClient, toMagic)
+ uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
+
+ // Do not compress control records unless they are written compressed
+ if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
+ inPlaceAssignment = true
+
+ for (record <- batch.asScala) {
+ if (sourceCodec != NoCompressionCodec && record.isCompressed)
+ throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+ s"compression attribute set: $record")
+ if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+ throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
+ validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+
+ uncompressedSizeInBytes += record.sizeInBytes()
+ if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
+ // Check if we need to overwrite offset
+ // No in place assignment situation 3
+ if (record.offset != expectedInnerOffset.getAndIncrement())
+ inPlaceAssignment = false
+ if (record.timestamp > maxTimestamp)
+ maxTimestamp = record.timestamp
+ }
+
+ // No in place assignment situation 4
+ if (!record.hasMagic(toMagic))
+ inPlaceAssignment = false
+
+ validatedRecords += record
+ }
}
- // No in place assignment situation 4
- if (!record.hasMagic(toMagic))
- inPlaceAssignment = false
-
- validatedRecords += record
- }
-
- if (!inPlaceAssignment) {
- val (producerId, producerEpoch, sequence, isTransactional) = {
- // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
- // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
- // with older magic versions, there will never be a producer id, etc.
- val first = records.batches.asScala.head
- (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
- }
- buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
- validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
- uncompressedSizeInBytes)
- } else {
- // we can update the batch only and write the compressed payload as is;
- // again we assume only one record batch within the compressed set
- val batch = records.batches.iterator.next()
- val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
+ if (!inPlaceAssignment) {
+ val (producerId, producerEpoch, sequence, isTransactional) = {
+ // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
+ // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
+ // with older magic versions, there will never be a producer id, etc.
+ val first = records.batches.asScala.head
+ (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
+ }
+ buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
+ validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
+ uncompressedSizeInBytes)
+ } else {
+ // we can update the batch only and write the compressed payload as is
+ val batch = records.batches.iterator.next()
+ val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
- batch.setLastOffset(lastOffset)
+ batch.setLastOffset(lastOffset)
- if (timestampType == TimestampType.LOG_APPEND_TIME)
- maxTimestamp = now
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ maxTimestamp = now
- if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
- batch.setMaxTimestamp(timestampType, maxTimestamp)
+ if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
+ batch.setMaxTimestamp(timestampType, maxTimestamp)
- if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
- batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+ if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
+ batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
- val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
- ValidationAndOffsetAssignResult(validatedRecords = records,
- maxTimestamp = maxTimestamp,
- shallowOffsetOfMaxTimestamp = lastOffset,
- messageSizeMaybeChanged = false,
- recordConversionStats = recordConversionStats)
- }
+ val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
+ ValidationAndOffsetAssignResult(validatedRecords = records,
+ maxTimestamp = maxTimestamp,
+ shallowOffsetOfMaxTimestamp = lastOffset,
+ messageSizeMaybeChanged = false,
+ recordConversionStats = recordConversionStats)
+ }
}
private def buildRecordsAndAssignOffsets(magic: Byte,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index d306b16..37553b9 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -19,7 +19,7 @@ package kafka.log
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
-import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
+import kafka.api.{ApiVersion, KAFKA_2_0_IV1}
import kafka.common.LongRef
import kafka.message._
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
@@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.test.TestUtils
import org.junit.Assert._
import org.junit.Test
-import org.scalatest.Assertions.{assertThrows, intercept}
+import org.scalatest.Assertions.intercept
import scala.collection.JavaConverters._
@@ -37,51 +37,6 @@ class LogValidatorTest {
val time = Time.SYSTEM
@Test
- def testOnlyOneBatchCompressedV0(): Unit = {
- checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
- }
-
- @Test
- def testOnlyOneBatchCompressedV1(): Unit = {
- checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
- }
-
- @Test
- def testOnlyOneBatchCompressedV2(): Unit = {
- checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP)
- }
-
- @Test
- def testOnlyOneBatchUncompressedV2(): Unit = {
- checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE)
- }
-
- private def checkOnlyOneBatchCompressed(magic: Byte, compressionType: CompressionType) {
- validateMessages(createRecords(magic, 0L, compressionType), magic, compressionType)
-
- assertThrows[InvalidRecordException] {
- validateMessages(createTwoBatchedRecords(magic, 0L, compressionType), magic, compressionType)
- }
- }
-
- private def validateMessages(records: MemoryRecords, magic: Byte, compressionType: CompressionType): Unit = {
- LogValidator.validateMessagesAndAssignOffsets(records,
- new LongRef(0L),
- time,
- now = 0L,
- CompressionCodec.getCompressionCodec(compressionType.name),
- CompressionCodec.getCompressionCodec(compressionType.name),
- compactedTopic = false,
- magic,
- TimestampType.CREATE_TIME,
- 1000L,
- RecordBatch.NO_PRODUCER_EPOCH,
- isFromClient = true,
- KAFKA_2_3_IV1
- )
- }
-
- @Test
def testLogAppendTimeNonCompressedV1() {
checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
}
@@ -1182,22 +1137,6 @@ class LogValidatorTest {
builder.build()
}
- def createTwoBatchedRecords(magicValue: Byte,
- timestamp: Long = RecordBatch.NO_TIMESTAMP,
- codec: CompressionType): MemoryRecords = {
- val buf = ByteBuffer.allocate(2048)
- var builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
- builder.append(10L, "1".getBytes(), "a".getBytes())
- builder.close()
- builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L)
- builder.append(11L, "2".getBytes(), "b".getBytes())
- builder.append(12L, "3".getBytes(), "c".getBytes())
- builder.close()
-
- buf.flip()
- MemoryRecords.readableRecords(buf.slice())
- }
-
/* check that offsets are assigned consecutively from the given base offset */
def checkOffsets(records: MemoryRecords, baseOffset: Long) {
assertTrue("Message set should not be empty", records.records.asScala.nonEmpty)