Posted to users@kafka.apache.org by Vidhya Arvind <vi...@gmail.com> on 2015/09/12 21:32:22 UTC
Broker restart, new producer and snappy in 0.8.2.1
There have been multiple instances of this incident. When I restart the
broker for config changes, it brings down the consumers and mirror maker,
and I see CRC corruption in mirror maker and the following error in the
broker. I have reset the offset in ZooKeeper for certain topics/partitions,
but I still see this issue popping up for other topics/partitions. Please
let me know how I can resolve this. This has happened twice in our production
system.
Please let me know if there is anything I can try to fix the issue before the
next broker restart.
Vidhya
[2015-09-12 19:04:03,409] ERROR [KafkaApi-30001] Error processing
ProducerRequest with correlation id 3480 from client producer-1 on
partition [events_prod_oncue.ws.client.ui_UIEvent,29]
(kafka.server.KafkaApis)
kafka.common.KafkaException: Error in validating messages while
appending to log 'events_prod_oncue.ws.client.ui_UIEvent-29'
at kafka.log.Log.liftedTree1$1(Log.scala:277)
at kafka.log.Log.append(Log.scala:274)
at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.utils.Utils$.inReadLock(Utils.scala:541)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:362)
at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:167)
at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:150)
at java.io.InputStream.read(InputStream.java:101)
at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply$mcI$sp(ByteBufferMessageSet.scala:67)
at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:67)
at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:67)
at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
at scala.collection.immutable.Stream.foreach(Stream.scala:548)
at kafka.message.ByteBufferMessageSet$.decompress(ByteBufferMessageSet.scala:67)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:179)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:192)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:146)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at kafka.message.ByteBufferMessageSet.assignOffsets(ByteBufferMessageSet.scala:220)
at kafka.log.Log.liftedTree1$1(Log.scala:275)
... 22 more
Re: Broker restart, new producer and snappy in 0.8.2.1
Posted by Vidhya Arvind <vi...@gmail.com>.
OK, thanks Joe. I will try the 0.8.2.2 producer.
Vidhya
On Sun, Sep 13, 2015 at 7:12 AM, Joe Stein <jo...@stealth.ly> wrote:
> Hi, the 0.8.2.2 release (whose vote just passed and which should be
> announced soon) has a patch that may be related,
> https://issues.apache.org/jira/browse/KAFKA-2308, though I'm not sure.
>
> Here are the 0.8.2.2 artifacts:
> https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/ I don't see
> them in Maven Central yet, so you will need to install them into a local
> Maven repository and use the 0.8.2.2 producer. I don't know if it will fix
> your problem, but it would be great if you could try to reproduce it with
> that producer and, if it is not fixed, post to a JIRA.
>
> Thanks!
>
> ~ Joe Stein
Re: Broker restart, new producer and snappy in 0.8.2.1
Posted by Joe Stein <jo...@stealth.ly>.
Hi, the 0.8.2.2 release (whose vote just passed and which should be
announced soon) has a patch that may be related,
https://issues.apache.org/jira/browse/KAFKA-2308, though I'm not sure.
Here are the 0.8.2.2 artifacts:
https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/ I don't see
them in Maven Central yet, so you will need to install them into a local
Maven repository and use the 0.8.2.2 producer. I don't know if it will fix
your problem, but it would be great if you could try to reproduce it with
that producer and, if it is not fixed, post to a JIRA.
Thanks!
~ Joe Stein
- - - - - - - - - - - - - - - - - - -
http://www.elodina.net
http://www.stealth.ly
- - - - - - - - - - - - - - - - - - -
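
For anyone trying the suggestion above, here is a minimal sketch of the client-side settings this thread is about: the new Java producer with snappy compression. It is not taken from the original poster's setup. The class name SnappyProducerConfig, the helper build(), the broker address localhost:9092, and the choice of byte-array serializers are all assumptions for illustration. The sketch only builds the Properties object, so it runs without the Kafka jars; with kafka-clients 0.8.2.2 on the classpath, the same Properties could be passed to the KafkaProducer constructor.

```java
import java.util.Properties;

public class SnappyProducerConfig {

    // Hypothetical helper: collects settings for the new Java producer
    // (org.apache.kafka.clients.producer.KafkaProducer).
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Broker list used for the initial metadata fetch (assumed address).
        props.put("bootstrap.servers", bootstrapServers);
        // Compress batches with snappy, as in the setup discussed in this thread.
        props.put("compression.type", "snappy");
        // The new producer requires explicit serializers; byte arrays are an assumption.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        // Print the settings so they can be checked before wiring up a producer.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

Switching compression.type to "none" or "gzip" in a test environment is one way to check whether the broker-side FAILED_TO_UNCOMPRESS error is specific to snappy-compressed batches, though that is a diagnostic idea rather than a confirmed fix.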