Posted to dev@kafka.apache.org by Sergio Daniel Troiano <se...@adevinta.com.INVALID> on 2022/10/15 10:11:05 UTC

[Doubt] about breaking zero copy

My question is about produced batch conversions (breaking zero copy).

I am producing with Kafka Streams 3.0 (the same version as the Kafka cluster).

Messages are compressed on the producer.


I am seeing a rate of several produce conversions per second on a topic (the
ProduceMessageConversionsPerSec broker metric). As we know, recompressing the
batches breaks the “zero copy” path, which means more resources used by the
brokers.
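
For reference, this is roughly how the per-topic conversion meter can be read
over JMX. A sketch only: the host, port and topic name are placeholders, and it
assumes the brokers expose JMX.

import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object ConversionRateCheck extends App {
  // Placeholder endpoint; the broker must be started with JMX enabled (e.g. JMX_PORT).
  val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi")
  val connector = JMXConnectorFactory.connect(url)
  val mbsc = connector.getMBeanServerConnection

  // Per-topic produce conversion meter exposed by BrokerTopicMetrics.
  val meter = new ObjectName(
    "kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec,topic=my-topic")
  val rate = mbsc.getAttribute(meter, "OneMinuteRate")
  println(s"ProduceMessageConversionsPerSec (1m rate): $rate")

  connector.close()
}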



Checking the source code, I saw different conditions that prevent the broker
from keeping the produced batches “in place” (i.e. that force a conversion):


1. sourceCompression != destinationCompression (e.g. producer using GZIP and
topic using LZ4); see the producer sketch below

2. Magic number < 2, or a magic number mismatch

3. Non-contiguous inner offsets (record.offset != expectedOffset)
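
For completeness, this is roughly how condition 1 can be ruled out: if the
topic is left at its default compression.type=producer (or uses the same codec
as the producer), the broker keeps the producer batches as-is. A sketch, where
the broker address and topic name are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object CompressionMatchedProducer extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092") // placeholder
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  // With topic compression.type=producer (the default) the broker does not
  // recompress, so condition 1 cannot be the cause of the conversions.
  props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")

  val producer = new KafkaProducer[String, String](props)
  producer.send(new ProducerRecord[String, String]("my-topic", "key", "value"))
  producer.close()
}

(In Kafka Streams the equivalent producer setting is passed with
StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG).)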


I think this is the code snippet that triggers breaking zero copy (code at the
bottom of the email).



What I suspect is that, for some reason, the offsets are not contiguous in the
produced batches, which leads me to my main question: what could be a scenario
in which this happens?
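
To make the question concrete, here is a minimal sketch of a compressed v2
batch whose inner offsets have a gap (it uses the internal MemoryRecordsBuilder
API from kafka-clients purely for illustration; a normal producer always writes
contiguous offsets 0, 1, 2, ...). As far as I understand, the broker would hit
record.offset != expectedOffset on the third record and set
inPlaceAssignment = false:

import java.nio.ByteBuffer
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, TimestampType}

object NonContiguousInnerOffsets extends App {
  val buffer = ByteBuffer.allocate(1024)
  // Build a compressed v2 batch the way a producer would: base offset 0,
  // inner offsets normally 0, 1, 2, ...
  val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
    CompressionType.GZIP, TimestampType.CREATE_TIME, 0L)

  val now = System.currentTimeMillis()
  builder.appendWithOffset(0L, now, "k".getBytes, "a".getBytes)
  builder.appendWithOffset(1L, now, "k".getBytes, "b".getBytes)
  builder.appendWithOffset(3L, now, "k".getBytes, "c".getBytes) // gap: offset 2 is skipped

  val records = builder.build()
  // Prints 0, 1, 3: on the broker, expectedInnerOffset would be 0, 1, 2,
  // so the third record would force the conversion path.
  records.records.forEach(r => println(r.offset))
}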


I tried to see this with the kafka-dump-log.sh tool, but of course this is not
possible, as Kafka has already converted the batches by the time they reach the
log.


I also thought transactions could be the reason for the conversions, since they
use control batches (isControl), but as far as I saw a control batch always
contains exactly one record (the control record), so I assume control batches
will never carry other client-generated records.



So I would appreciate an example of non-contiguous offsets in a batch. In
parallel I will continue my investigation, as I am intrigued by these
conversions.



After this I want to write a public document about the performance impact of
batch conversions.


Thanks in advance


Best regards.


Sergio Troiano






-----------------------------------------------------------------

recordsIterator.forEachRemaining { record =>
  val expectedOffset = expectedInnerOffset.getAndIncrement()
  val recordError = validateRecordCompression(batchIndex, record).orElse {
    validateRecord(batch, topicPartition, record, batchIndex, now,
      timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).orElse {
      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
        if (record.timestamp > maxTimestamp)
          maxTimestamp = record.timestamp

        // Some older clients do not implement the V1 internal offsets correctly.
        // Historically the broker handled this by rewriting the batches rather
        // than rejecting the request. We must continue this handling here to avoid
        // breaking these clients.
        if (record.offset != expectedOffset)
          inPlaceAssignment = false
      }
      None
    }
  }