You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2013/08/23 04:31:53 UTC

[jira] [Comment Edited] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline

    [ https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631 ] 

Guozhang Wang edited comment on KAFKA-1011 at 8/23/13 2:30 AM:
---------------------------------------------------------------

Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be called over a set of messages either with the same key hash value or with key null, we can write the key to the compressed wrapper message according to partition id (currently it is always written as null).

2. Add a isShallow parameter to consumerIterator and KafkaStream, and passing the parameter from KafkaStream to consumerIterator; in consumerIterator, if isShallow is true call currentDataChunk.messages.shallowIterator otherwise call currentDataChunk.messages.iterator

3. Also in consumerIterator, if shallowIterator is true, construct MessageAndMetadata with value directly assigned as message: Message instead of fromBytes(Utils.readBytes(item.message.payload))

4. In MirrorMaker, set shallowIterator to true, and upon read each msgAndMetadata from stream, create KeyedMessage[Array[Byte], Message] instead of  KeyedMessage[Array[Byte], Array[Byte]].

5. Also in MirrorMaker, set CompressionCodec to NoCompression to avoid second compression of compressed message.

6. Ordering in MirrorMaker will be automatically preserved since MirrorMaker producer's event handler would use the message key to decide the outgoing partition, hence compressed messages with the same key would go to the same partition.
                
      was (Author: guozhang):
    Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be called over a set of messages either with the same key or with key null, we can write the key to the compressed wrapper message according to their keys (currently it is always written as null).

2. Add a isShallow parameter to consumerIterator and KafkaStream, and passing the parameter from KafkaStream to consumerIterator; in consumerIterator, if isShallow is true call currentDataChunk.messages.shallowIterator otherwise call currentDataChunk.messages.iterator

3. Also in consumerIterator, if shallowIterator is true, construct MessageAndMetadata with value directly assigned as message: Message instead of fromBytes(Utils.readBytes(item.message.payload))

4. In MirrorMaker, set shallowIterator to true, and upon read each msgAndMetadata from stream, create KeyedMessage[Array[Byte], Message] instead of  KeyedMessage[Array[Byte], Array[Byte]].

5. Also in MirrorMaker, set CompressionCodec to NoCompression to avoid second compression of compressed message.

6. Ordering in MirrorMaker will be automatically preserved since MirrorMaker producer's event handler would use the message key to decide the outgoing partition, hence compressed messages with the same key would go to the same partition.
                  
> Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1011
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1011
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>             Fix For: 0.8.1
>
>
> The way MirrorMaker works today is that its consumers could use deep iterator to decompress messages received from the source brokers and its producers could re-compress the messages while sending them to the target brokers. Since MirrorMakers use a centralized data channel for its consumers to pipe messages to its producers, and since producers would compress messages with the same topic within a batch as a single produce request, this could result in messages accepted at the front end of the pipeline being dropped at the target brokers of the MirrorMaker due to MesageSizeTooLargeException if it happens that one batch of messages contain too many messages of the same topic in MirrorMaker's producer. If we can use shallow iterator at the MirrorMaker's consumer side to directly pipe compressed messages this issue can be fixed. 
> Also as Swapnil pointed out, currently if the MirrorMaker lags and there are large messages in the MirrorMaker queue (large after decompression), it can run into an OutOfMemoryException. Shallow iteration will be very helpful in avoiding this exception.
> The proposed solution of this issue is also related to KAFKA-527.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira