Posted to dev@kafka.apache.org by "Swapnil Ghike (JIRA)" <ji...@apache.org> on 2012/11/02 02:36:12 UTC

[jira] [Comment Edited] (KAFKA-546) Fix commit() in zk consumer for compressed messages

    [ https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489191#comment-13489191 ] 

Swapnil Ghike edited comment on KAFKA-546 at 11/2/12 1:34 AM:
--------------------------------------------------------------

1. ConsumerIterator skips messages that have already been fetched (a rough sketch of this follows after this list).

2. consumer.PartitionTopicInfo.enqueue()
- Made a change to pass the starting offset of a messageSet, instead of the current fetchedOffset, to FetchedDataChunk(). The current code expects the fetchedOffset to be the same as the starting offset of the incoming messageSet. But if a messageSet was partially consumed and fetched again, the fetchedOffset that goes into the FetchedDataChunk will be greater than the starting offset of the incoming messageSet. The fix takes care of this situation. It also does not disturb consumption under normal sequential fetches, because in that case the starting offset of the incoming messageSet is the same as the fetchedOffset recorded in partitionTopicInfo.

3. Added a unit test to test de-duplication of messages in ConsumerIterator.
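
To make points 1 and 2 concrete, here is a minimal, self-contained sketch of the skip logic. These are simplified types, not the actual kafka.consumer classes, and names like SkippingIterator and consumedOffset are made up for illustration:

case class Message(offset: Long, payload: String)

class SkippingIterator(chunk: Seq[Message], consumedOffset: Long) extends Iterator[Message] {
  // Drop messages the consumer has already seen before exposing the rest.
  private val remaining = chunk.iterator.filter(_.offset >= consumedOffset)
  override def hasNext: Boolean = remaining.hasNext
  override def next(): Message = remaining.next()
}

object DedupSketch extends App {
  // A compressed messageSet is always refetched from its first offset (100 here),
  // even though the consumer had already consumed up to offset 105.
  val refetched = (100L to 109L).map(o => Message(o, s"m$o"))
  val it = new SkippingIterator(refetched, consumedOffset = 105L)
  it.foreach(m => println(m.offset))   // prints 105 through 109 only
}

Point 2 then amounts to labelling the enqueued FetchedDataChunk with the messageSet's own starting offset (100 above) rather than the recorded fetchedOffset (105), so that the chunk's offset matches the data it actually carries; under normal sequential fetches the two are equal, so behaviour is unchanged.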
                
      was (Author: swapnilghike):
    1. ConsumerIterator skips messages that have already been fetched.

2. consumer.PartitionTopicInfo.enqueue()
- Changed a statement to use the starting offset of a messageSet instead of the current fetchedOffset. The current code expects the incoming offset to be the same as starting offset of the messageSet. But if a messageSet was partially consumed and fetched again, the fetchedOffset that goes into the FetchedDataChunk will be greater than the starting offset  in the messageSet. The fix takes care of this situation. The fix also does not disturb consumption under normal sequential fetches, and the behaviour will be the same as the current behaviour under normal sequential consumption.

3. Added a unit test to test de-deduplication of messages in ConsumerIterator.
                  
> Fix commit() in zk consumer for compressed messages
> ---------------------------------------------------
>
>                 Key: KAFKA-546
>                 URL: https://issues.apache.org/jira/browse/KAFKA-546
>             Project: Kafka
>          Issue Type: New Feature
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Swapnil Ghike
>         Attachments: kafka-546-v1.patch
>
>
> In 0.7.x and earlier versions, offsets were assigned by the byte location in the file. Because it wasn't possible to directly decompress from the middle of a compressed block, messages inside a compressed message set effectively had no offset. As a result, the offset given to the consumer was always the offset of the wrapper message set.
> In 0.8, after the logical offsets patch, messages in a compressed set do have offsets. However, the server still needs to fetch from the beginning of the compressed message set (otherwise it can't be decompressed). As a result, a commit() which occurs in the middle of a message set will still result in some duplicates.
> This can be fixed in the ConsumerIterator by discarding messages with offsets smaller than the fetch offset rather than giving them to the consumer. This will make commit work correctly in the presence of compressed messages (finally).
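
As a rough illustration of the duplicate scenario described above (the offsets here are made up): suppose one compressed message set spans offsets 0-9 and commit() runs after offset 6 has been consumed, so the committed offset is 7. On restart the fetch has to begin at offset 0, the start of the compressed set.

// Made-up offsets, for illustration only.
val compressedSet   = 0L to 9L
val committedOffset = 7L

val withoutFix = compressedSet                               // 0..9 redelivered: 7 duplicates
val withFix    = compressedSet.filter(_ >= committedOffset)  // only 7, 8, 9 reach the application

The ConsumerIterator change described in the comment above is essentially that filter step, applied as messages are handed to the consumer.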

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira