Posted to users@kafka.apache.org by Rahul Biswas <rb...@rocketfuelinc.com> on 2012/08/01 01:43:18 UTC

Re: InvalidMessageSizeException

Hi Neha,

Thanks for the quick response. Am I looking for "isvalid : false" in the output of the DumpLogSegments tool?
If so, I don't see that (at least not in about 180 messages on a single partition). I've also tried an offset of -1 and ran into the same issue, so it can't be an invalid offset either, right?
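
To illustrate how a fetch offset that does not land on a message boundary could produce sizes like the ones in the exception, here is a minimal Python sketch. It assumes a simplified layout in which each message is just a 4-byte big-endian signed size followed by the payload, and in which offsets are byte positions in the log (my understanding of this Kafka version, not something verified against the broker code). The names frame, read_messages and as_int32 are hypothetical helpers, and the error text only mimics the wording of InvalidMessageSizeException.

```python
import struct


def frame(payload: bytes) -> bytes:
    # Assumed layout: 4-byte big-endian signed length, then the payload.
    return struct.pack(">i", len(payload)) + payload


def as_int32(four_bytes: bytes) -> int:
    # Interpret 4 raw bytes the way a signed 32-bit size field would be read.
    return struct.unpack(">i", four_bytes)[0]


def read_messages(buf: bytes, start: int = 0):
    # Walk the buffer from byte position `start`, trusting each size prefix.
    out = []
    pos = start
    while pos + 4 <= len(buf):
        size = as_int32(buf[pos:pos + 4])
        remaining = len(buf) - pos - 4
        if size < 0 or size > remaining:
            raise ValueError(
                f"invalid message size: {size} "
                f"only received bytes: {remaining} at {pos}"
            )
        out.append(buf[pos + 4:pos + 4 + size])
        pos += 4 + size
    return out


log = frame(b"first message") + frame(b"second message")

# Starting on a message boundary decodes both messages.
print(read_messages(log, start=0))

# Starting in the middle of a message reads arbitrary bytes as the size.
for bad_start in (2, 5):
    try:
        read_messages(log, start=bad_start)
    except ValueError as e:
        print(bad_start, e)

# A negative size is simply 4 bytes whose top bit is set, read as signed.
print(as_int32(bytes.fromhex("8809bf74")))
```

Under these assumptions, a very large or negative size does not by itself distinguish a misaligned fetch offset from corrupted bytes on disk; both lead to the same failed check.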

As a quick test, I modified the Storm Kafka Spout and commented out the logging that tries to write out the message size (line 122 in the file below):
https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
With that change I no longer run into the problem. Is it possible we are hitting a race condition here?

thanks,
Rahul

On Jul 31, 2012, at 2:28 PM, Neha Narkhede wrote:

> Rahul,
> 
> One of the first things you can check is whether the log on the server
> is corrupted or not. For this, you can use the DumpLogSegments tool
> and point it to the log segments for the topic partition that you see
> errors for.
> If that is fine, then the next thing to check would be the fetch
> offset that the Kafka spout is using, since if the fetch offset is
> incorrect, you will not be able to make any progress.
> 
> Thanks,
> Neha
> 
> On Tue, Jul 31, 2012 at 2:09 PM, Rahul Biswas <rb...@rocketfuelinc.com> wrote:
>> Hi,
>> 
>> We are running into this exception quite frequently now. We are using Storm's Kafka Spout as a consumer. As part of a logging message, it tries to print the size of the underlying message set, and that is where the validation of the message size seems to fail.
>> 
>> The full stack trace is as below --
>> kafka.common.InvalidMessageSizeException: invalid message size: 161758511 only received bytes: 215831 at 10738556( possible causes (1) a single message larger than the fetch size; (2) log corruption )
>>        at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:103)
>>        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
>>        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>>        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>        at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>        at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>        at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>        at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:100)
>>        at kafka.message.MessageSet.size(MessageSet.scala:87)
>>        at storm.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:111)
>>        at storm.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:87)
>>        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:208)
>>        at backtype.storm.daemon.task$fn__3349$fn__3404.invoke(task.clj:412)
>>        at backtype.storm.daemon.task$mk_task$iter__3281__3285$fn__3286$fn__3287$fn__3288.invoke(task.clj:262)
>>        at clojure.lang.AFn.applyToHelper(AFn.java:159)
>>        at clojure.lang.AFn.applyTo(AFn.java:151)
>>        at clojure.core$apply.invoke(core.clj:601)
>>        at backtype.storm.util$async_loop$fn__451.invoke(util.clj:369)
>>        at clojure.lang.AFn.run(AFn.java:24)
>>        at java.lang.Thread.run(Thread.java:619)
>> There are also negative message sizes (for a different consumer with a fetch size of 5MB) --
>> kafka.common.InvalidMessageSizeException: invalid message size: -2012627084 only received bytes: 5242876 at 1054940626667( possible causes (1) a single message larger than the fetch size; (2) log core
>> 
>> thanks.
>> Rahul