Posted to dev@kafka.apache.org by Sergio Daniel Troiano <se...@adevinta.com.INVALID> on 2022/10/15 10:11:05 UTC
[Doubt] about breaking zero copy
My question is about conversions of produced batches, which break zero
copy.
I am producing with Kafka Streams 3.0 (the same version as the Kafka cluster).
Messages are compressed on the producer.
I am seeing several produce message conversions per second on one topic
(the ProduceMessageConversionsPerSec metric). As we know, recompressing
batches breaks “zero copy”, which means the brokers use more resources.
Checking the source code, I found several conditions that prevent batches
from being handled “in place”:
1. Source compression != destination compression (e.g. the producer uses
GZIP and the topic uses LZ4)
2. Magic number < 2, or a magic number mismatch
3. Offsets (inner record offsets that do not match the expected sequence)
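The three conditions above can be summarised as one predicate. This is a hypothetical Scala model of the list, not Kafka's actual LogValidator code: `InPlaceConditions`, `ProducedBatch`, `offsetsContiguous` and `canAssignInPlace` are invented names, compression codecs are plain strings, and condition 3 is modelled as inner offsets that must be exactly 0, 1, 2, ... (an assumption based on the snippet at the bottom of this email).

```scala
// Hypothetical model of the three conditions; these names are not Kafka's API.
object InPlaceConditions {
  // Only the fields the three conditions look at.
  final case class ProducedBatch(compression: String, magic: Byte, innerOffsets: Seq[Long])

  // Condition 3 (assumed form): inner offsets must be exactly 0, 1, 2, ...
  def offsetsContiguous(offsets: Seq[Long]): Boolean =
    offsets.zipWithIndex.forall { case (offset, index) => offset == index.toLong }

  def canAssignInPlace(batch: ProducedBatch, destinationCompression: String, toMagic: Byte): Boolean = {
    // Condition 1: the producer's codec must match the topic's codec.
    val sameCompression = batch.compression == destinationCompression
    // Condition 2: magic must be at least 2 and equal to the target magic.
    val magicOk = batch.magic >= 2 && batch.magic == toMagic
    sameCompression && magicOk && offsetsContiguous(batch.innerOffsets)
  }
}
```

In this model, a GZIP batch sent to an LZ4 topic, a magic 1 batch, or a batch with inner offsets 0, 2, 3 all return false, matching items 1, 2 and 3 of the list respectively.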
I think this is the code snippet that causes zero copy to be broken (the
code is at the bottom of this email).
I suspect that, for some reason, the offsets in the produced batches are not
contiguous. That leads me to my main question: in what scenario could this
happen?
I tried to check this with the kafka-dump-log.sh tool, but of course that is
not possible, because Kafka has already converted the batches by the time
they are written to the log.
I also thought transactions could be the cause of the conversions, since they
use control batches (isControl). However, as far as I can see, a control
batch always contains exactly one record (the control record), so I assume
control batches never contain other “client generated” records.
So I would appreciate it if you could give me an example of non-contiguous
offsets in a batch. In parallel, I will continue my investigation, as I am
intrigued by these conversions.
Afterwards, I want to write a public document about the performance impact
of batch conversions.
Thanks in advance
Best regards.
Sergio Troiano
-----------------------------------------------------------------
recordsIterator.forEachRemaining { record =>
  val expectedOffset = expectedInnerOffset.getAndIncrement()
  val recordError = validateRecordCompression(batchIndex, record).orElse {
    validateRecord(batch, topicPartition, record, batchIndex, now,
      timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).orElse {
      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
        if (record.timestamp > maxTimestamp)
          maxTimestamp = record.timestamp

        // Some older clients do not implement the V1 internal offsets correctly.
        // Historically the broker handled this by rewriting the batches rather
        // than rejecting the request. We must continue this handling here to avoid
        // breaking these clients.
        if (record.offset != expectedOffset)
          inPlaceAssignment = false
      }
      None
    }
  }
  // ... (rest of the loop body not included in this excerpt)
}
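To experiment with the offset check in isolation, here is a minimal standalone Scala model of the loop above. It is a sketch, not Kafka code: `InnerOffsetCheck`, `SimpleRecord`, `CheckResult` and `checkInnerOffsets` are hypothetical names, `java.util.concurrent.atomic.AtomicLong` stands in for whatever counter `expectedInnerOffset` is in the broker, and the expected inner offset is assumed to start at 0. The compression and record validation calls are dropped; only the timestamp and offset checks are kept.

```scala
import java.util.concurrent.atomic.AtomicLong

object InnerOffsetCheck {
  // Hypothetical stand-in for a record: only the fields the check reads.
  final case class SimpleRecord(offset: Long, timestamp: Long)

  final case class CheckResult(inPlaceAssignment: Boolean, maxTimestamp: Long)

  // Mirrors RecordBatch.MAGIC_VALUE_V0.
  val MagicV0: Byte = 0

  def checkInnerOffsets(records: Seq[SimpleRecord], batchMagic: Byte, toMagic: Byte): CheckResult = {
    // Assumption: inner offsets are expected to start at 0.
    val expectedInnerOffset = new AtomicLong(0L)
    var inPlaceAssignment = true
    var maxTimestamp = -1L // mirrors RecordBatch.NO_TIMESTAMP
    records.foreach { record =>
      val expectedOffset = expectedInnerOffset.getAndIncrement()
      if (batchMagic > MagicV0 && toMagic > MagicV0) {
        if (record.timestamp > maxTimestamp)
          maxTimestamp = record.timestamp
        // Any gap, duplicate or reordering in the inner offsets disables in-place assignment.
        if (record.offset != expectedOffset)
          inPlaceAssignment = false
      }
    }
    CheckResult(inPlaceAssignment, maxTimestamp)
  }
}
```

For example, inner offsets 0, 1, 2 leave `inPlaceAssignment` true, while inner offsets 0, 2 set it to false at the second record, because the expected offset there is 1.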