Posted to users@kafka.apache.org by Rahul Biswas <rb...@rocketfuelinc.com> on 2012/07/31 23:07:15 UTC

InvalidMessageSizeException

Hi,

We are now running into this exception quite frequently. We use Storm's Kafka Spout as a consumer. As part of a log message, the spout tries to print the size of the underlying message set (the MessageSet.size call in the trace), and that is where the size validation fails.

The full stack trace is below:
kafka.common.InvalidMessageSizeException: invalid message size: 161758511 only received bytes: 215831 at 10738556( possible causes (1) a single message larger than the fetch size; (2) log corruption )
	at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:103)
	at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
	at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
	at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
	at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
	at scala.collection.Iterator$class.foreach(Iterator.scala:631)
	at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
	at kafka.message.MessageSet.foreach(MessageSet.scala:87)
	at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:100)
	at kafka.message.MessageSet.size(MessageSet.scala:87)
	at storm.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:111)
	at storm.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:87)
	at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:208)
	at backtype.storm.daemon.task$fn__3349$fn__3404.invoke(task.clj:412)
	at backtype.storm.daemon.task$mk_task$iter__3281__3285$fn__3286$fn__3287$fn__3288.invoke(task.clj:262)
	at clojure.lang.AFn.applyToHelper(AFn.java:159)
	at clojure.lang.AFn.applyTo(AFn.java:151)
	at clojure.core$apply.invoke(core.clj:601)
	at backtype.storm.util$async_loop$fn__451.invoke(util.clj:369)
	at clojure.lang.AFn.run(AFn.java:24)
	at java.lang.Thread.run(Thread.java:619)
We also see negative message sizes (for a different consumer with a fetch size of 5 MB):
kafka.common.InvalidMessageSizeException: invalid message size: -2012627084 only received bytes: 5242876 at 1054940626667( possible causes (1) a single message larger than the fetch size; (2) log core
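
For context, here is a minimal sketch of the kind of check that could produce both messages above. It is a simplified model, not Kafka's actual code. It assumes each message in a fetched buffer is preceded by a 4-byte big-endian signed size, and the function name check_next_message is hypothetical. If the consumer starts reading at an offset that is not a message boundary, or the log bytes are corrupt, the four bytes read as a "size" are arbitrary and can decode to a huge or negative number.

```python
import struct


def check_next_message(buf: bytes, pos: int = 0) -> int:
    """Return the payload size at pos, or raise if the size prefix is implausible.

    Assumed framing (illustrative): [4-byte signed big-endian size][payload].
    """
    if len(buf) - pos < 4:
        raise ValueError("not enough bytes for a size prefix")
    # Decode the 4 bytes at pos as a signed 32-bit big-endian integer.
    (size,) = struct.unpack_from(">i", buf, pos)
    # Bytes left in the buffer after the size prefix has been consumed.
    remaining = len(buf) - pos - 4
    if size < 0 or size > remaining:
        raise ValueError(
            f"invalid message size: {size} only received bytes: {remaining}"
        )
    return size


if __name__ == "__main__":
    good = struct.pack(">i", 5) + b"hello"
    print(check_next_message(good))       # aligned read: size prefix is valid
    try:
        check_next_message(good, pos=2)   # misaligned read: garbage "size"
    except ValueError as err:
        print(err)
```

Under this model, the "only received bytes" figure in the second exception is consistent with a full 5 MB fetch minus the 4-byte prefix (5 * 1024 * 1024 - 4 = 5242876), and -2012627084 is simply the bytes 88 09 BF 74 read as a signed 32-bit integer.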

thanks.
Rahul