Posted to users@kafka.apache.org by Jason Weiss <Ja...@rapid7.com> on 2013/05/17 16:08:48 UTC

InvalidMessageException problems

I have a simple multi-threaded app sending numerous fixed-length messages (2048 or 3072 bytes each) into an Apache Kafka 0.7.2 cluster (3 machines) running on AWS AMIs. When the message volume spikes, I start running into lots of problems, specifically InvalidMessageException errors.

I'm using the default Kafka server config, except for bumping the network threads from 3 to 4. Zookeeper is in the mix as well. I'm not using any compression (none). Here is my producer config:

            props.put("zk.connect", zkConnect);
            props.put("batch.num.messages", 500);
            props.put("queue.buffering.max.messages", 30000);
            props.put("serializer.class", "kafka.serializer.StringEncoder");


I'm at a loss as to which knobs I need to turn to fix this. Can anyone on the list offer any insight?

[2013-05-17 13:55:16,715] ERROR Error processing MultiProducerRequest on ETL:0 (kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 3078 curr offset: 0 init offset: 0
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
at kafka.log.Log.append(Log.scala:218)
at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at kafka.network.Processor.handle(SocketServer.scala:296)
at kafka.network.Processor.read(SocketServer.scala:319)
at kafka.network.Processor.run(SocketServer.scala:214)
at java.lang.Thread.run(Thread.java:679)


Re: InvalidMessageException problems

Posted by Jason Weiss <Ja...@rapid7.com>.
I am using the Java producer, and it is reproducible. When you say that
the producer sends the corrupted data, are you referring to the producer
as a black box, or to something in my code?

Where I'm getting stuck is that my producer code seems so ridiculously
simple - create a ProducerData, following the example producer from the
distro, with an Integer key type and a String message, then send it off.

As for producer configuration, I only have zk.connect and serializer.class
(StringEncoder).


// Build one fixed-size message from the simulated log line
char[] logLine = logSimulator.fetchLogLine();
StringBuffer buffer = new StringBuffer(blockSize);
buffer.append(logLine);

// Send it to the "ETL" topic (Integer key type, String message type)
ProducerData<Integer, String> data =
        new ProducerData<Integer, String>("ETL", buffer.toString());
producer.send(data);



Jason



On 5/17/13 11:17 AM, "Jun Rao" <ju...@gmail.com> wrote:

>This indicates the messages sent to the broker are corrupted. Typically,
>this is because either the producer sends the corrupted data somehow or
>the network is flaky. Are you using a java producer? Is this reproducible?
>
>Thanks,
>
>Jun



Re: InvalidMessageException problems

Posted by Jason Weiss <Ja...@rapid7.com>.
It turns out it was the OpenJDK build on the AWS AMI instance. As soon as
I replaced this OpenJDK runtime:

java version "1.6.0_24"
OpenJDK Runtime Environment (IcedTea6 1.11.11) (amazon-61.1.11.11.53.amzn1-x86_64)
OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)

with Oracle JDK 1.6.0_38, all of my problems went away.
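
(For anyone who hits the same thing: a quick way to confirm which runtime a producer process is actually running on is to log the standard JVM system properties at startup. This is only an illustrative sketch, not code from the app above.)

public class JvmInfo {
    public static void main(String[] args) {
        // Standard JVM system properties; handy for telling an OpenJDK/IcedTea
        // build apart from the Oracle JDK on an AMI.
        System.out.println("java.version      = " + System.getProperty("java.version"));
        System.out.println("java.runtime.name = " + System.getProperty("java.runtime.name"));
        System.out.println("java.vm.name      = " + System.getProperty("java.vm.name"));
        System.out.println("java.vm.version   = " + System.getProperty("java.vm.version"));
    }
}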

Thanks again for the prompt response.


Jason


On 5/17/13 11:17 AM, "Jun Rao" <ju...@gmail.com> wrote:

>This indicates the messages sent to the broker are corrupted. Typically,
>this is because either the producer sends the corrupted data somehow or
>the network is flaky. Are you using a java producer? Is this reproducible?
>
>Thanks,
>
>Jun


Re: InvalidMessageException problems

Posted by Jun Rao <ju...@gmail.com>.
This indicates the messages sent to the broker are corrupted. Typically,
this is because either the producer sends the corrupted data somehow or the
network is flaky. Are you using a java producer? Is this reproducible?
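
(For context: each Kafka message carries a CRC32 checksum of its payload, and the broker treats a message as invalid when a checksum it recomputes does not match the stored one. The snippet below is only a rough illustration of that kind of check, not the broker's actual code.)

import java.util.zip.CRC32;

public class ChecksumCheck {
    // Rough illustration of a CRC32 payload check, similar in spirit to the
    // validity check a Kafka 0.7 broker performs on each incoming message.
    static boolean payloadMatchesChecksum(byte[] payload, long storedChecksum) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue() == storedChecksum;
    }

    public static void main(String[] args) {
        byte[] payload = "example payload".getBytes();
        CRC32 crc = new CRC32();
        crc.update(payload);
        long checksum = crc.getValue();

        System.out.println(payloadMatchesChecksum(payload, checksum)); // true
        payload[0] ^= 0x01; // simulate corruption in flight
        // A mismatch like this is what surfaces as InvalidMessageException on the broker
        System.out.println(payloadMatchesChecksum(payload, checksum)); // false
    }
}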

Thanks,

Jun

