Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2013/08/01 17:35:48 UTC

[jira] [Commented] (KAFKA-994) High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size

    [ https://issues.apache.org/jira/browse/KAFKA-994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726525#comment-13726525 ] 

Jun Rao commented on KAFKA-994:
-------------------------------

Thanks for filing this jira. It looks like a real issue. Just changing validBytes back to sizeInBytes may not be enough. In enqueue(), we currently expect the chunk to contain at least one message in order to compute the next fetch offset. If we hit a message larger than the fetch size, that expectation no longer holds. So we have to change the logic a bit: if there is not a single complete message in the chunk, we don't move the fetch offset, but we still insert the chunk into the queue (so that the consumer thread can see it and throw).
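
For illustration, a rough, self-contained sketch of that idea. The types below (Message, MessageSet, FetchedDataChunk, PartitionTopicInfo) are simplified stand-ins, not the actual 0.8 classes, so field names and signatures won't match the real code exactly:

    import java.util.concurrent.BlockingQueue
    import java.util.concurrent.atomic.AtomicLong

    case class Message(payload: Array[Byte], nextOffset: Long)
    // Stand-in for ByteBufferMessageSet: sizeInBytes counts all fetched bytes,
    // validBytes counts only the bytes of complete messages within the fetch size.
    case class MessageSet(messages: Seq[Message], sizeInBytes: Int, validBytes: Int)
    case class FetchedDataChunk(messages: MessageSet, fetchOffset: Long)

    class PartitionTopicInfo(val chunkQueue: BlockingQueue[FetchedDataChunk],
                             val fetchedOffset: AtomicLong) {

      def enqueue(messages: MessageSet): Unit = {
        // Always hand the chunk to the consumer thread, even when it contains no
        // complete message, so ConsumerIterator can detect the oversized message
        // and throw instead of never seeing the chunk at all.
        chunkQueue.put(FetchedDataChunk(messages, fetchedOffset.get))

        // Only advance the fetch offset when the chunk holds at least one complete
        // message; otherwise keep the old offset (the client has to raise its
        // fetch size before it can make progress past this message).
        if (messages.validBytes > 0)
          fetchedOffset.set(messages.messages.last.nextOffset)
      }
    }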

Thanks,

Jun
                
> High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-994
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Neha Narkhede
>             Fix For: 0.8
>
>         Attachments: messageSize.patch
>
>
> The high level consumer code is supposed to throw an exception when it encounters a message that exceeds its configured max message size. The relevant code from ConsumerIterator.scala is:
>       // if we just updated the current chunk and it is empty that means the fetch size is too small!
>       if(currentDataChunk.messages.validBytes == 0)
>         throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
>                                                "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
>                                                .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
>     }
> The problem is that KAFKA-846 changed PartitionTopicInfo.enqueue:
>    def enqueue(messages: ByteBufferMessageSet) {
> -    val size = messages.sizeInBytes
> +    val size = messages.validBytes
>      if(size > 0) {
> i.e. chunks that contain only messages that are too big (validBytes = 0) will never even be enqueued, so they never hit the too-large-message check in ConsumerIterator... 
> I've attached a patch that passes our tests...

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira