Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2013/10/02 23:31:42 UTC

[jira] [Updated] (KAFKA-527) Compression support does numerous byte copies

     [ https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-527:
--------------------------------

    Attachment: KAFKA-527.message-copy.history

Attached a file summarizing the copying operations. If we count the shallow iteration over message sets, there are about 8 copy operations over the lifetime of a message. Among them, Copies 3/4/5 could be avoided, and we could apply only one shallow iteration on each broker (leader and follower) and on the consumer.

Given that offset assignment is currently the main reason for converting between an array of Message and a ByteBufferMessageSet instead of appending Messages to a ByteBufferMessageSet, and that trimming invalid bytes on the consumer complicates its logic, one proposal for refactoring the compression path while remaining backward compatible is:

1) When serving a fetch request, return only complete Messages within the requested fetch size, so the consumer does not need to trim invalid bytes.
2) Use the first 6 bits of the attribute byte of Message to indicate the number of real messages it wraps. This limits the number of compressed messages in one wrapper to 64, which I think is good enough (see the sketch after this list).
3) When doing dedup, delete only the payload of the message but keep its wrapped-message count, so that the offsets remain consecutively incremental.
4) For offset assignment on the broker, the ByteBufferMessageSet will contain multiple wrapper Messages (since the compression batch size is now limited to 64, one ProducerRequest may carry a ByteBufferMessageSet with multiple wrapper messages); we need to use the shallowIterator to iterate over them and reassign offsets without decompressing/recompressing.
5) On the consumer side, allow shallow iteration as well as deep iteration; with deep iteration, the consumer iterator needs to assign an offset to each decompressed message based on its wrapper message's offset and size. Also add the compression codec to the MessageAndMetadata returned to consumers, which will then solve KAFKA-1011.
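
To make 2) concrete, here is a minimal sketch (assuming, as in the current Message format, that the lowest 2 bits of the attribute byte hold the compression codec, leaving 6 bits for the count) of how the wrapped-message count could be packed in and read back out. The object and method names are hypothetical, not existing Kafka APIs:

    object WrapperAttributes {
      val CompressionCodecMask: Int = 0x03  // lowest 2 bits: compression codec
      val MessageCountShift: Int    = 2     // upper 6 bits: wrapped-message count
      val MaxWrappedMessages: Int   = 64    // 2^6 distinct values

      // Pack the codec and the number of wrapped messages into one attribute byte,
      // storing count - 1 so that 1..64 fits in the 6 available bits.
      def encode(codec: Int, wrappedCount: Int): Byte = {
        require(wrappedCount >= 1 && wrappedCount <= MaxWrappedMessages,
          "a wrapper can hold at most " + MaxWrappedMessages + " messages")
        (((wrappedCount - 1) << MessageCountShift) | (codec & CompressionCodecMask)).toByte
      }

      def codec(attributes: Byte): Int = attributes & CompressionCodecMask

      def wrappedCount(attributes: Byte): Int =
        ((attributes & 0xFF) >> MessageCountShift) + 1
    }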

With this we can completely remove de/re-compression on the brokers and always use the compressed wrapper message as the unit throughout the pipeline. The downside of this approach is that it complicates logic on the producer (the 64-message compression batch limit) and on the consumer (recomputing message offsets).
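
As a rough illustration of the consumer-side cost, the sketch below derives per-message offsets during deep iteration, assuming the wrapper message carries the offset of the last message it wraps, so the wrapped messages occupy consecutive offsets ending at the wrapper's offset. SimpleMessage is a stand-in type, not an actual Kafka class:

    // Hypothetical sketch of 5): recomputing offsets for decompressed messages.
    case class SimpleMessage(payload: Array[Byte])

    def assignOffsets(wrapperOffset: Long,
                      decompressed: Seq[SimpleMessage]): Seq[(Long, SimpleMessage)] = {
      // Wrapped messages end at wrapperOffset, so the first one is (size - 1) earlier.
      val firstOffset = wrapperOffset - (decompressed.size - 1)
      decompressed.zipWithIndex.map { case (msg, i) => (firstOffset + i, msg) }
    }

Under that convention, the broker side of 4) only needs to rewrite the wrapper's offset field via the shallow iterator, which is what removes the decompress/recompress step.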

> Compression support does numerous byte copies
> ---------------------------------------------
>
>                 Key: KAFKA-527
>                 URL: https://issues.apache.org/jira/browse/KAFKA-527
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jay Kreps
>         Attachments: java.hprof.no-compression.txt, java.hprof.snappy.text, KAFKA-527.message-copy.history
>
>
> The data path for compressing or decompressing messages is extremely inefficient. We do something like 7 (?) complete copies of the data, often for simple things like adding a 4 byte size to the front. I am not sure how this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the Message/MessageSet interfaces, which are based on byte buffers, is the cause of many of these copies.
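
As an illustration of that interface mismatch (this is not the actual Kafka code path, just a sketch of the pattern it forces), a stream-based compressor cannot read from a ByteBuffer directly, so the bytes get copied into a temporary array on the way in and copied again out of the stream's buffer on the way out:

    import java.io.ByteArrayOutputStream
    import java.nio.ByteBuffer
    import java.util.zip.GZIPOutputStream

    def compress(messages: ByteBuffer): ByteBuffer = {
      val raw = new Array[Byte](messages.remaining())  // copy #1: ByteBuffer -> byte[]
      messages.get(raw)

      val out = new ByteArrayOutputStream()
      val gzip = new GZIPOutputStream(out)
      gzip.write(raw)                                  // bytes pushed through the stream
      gzip.close()

      ByteBuffer.wrap(out.toByteArray)                 // copy #2: stream buffer -> new byte[]
    }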



--
This message was sent by Atlassian JIRA
(v6.1#6144)