Posted to users@kafka.apache.org by Vidhya Arvind <vi...@gmail.com> on 2015/09/12 21:32:22 UTC

Broker restart, new producer and snappy in 0.8.2.1

There have been multiple instances of this incident. When I restart the
broker for config changes, it brings down the consumers and the mirror
maker, and I see CRC corruption in the mirror maker and the following error
on the broker. I have reset the offsets in ZooKeeper for certain
topic/partitions, but I still see this issue popping up for other
topics/partitions. Please let me know how I can resolve this. It has
happened twice in our production system.
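For reference, the reset I did is roughly along these lines (a minimal
sketch using the plain ZooKeeper client; the consumer group name and target
offset are placeholders, and the old ZooKeeper-based consumer keeps offsets
as UTF-8 strings under /consumers/<group>/offsets/<topic>/<partition>):

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    // Sketch: reset a ZooKeeper-stored consumer offset. The group name
    // ("mirror-maker-group") and the target offset (0) are placeholders.
    public class ResetConsumerOffset {
        public static void main(String[] args) throws Exception {
            // Wait for the session to connect before touching any znode.
            CountDownLatch connected = new CountDownLatch(1);
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            });
            connected.await();

            // Old-consumer offsets live at
            // /consumers/<group>/offsets/<topic>/<partition> as UTF-8 strings.
            String path = "/consumers/mirror-maker-group/offsets"
                    + "/events_prod_oncue.ws.client.ui_UIEvent/29";

            // Version -1 matches any znode version; the node must already exist.
            zk.setData(path, "0".getBytes(StandardCharsets.UTF_8), -1);
            zk.close();
        }
    }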

Please let me know if there is anything I can try to fix the issue before
the next broker restart.

Vidhya


[2015-09-12 19:04:03,409] ERROR [KafkaApi-30001] Error processing
ProducerRequest with correlation id 3480 from client producer-1 on
partition [events_prod_oncue.ws.client.ui_UIEvent,29]
(kafka.server.KafkaApis)
kafka.common.KafkaException: Error in validating messages while
appending to log 'events_prod_oncue.ws.client.ui_UIEvent-29'
	at kafka.log.Log.liftedTree1$1(Log.scala:277)
	at kafka.log.Log.append(Log.scala:274)
	at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
	at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
	at kafka.utils.Utils$.inLock(Utils.scala:535)
	at kafka.utils.Utils$.inReadLock(Utils.scala:541)
	at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
	at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
	at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
	at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
	at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
	at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
	at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
	at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:362)
	at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:167)
	at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:150)
	at java.io.InputStream.read(InputStream.java:101)
	at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply$mcI$sp(ByteBufferMessageSet.scala:67)
	at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:67)
	at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:67)
	at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
	at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
	at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
	at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
	at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
	at scala.collection.immutable.Stream.foreach(Stream.scala:548)
	at kafka.message.ByteBufferMessageSet$.decompress(ByteBufferMessageSet.scala:67)
	at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:179)
	at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:192)
	at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:146)
	at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
	at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
	at kafka.message.ByteBufferMessageSet.assignOffsets(ByteBufferMessageSet.scala:220)
	at kafka.log.Log.liftedTree1$1(Log.scala:275)
	... 22 more

Re: Broker restart, new producer and snappy in 0.8.2.1

Posted by Vidhya Arvind <vi...@gmail.com>.
OK, thanks Joe. Will try the 0.8.2.2 producer.

Vidhya

On Sun, Sep 13, 2015 at 7:12 AM, Joe Stein <jo...@stealth.ly> wrote:

> Hi, the 0.8.2.2 release (the vote just passed, and it should be announced
> soon) has a patch that may be related, though I'm not sure:
> https://issues.apache.org/jira/browse/KAFKA-2308
>
> Here are the 0.8.2.2 artifacts:
> https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/
> I don't see them in Maven Central yet, so you will need to download them
> into your local Maven repository and use the 0.8.2.2 producer. I don't
> know if it will fix your problem, but if you can try to reproduce it, and
> post a JIRA if it still happens, that would be great.
>
> Thanks!
>
> ~ Joe Stein
> - - - - - - - - - - - - - - - - - - -
>   http://www.elodina.net
>     http://www.stealth.ly
> - - - - - - - - - - - - - - - - - - -
>
> On Sat, Sep 12, 2015 at 3:32 PM, Vidhya Arvind <vi...@gmail.com>
> wrote:
>
> > [original message and stack trace trimmed; quoted in full at the top of
> > this thread]
>

Re: Broker restart, new producer and snappy in 0.8.2.1

Posted by Joe Stein <jo...@stealth.ly>.
Hi, the 0.8.2.2 release (the vote just passed, and it should be announced
soon) has a patch that may be related, though I'm not sure:
https://issues.apache.org/jira/browse/KAFKA-2308

Here are the 0.8.2.2 artifacts:
https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/
I don't see them in Maven Central yet, so you will need to download them
into your local Maven repository and use the 0.8.2.2 producer. I don't
know if it will fix your problem, but if you can try to reproduce it, and
post a JIRA if it still happens, that would be great.
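For what it's worth, here is a minimal sketch of the new (Java) producer
with snappy enabled that you could point at the 0.8.2.2 client to check
(broker address and topic name are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch: the new producer with snappy compression, to run against
    // the 0.8.2.2 kafka-clients jar. Broker and topic are placeholders.
    public class SnappyProducerCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            // Snappy is the codec implicated in this thread.
            props.put("compression.type", "snappy");

            KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>("test-topic",
                                                             "hello snappy"));
            producer.close();
        }
    }

You can put the downloaded kafka-clients jar into your local repository
with Maven's install:install-file goal so it resolves as version 0.8.2.2.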

Thanks!

~ Joe Stein
- - - - - - - - - - - - - - - - - - -
  http://www.elodina.net
    http://www.stealth.ly
- - - - - - - - - - - - - - - - - - -

On Sat, Sep 12, 2015 at 3:32 PM, Vidhya Arvind <vi...@gmail.com>
wrote:

> [original message and stack trace trimmed; quoted in full at the top of
> this thread]
>