You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Allen Wang <aw...@netflix.com.INVALID> on 2015/01/15 03:36:27 UTC

Leadership rebalance causing drop of incoming messages

Hello,

We did a manual leadership rebalance (using
PreferredReplicaLeaderElectionCommand) under heavy load and found that
there is a significant drop of incoming messages to the broker cluster for
more than an hour. Looking at broker log, we found a lot of errors like
this:

2015-01-15 00:00:03,330 ERROR kafka.utils.Logging$class:103
[kafka-processor-7101-0] [error] Closing socket for /10.213.156.41
because of error
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at kafka.utils.Utils$.read(Utils.scala:375)
	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
	at kafka.network.Processor.read(SocketServer.scala:347)
	at kafka.network.Processor.run(SocketServer.scala:245)
	at java.lang.Thread.run(Thread.java:745)


Is leadership rebalance a safe operation?

Thanks.

Re: Leadership rebalance causing drop of incoming messages

Posted by Allen Wang <aw...@netflix.com.INVALID>.
>
> It is unclear to me why restarting clients would fix the compatibility
> issue. Or do you mean you bounced in the right version of the snappy
> jar?
> Also, what version of the broker are you on?


We restarted the client with the compression turned off and that fixed the
problem.

We are using 0.8.1.1 for the broker and 0.8.2-beta for the producers.

What is the requests per sec that you see without failed requests?


It is about 16K messages per second per broker. The message rate drops to
5K messages per second when this error occurred. The error rate is only
single digit per broker per second. CPU utilization is only about 50%.

Sometimes there are also unclean leader elections happening at the same
time when this error occurs, which makes the problem worse. However, it
seems that it is an effect, rather than the cause of the problem because
there are also time that this error rate goes up without the unclean leader
elections.

Finally, is this easily reproducible?


Not quite. To our surprise, this error does not occur all the time, but
only comes in waves.

Thanks,
Allen

On Wed, Jan 21, 2015 at 1:58 PM, Joel Koshy <jj...@gmail.com> wrote:

> > instability of the broker cluster might have been caused by a snappy
> > un-compression error. In our case, the consumer and producer happens to
> be
> > the same application so restarting the client made the recovery of the
> > ...
> > The un-compression error is likely to be caused by incompatible snappy
> > version used by the producer.
>
> It is unclear to me why restarting clients would fix the compatibility
> issue. Or do you mean you bounced in the right version of the snappy
> jar?
>
> Also, what version of the broker are you on?
>
> > Also, it appears that when the metric FailedProduceRequestsPerSec (which
> > captures the above error) raises to a certain level (around 20/s), it
> will
> > start to have impact on the stability of the brokers, including
> significant
> > increase of GC time.
> >
> > How does Kafka handle such un-compression errors? Is it possible that the
> > ByteBuffer from the compressed messages are not released quick enough
> when
> > such errors happen at high rate?
>
> This is surprising. The only thing I can think of is if everything
> gets rejected there will be less IO by virtue of the fact that nothing
> gets flushed to disk, but in terms of GC I don't quite see why it
> would cause an increase with the errors alone since the broker
> decompressed the message-set no matter what in order to assign
> offsets. So the byte buffers need to be allocated anyway.
>
> What is the requests per sec that you see without failed requests?
>
> Finally, is this easily reproducible?
>
> Joel
>
>
> On Wed, Jan 21, 2015 at 11:03:40AM -0800, Allen Wang wrote:
> > After a closer look to other metrics and broker logs, we found that the
> > instability of the broker cluster might have been caused by a snappy
> > un-compression error. In our case, the consumer and producer happens to
> be
> > the same application so restarting the client made the recovery of the
> > broker cluster. Here is the error we saw in the broker log:
> >
> >   2015-01-19 00:07:24,891 ERROR kafka.utils.Logging$class:103
> > [kafka-request-handler-0] [error] [KafkaApi-7] Error processing
> > ProducerRequest with correlation id 393323 from client surorouter-mantis
> on
> > partition [logblob-networkerror,0]
> >
> > kafka.common.KafkaException: Error in validating messages while appending
> > to log 'logblob-networkerror-0'
> >
> > at kafka.log.Log.liftedTree1$1(Log.scala:254)
> >
> > at kafka.log.Log.append(Log.scala:251)
> >
> > at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
> >
> > at
> >
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
> >
> > at
> >
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
> >
> > at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >
> > at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >
> > at
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> >
> > at
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> >
> > at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >
> > at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> >
> > at
> >
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> >
> > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> >
> > at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> >
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> >
> > at scala.collection.mutable.HashMap.map(HashMap.scala:45)
> >
> > at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
> >
> > at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
> >
> > at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
> >
> > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> >
> > at java.lang.Thread.run(Thread.java:745)
> >
> > Caused by: java.io.IOException: failed to uncompress the chunk:
> > FAILED_TO_UNCOMPRESS(5)
> >
> > at
> >
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:362)
> >
> > at
> org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
> >
> > at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
> >
> > at java.io.InputStream.read(InputStream.java:101)
> >
> > at
> >
> kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply$mcI$sp(ByteBufferMessageSet.scala:68)
> >
> > at
> >
> kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:68)
> >
> > at
> >
> kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:68)
> >
> > at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
> >
> > at
> >
> scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
> >
> > at
> >
> scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
> >
> > at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >
> > at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >
> > at
> >
> scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
> >
> > at
> >
> scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
> >
> > at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >
> > at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >
> > at scala.collection.immutable.Stream.foreach(Stream.scala:527)
> >
> > The un-compression error is likely to be caused by incompatible snappy
> > version used by the producer.
> >
> > Also, it appears that when the metric FailedProduceRequestsPerSec (which
> > captures the above error) raises to a certain level (around 20/s), it
> will
> > start to have impact on the stability of the brokers, including
> significant
> > increase of GC time.
> >
> > How does Kafka handle such un-compression errors? Is it possible that the
> > ByteBuffer from the compressed messages are not released quick enough
> when
> > such errors happen at high rate?
> >
> >
> >
> >
> > On Thu, Jan 15, 2015 at 5:51 PM, Joel Koshy <jj...@gmail.com> wrote:
> >
> > > Not sure what could be going on..
> > >
> > > What version of the client and the broker are you on?
> > >
> > > Can you verify from the state change logs the time it took for
> > > leadership to move to the preferred leader?
> > >
> > > Were there long GCs in your brokers?  Can you also look for zookeeper
> > > session expirations in your broker logs?
> > >
> > > It is odd that restarting your consumers appears to have resolved your
> > > issues. What config overrides did you use for your consumers? E.g.,
> > > did you override the max wait time?
> > >
> > > How many consumers/producers are we talking about here?
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Thu, Jan 15, 2015 at 11:55:35AM -0800, Allen Wang wrote:
> > > > We are using the scala producer. From producer side, we have seen a
> lot
> > > of
> > > > error messages in producer during the time of incoming message drop:
> > > >
> > > > Produce request with correlation id 31616255 failed due to
> > > > [trace_annotation,10]: kafka.common.NotLeaderForPartitionException
> > > >
> > > > And a few (far less than the NotLeaderForPartitionException) of
> those:
> > > >
> > > > 2015-01-15 17:57:54,412 WARN
> KafkaSink-kafka_0_8_logtrace_asg-Sender-2
> > > > DefaultEventHandler - Failed to send producer request with
> correlation id
> > > > 31554484 to broker 20 with
> > > > data for partitions [trace_annotation,9]
> > > > java.net.SocketTimeoutException
> > > >
> > > > What's interesting is that the broker cluster recovered from the
> message
> > > > drop only after we restarted consumers. Also during that time, we
> have
> > > > observed is that the garbage collection time for the brokers
> increased 5
> > > > times. The AllBrokersFetchRequestRateAndTimeMs_9X metric from the
> > > consumer
> > > > side also increased from a few hundred ms to several seconds.
> > > >
> > > > What we don't know is whether the garbage collection time increase
> is the
> > > > cause or the effect of the problem. It seems that after the
> rebalance,
> > > some
> > > > resources in the brokers was tied up and it was only released after
> > > restart
> > > > of consumers.
> > > >
> > > >
> > > > On Thu, Jan 15, 2015 at 8:15 AM, Joel Koshy <jj...@gmail.com>
> wrote:
> > > >
> > > > > > Is leadership rebalance a safe operation?
> > > > >
> > > > > Yes - we use it routinely. For any partition, there should only be
> a
> > > > > brief (order of seconds) period of rejected messages as leaders
> move.
> > > > > When that happens the client should refresh metadata and discover
> the
> > > > > new leader. Are you using the Java producer? Do you see any errors
> in
> > > > > the producer logs?
> > > > >
> > > > > On Wed, Jan 14, 2015 at 06:36:27PM -0800, Allen Wang wrote:
> > > > > > Hello,
> > > > > >
> > > > > > We did a manual leadership rebalance (using
> > > > > > PreferredReplicaLeaderElectionCommand) under heavy load and found
> > > that
> > > > > > there is a significant drop of incoming messages to the broker
> > > cluster
> > > > > for
> > > > > > more than an hour. Looking at broker log, we found a lot of
> errors
> > > like
> > > > > > this:
> > > > > >
> > > > > > 2015-01-15 00:00:03,330 ERROR kafka.utils.Logging$class:103
> > > > > > [kafka-processor-7101-0] [error] Closing socket for /
> 10.213.156.41
> > > > > > because of error
> > > > > > java.io.IOException: Connection reset by peer
> > > > > >       at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > > > > >       at
> sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > > > > >       at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > > > > >       at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> > > > > >       at
> > > sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> > > > > >       at kafka.utils.Utils$.read(Utils.scala:375)
> > > > > >       at
> > > > >
> > >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > > >       at kafka.network.Processor.read(SocketServer.scala:347)
> > > > > >       at kafka.network.Processor.run(SocketServer.scala:245)
> > > > > >       at java.lang.Thread.run(Thread.java:745)
> > > > > >
> > > > > >
> > > > > > Is leadership rebalance a safe operation?
> > > > > >
> > > > > > Thanks.
> > > > >
> > > > >
> > >
>

Re: Leadership rebalance causing drop of incoming messages

Posted by Joel Koshy <jj...@gmail.com>.
> instability of the broker cluster might have been caused by a snappy
> un-compression error. In our case, the consumer and producer happens to be
> the same application so restarting the client made the recovery of the
> ...
> The un-compression error is likely to be caused by incompatible snappy
> version used by the producer.

It is unclear to me why restarting clients would fix the compatibility
issue. Or do you mean you bounced in the right version of the snappy
jar?

Also, what version of the broker are you on?

> Also, it appears that when the metric FailedProduceRequestsPerSec (which
> captures the above error) raises to a certain level (around 20/s), it will
> start to have impact on the stability of the brokers, including significant
> increase of GC time.
> 
> How does Kafka handle such un-compression errors? Is it possible that the
> ByteBuffer from the compressed messages are not released quick enough when
> such errors happen at high rate?

This is surprising. The only thing I can think of is if everything
gets rejected there will be less IO by virtue of the fact that nothing
gets flushed to disk, but in terms of GC I don't quite see why it
would cause an increase with the errors alone since the broker
decompressed the message-set no matter what in order to assign
offsets. So the byte buffers need to be allocated anyway.

What is the requests per sec that you see without failed requests?

Finally, is this easily reproducible?

Joel


On Wed, Jan 21, 2015 at 11:03:40AM -0800, Allen Wang wrote:
> After a closer look to other metrics and broker logs, we found that the
> instability of the broker cluster might have been caused by a snappy
> un-compression error. In our case, the consumer and producer happens to be
> the same application so restarting the client made the recovery of the
> broker cluster. Here is the error we saw in the broker log:
> 
>   2015-01-19 00:07:24,891 ERROR kafka.utils.Logging$class:103
> [kafka-request-handler-0] [error] [KafkaApi-7] Error processing
> ProducerRequest with correlation id 393323 from client surorouter-mantis on
> partition [logblob-networkerror,0]
> 
> kafka.common.KafkaException: Error in validating messages while appending
> to log 'logblob-networkerror-0'
> 
> at kafka.log.Log.liftedTree1$1(Log.scala:254)
> 
> at kafka.log.Log.append(Log.scala:251)
> 
> at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
> 
> at
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
> 
> at
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
> 
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> 
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> 
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> 
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> 
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> 
> at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> 
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> 
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> 
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> 
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> 
> at scala.collection.mutable.HashMap.map(HashMap.scala:45)
> 
> at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
> 
> at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
> 
> at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
> 
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> 
> at java.lang.Thread.run(Thread.java:745)
> 
> Caused by: java.io.IOException: failed to uncompress the chunk:
> FAILED_TO_UNCOMPRESS(5)
> 
> at
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:362)
> 
> at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
> 
> at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
> 
> at java.io.InputStream.read(InputStream.java:101)
> 
> at
> kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply$mcI$sp(ByteBufferMessageSet.scala:68)
> 
> at
> kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:68)
> 
> at
> kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:68)
> 
> at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
> 
> at
> scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
> 
> at
> scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
> 
> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> 
> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> 
> at
> scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
> 
> at
> scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
> 
> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> 
> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> 
> at scala.collection.immutable.Stream.foreach(Stream.scala:527)
> 
> The un-compression error is likely to be caused by incompatible snappy
> version used by the producer.
> 
> Also, it appears that when the metric FailedProduceRequestsPerSec (which
> captures the above error) raises to a certain level (around 20/s), it will
> start to have impact on the stability of the brokers, including significant
> increase of GC time.
> 
> How does Kafka handle such un-compression errors? Is it possible that the
> ByteBuffer from the compressed messages are not released quick enough when
> such errors happen at high rate?
> 
> 
> 
> 
> On Thu, Jan 15, 2015 at 5:51 PM, Joel Koshy <jj...@gmail.com> wrote:
> 
> > Not sure what could be going on..
> >
> > What version of the client and the broker are you on?
> >
> > Can you verify from the state change logs the time it took for
> > leadership to move to the preferred leader?
> >
> > Were there long GCs in your brokers?  Can you also look for zookeeper
> > session expirations in your broker logs?
> >
> > It is odd that restarting your consumers appears to have resolved your
> > issues. What config overrides did you use for your consumers? E.g.,
> > did you override the max wait time?
> >
> > How many consumers/producers are we talking about here?
> >
> > Thanks,
> >
> > Joel
> >
> > On Thu, Jan 15, 2015 at 11:55:35AM -0800, Allen Wang wrote:
> > > We are using the scala producer. From producer side, we have seen a lot
> > of
> > > error messages in producer during the time of incoming message drop:
> > >
> > > Produce request with correlation id 31616255 failed due to
> > > [trace_annotation,10]: kafka.common.NotLeaderForPartitionException
> > >
> > > And a few (far less than the NotLeaderForPartitionException) of those:
> > >
> > > 2015-01-15 17:57:54,412 WARN KafkaSink-kafka_0_8_logtrace_asg-Sender-2
> > > DefaultEventHandler - Failed to send producer request with correlation id
> > > 31554484 to broker 20 with
> > > data for partitions [trace_annotation,9]
> > > java.net.SocketTimeoutException
> > >
> > > What's interesting is that the broker cluster recovered from the message
> > > drop only after we restarted consumers. Also during that time, we have
> > > observed is that the garbage collection time for the brokers increased 5
> > > times. The AllBrokersFetchRequestRateAndTimeMs_9X metric from the
> > consumer
> > > side also increased from a few hundred ms to several seconds.
> > >
> > > What we don't know is whether the garbage collection time increase is the
> > > cause or the effect of the problem. It seems that after the rebalance,
> > some
> > > resources in the brokers was tied up and it was only released after
> > restart
> > > of consumers.
> > >
> > >
> > > On Thu, Jan 15, 2015 at 8:15 AM, Joel Koshy <jj...@gmail.com> wrote:
> > >
> > > > > Is leadership rebalance a safe operation?
> > > >
> > > > Yes - we use it routinely. For any partition, there should only be a
> > > > brief (order of seconds) period of rejected messages as leaders move.
> > > > When that happens the client should refresh metadata and discover the
> > > > new leader. Are you using the Java producer? Do you see any errors in
> > > > the producer logs?
> > > >
> > > > On Wed, Jan 14, 2015 at 06:36:27PM -0800, Allen Wang wrote:
> > > > > Hello,
> > > > >
> > > > > We did a manual leadership rebalance (using
> > > > > PreferredReplicaLeaderElectionCommand) under heavy load and found
> > that
> > > > > there is a significant drop of incoming messages to the broker
> > cluster
> > > > for
> > > > > more than an hour. Looking at broker log, we found a lot of errors
> > like
> > > > > this:
> > > > >
> > > > > 2015-01-15 00:00:03,330 ERROR kafka.utils.Logging$class:103
> > > > > [kafka-processor-7101-0] [error] Closing socket for /10.213.156.41
> > > > > because of error
> > > > > java.io.IOException: Connection reset by peer
> > > > >       at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > > > >       at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > > > >       at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > > > >       at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> > > > >       at
> > sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> > > > >       at kafka.utils.Utils$.read(Utils.scala:375)
> > > > >       at
> > > >
> > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > >       at kafka.network.Processor.read(SocketServer.scala:347)
> > > > >       at kafka.network.Processor.run(SocketServer.scala:245)
> > > > >       at java.lang.Thread.run(Thread.java:745)
> > > > >
> > > > >
> > > > > Is leadership rebalance a safe operation?
> > > > >
> > > > > Thanks.
> > > >
> > > >
> >

Re: Leadership rebalance causing drop of incoming messages

Posted by Allen Wang <aw...@netflix.com.INVALID>.
After a closer look to other metrics and broker logs, we found that the
instability of the broker cluster might have been caused by a snappy
un-compression error. In our case, the consumer and producer happens to be
the same application so restarting the client made the recovery of the
broker cluster. Here is the error we saw in the broker log:

  2015-01-19 00:07:24,891 ERROR kafka.utils.Logging$class:103
[kafka-request-handler-0] [error] [KafkaApi-7] Error processing
ProducerRequest with correlation id 393323 from client surorouter-mantis on
partition [logblob-networkerror,0]

kafka.common.KafkaException: Error in validating messages while appending
to log 'logblob-networkerror-0'

at kafka.log.Log.liftedTree1$1(Log.scala:254)

at kafka.log.Log.append(Log.scala:251)

at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)

at
kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)

at
kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)

at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)

at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)

at scala.collection.Iterator$class.foreach(Iterator.scala:772)

at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)

at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)

at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)

at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)

at scala.collection.mutable.HashMap.map(HashMap.scala:45)

at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)

at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)

at kafka.server.KafkaApis.handle(KafkaApis.scala:185)

at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.io.IOException: failed to uncompress the chunk:
FAILED_TO_UNCOMPRESS(5)

at
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:362)

at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)

at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)

at java.io.InputStream.read(InputStream.java:101)

at
kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply$mcI$sp(ByteBufferMessageSet.scala:68)

at
kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:68)

at
kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:68)

at scala.collection.immutable.Stream$.continually(Stream.scala:1104)

at
scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)

at
scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)

at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)

at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)

at
scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)

at
scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)

at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)

at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)

at scala.collection.immutable.Stream.foreach(Stream.scala:527)

The un-compression error is likely to be caused by incompatible snappy
version used by the producer.

Also, it appears that when the metric FailedProduceRequestsPerSec (which
captures the above error) raises to a certain level (around 20/s), it will
start to have impact on the stability of the brokers, including significant
increase of GC time.

How does Kafka handle such un-compression errors? Is it possible that the
ByteBuffer from the compressed messages are not released quick enough when
such errors happen at high rate?




On Thu, Jan 15, 2015 at 5:51 PM, Joel Koshy <jj...@gmail.com> wrote:

> Not sure what could be going on..
>
> What version of the client and the broker are you on?
>
> Can you verify from the state change logs the time it took for
> leadership to move to the preferred leader?
>
> Were there long GCs in your brokers?  Can you also look for zookeeper
> session expirations in your broker logs?
>
> It is odd that restarting your consumers appears to have resolved your
> issues. What config overrides did you use for your consumers? E.g.,
> did you override the max wait time?
>
> How many consumers/producers are we talking about here?
>
> Thanks,
>
> Joel
>
> On Thu, Jan 15, 2015 at 11:55:35AM -0800, Allen Wang wrote:
> > We are using the scala producer. From producer side, we have seen a lot
> of
> > error messages in producer during the time of incoming message drop:
> >
> > Produce request with correlation id 31616255 failed due to
> > [trace_annotation,10]: kafka.common.NotLeaderForPartitionException
> >
> > And a few (far less than the NotLeaderForPartitionException) of those:
> >
> > 2015-01-15 17:57:54,412 WARN KafkaSink-kafka_0_8_logtrace_asg-Sender-2
> > DefaultEventHandler - Failed to send producer request with correlation id
> > 31554484 to broker 20 with
> > data for partitions [trace_annotation,9]
> > java.net.SocketTimeoutException
> >
> > What's interesting is that the broker cluster recovered from the message
> > drop only after we restarted consumers. Also during that time, we have
> > observed is that the garbage collection time for the brokers increased 5
> > times. The AllBrokersFetchRequestRateAndTimeMs_9X metric from the
> consumer
> > side also increased from a few hundred ms to several seconds.
> >
> > What we don't know is whether the garbage collection time increase is the
> > cause or the effect of the problem. It seems that after the rebalance,
> some
> > resources in the brokers was tied up and it was only released after
> restart
> > of consumers.
> >
> >
> > On Thu, Jan 15, 2015 at 8:15 AM, Joel Koshy <jj...@gmail.com> wrote:
> >
> > > > Is leadership rebalance a safe operation?
> > >
> > > Yes - we use it routinely. For any partition, there should only be a
> > > brief (order of seconds) period of rejected messages as leaders move.
> > > When that happens the client should refresh metadata and discover the
> > > new leader. Are you using the Java producer? Do you see any errors in
> > > the producer logs?
> > >
> > > On Wed, Jan 14, 2015 at 06:36:27PM -0800, Allen Wang wrote:
> > > > Hello,
> > > >
> > > > We did a manual leadership rebalance (using
> > > > PreferredReplicaLeaderElectionCommand) under heavy load and found
> that
> > > > there is a significant drop of incoming messages to the broker
> cluster
> > > for
> > > > more than an hour. Looking at broker log, we found a lot of errors
> like
> > > > this:
> > > >
> > > > 2015-01-15 00:00:03,330 ERROR kafka.utils.Logging$class:103
> > > > [kafka-processor-7101-0] [error] Closing socket for /10.213.156.41
> > > > because of error
> > > > java.io.IOException: Connection reset by peer
> > > >       at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > > >       at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > > >       at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > > >       at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> > > >       at
> sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> > > >       at kafka.utils.Utils$.read(Utils.scala:375)
> > > >       at
> > >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > >       at kafka.network.Processor.read(SocketServer.scala:347)
> > > >       at kafka.network.Processor.run(SocketServer.scala:245)
> > > >       at java.lang.Thread.run(Thread.java:745)
> > > >
> > > >
> > > > Is leadership rebalance a safe operation?
> > > >
> > > > Thanks.
> > >
> > >
>
>

Re: Leadership rebalance causing drop of incoming messages

Posted by Joel Koshy <jj...@gmail.com>.
Not sure what could be going on..

What version of the client and the broker are you on?

Can you verify from the state change logs the time it took for
leadership to move to the preferred leader?

Were there long GCs in your brokers?  Can you also look for zookeeper
session expirations in your broker logs?

It is odd that restarting your consumers appears to have resolved your
issues. What config overrides did you use for your consumers? E.g.,
did you override the max wait time?

How many consumers/producers are we talking about here?

Thanks,

Joel

On Thu, Jan 15, 2015 at 11:55:35AM -0800, Allen Wang wrote:
> We are using the scala producer. From producer side, we have seen a lot of
> error messages in producer during the time of incoming message drop:
> 
> Produce request with correlation id 31616255 failed due to
> [trace_annotation,10]: kafka.common.NotLeaderForPartitionException
> 
> And a few (far less than the NotLeaderForPartitionException) of those:
> 
> 2015-01-15 17:57:54,412 WARN KafkaSink-kafka_0_8_logtrace_asg-Sender-2
> DefaultEventHandler - Failed to send producer request with correlation id
> 31554484 to broker 20 with
> data for partitions [trace_annotation,9]
> java.net.SocketTimeoutException
> 
> What's interesting is that the broker cluster recovered from the message
> drop only after we restarted consumers. Also during that time, we have
> observed is that the garbage collection time for the brokers increased 5
> times. The AllBrokersFetchRequestRateAndTimeMs_9X metric from the consumer
> side also increased from a few hundred ms to several seconds.
> 
> What we don't know is whether the garbage collection time increase is the
> cause or the effect of the problem. It seems that after the rebalance, some
> resources in the brokers was tied up and it was only released after restart
> of consumers.
> 
> 
> On Thu, Jan 15, 2015 at 8:15 AM, Joel Koshy <jj...@gmail.com> wrote:
> 
> > > Is leadership rebalance a safe operation?
> >
> > Yes - we use it routinely. For any partition, there should only be a
> > brief (order of seconds) period of rejected messages as leaders move.
> > When that happens the client should refresh metadata and discover the
> > new leader. Are you using the Java producer? Do you see any errors in
> > the producer logs?
> >
> > On Wed, Jan 14, 2015 at 06:36:27PM -0800, Allen Wang wrote:
> > > Hello,
> > >
> > > We did a manual leadership rebalance (using
> > > PreferredReplicaLeaderElectionCommand) under heavy load and found that
> > > there is a significant drop of incoming messages to the broker cluster
> > for
> > > more than an hour. Looking at broker log, we found a lot of errors like
> > > this:
> > >
> > > 2015-01-15 00:00:03,330 ERROR kafka.utils.Logging$class:103
> > > [kafka-processor-7101-0] [error] Closing socket for /10.213.156.41
> > > because of error
> > > java.io.IOException: Connection reset by peer
> > >       at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > >       at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > >       at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > >       at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> > >       at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> > >       at kafka.utils.Utils$.read(Utils.scala:375)
> > >       at
> > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > >       at kafka.network.Processor.read(SocketServer.scala:347)
> > >       at kafka.network.Processor.run(SocketServer.scala:245)
> > >       at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > Is leadership rebalance a safe operation?
> > >
> > > Thanks.
> >
> >


Re: Leadership rebalance causing drop of incoming messages

Posted by Allen Wang <aw...@netflix.com.INVALID>.
Another kind of error messages is found in the kafka state change log after
leadership rebalance:

2015-01-15 00:01:39,895 WARN  kafka.utils.Logging$class:83
[kafka-request-handler-0] [warn] Broker 8 received invalid
LeaderAndIsr request with correlation id 221 from controller 0 epoch
19 with an older leader epoch 18 for partition [mapcommandaudit,4],
current leader epoch is 18



On Thu, Jan 15, 2015 at 11:55 AM, Allen Wang <aw...@netflix.com> wrote:

> We are using the scala producer. From producer side, we have seen a lot of
> error messages in producer during the time of incoming message drop:
>
> Produce request with correlation id 31616255 failed due to
> [trace_annotation,10]: kafka.common.NotLeaderForPartitionException
>
> And a few (far less than the NotLeaderForPartitionException) of those:
>
> 2015-01-15 17:57:54,412 WARN KafkaSink-kafka_0_8_logtrace_asg-Sender-2
> DefaultEventHandler - Failed to send producer request with correlation id
> 31554484 to broker 20 with
> data for partitions [trace_annotation,9]
> java.net.SocketTimeoutException
>
> What's interesting is that the broker cluster recovered from the message
> drop only after we restarted consumers. Also during that time, we have
> observed is that the garbage collection time for the brokers increased 5
> times. The AllBrokersFetchRequestRateAndTimeMs_9X metric from the consumer
> side also increased from a few hundred ms to several seconds.
>
> What we don't know is whether the garbage collection time increase is the
> cause or the effect of the problem. It seems that after the rebalance, some
> resources in the brokers was tied up and it was only released after restart
> of consumers.
>
>
> On Thu, Jan 15, 2015 at 8:15 AM, Joel Koshy <jj...@gmail.com> wrote:
>
>> > Is leadership rebalance a safe operation?
>>
>> Yes - we use it routinely. For any partition, there should only be a
>> brief (order of seconds) period of rejected messages as leaders move.
>> When that happens the client should refresh metadata and discover the
>> new leader. Are you using the Java producer? Do you see any errors in
>> the producer logs?
>>
>> On Wed, Jan 14, 2015 at 06:36:27PM -0800, Allen Wang wrote:
>> > Hello,
>> >
>> > We did a manual leadership rebalance (using
>> > PreferredReplicaLeaderElectionCommand) under heavy load and found that
>> > there is a significant drop of incoming messages to the broker cluster
>> for
>> > more than an hour. Looking at broker log, we found a lot of errors like
>> > this:
>> >
>> > 2015-01-15 00:00:03,330 ERROR kafka.utils.Logging$class:103
>> > [kafka-processor-7101-0] [error] Closing socket for /10.213.156.41
>> > because of error
>> > java.io.IOException: Connection reset by peer
>> >       at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>> >       at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>> >       at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>> >       at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>> >       at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>> >       at kafka.utils.Utils$.read(Utils.scala:375)
>> >       at
>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>> >       at kafka.network.Processor.read(SocketServer.scala:347)
>> >       at kafka.network.Processor.run(SocketServer.scala:245)
>> >       at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> > Is leadership rebalance a safe operation?
>> >
>> > Thanks.
>>
>>
>

Re: Leadership rebalance causing drop of incoming messages

Posted by Allen Wang <aw...@netflix.com.INVALID>.
We are using the scala producer. From producer side, we have seen a lot of
error messages in producer during the time of incoming message drop:

Produce request with correlation id 31616255 failed due to
[trace_annotation,10]: kafka.common.NotLeaderForPartitionException

And a few (far less than the NotLeaderForPartitionException) of those:

2015-01-15 17:57:54,412 WARN KafkaSink-kafka_0_8_logtrace_asg-Sender-2
DefaultEventHandler - Failed to send producer request with correlation id
31554484 to broker 20 with
data for partitions [trace_annotation,9]
java.net.SocketTimeoutException

What's interesting is that the broker cluster recovered from the message
drop only after we restarted consumers. Also during that time, we have
observed is that the garbage collection time for the brokers increased 5
times. The AllBrokersFetchRequestRateAndTimeMs_9X metric from the consumer
side also increased from a few hundred ms to several seconds.

What we don't know is whether the garbage collection time increase is the
cause or the effect of the problem. It seems that after the rebalance, some
resources in the brokers was tied up and it was only released after restart
of consumers.


On Thu, Jan 15, 2015 at 8:15 AM, Joel Koshy <jj...@gmail.com> wrote:

> > Is leadership rebalance a safe operation?
>
> Yes - we use it routinely. For any partition, there should only be a
> brief (order of seconds) period of rejected messages as leaders move.
> When that happens the client should refresh metadata and discover the
> new leader. Are you using the Java producer? Do you see any errors in
> the producer logs?
>
> On Wed, Jan 14, 2015 at 06:36:27PM -0800, Allen Wang wrote:
> > Hello,
> >
> > We did a manual leadership rebalance (using
> > PreferredReplicaLeaderElectionCommand) under heavy load and found that
> > there is a significant drop of incoming messages to the broker cluster
> for
> > more than an hour. Looking at broker log, we found a lot of errors like
> > this:
> >
> > 2015-01-15 00:00:03,330 ERROR kafka.utils.Logging$class:103
> > [kafka-processor-7101-0] [error] Closing socket for /10.213.156.41
> > because of error
> > java.io.IOException: Connection reset by peer
> >       at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> >       at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> >       at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> >       at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> >       at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> >       at kafka.utils.Utils$.read(Utils.scala:375)
> >       at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >       at kafka.network.Processor.read(SocketServer.scala:347)
> >       at kafka.network.Processor.run(SocketServer.scala:245)
> >       at java.lang.Thread.run(Thread.java:745)
> >
> >
> > Is leadership rebalance a safe operation?
> >
> > Thanks.
>
>

Re: Leadership rebalance causing drop of incoming messages

Posted by Joel Koshy <jj...@gmail.com>.
> Is leadership rebalance a safe operation?

Yes - we use it routinely. For any partition, there should only be a
brief (order of seconds) period of rejected messages as leaders move.
When that happens the client should refresh metadata and discover the
new leader. Are you using the Java producer? Do you see any errors in
the producer logs?

On Wed, Jan 14, 2015 at 06:36:27PM -0800, Allen Wang wrote:
> Hello,
> 
> We did a manual leadership rebalance (using
> PreferredReplicaLeaderElectionCommand) under heavy load and found that
> there is a significant drop of incoming messages to the broker cluster for
> more than an hour. Looking at broker log, we found a lot of errors like
> this:
> 
> 2015-01-15 00:00:03,330 ERROR kafka.utils.Logging$class:103
> [kafka-processor-7101-0] [error] Closing socket for /10.213.156.41
> because of error
> java.io.IOException: Connection reset by peer
> 	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> 	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> 	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> 	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> 	at kafka.utils.Utils$.read(Utils.scala:375)
> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> 	at kafka.network.Processor.read(SocketServer.scala:347)
> 	at kafka.network.Processor.run(SocketServer.scala:245)
> 	at java.lang.Thread.run(Thread.java:745)
> 
> 
> Is leadership rebalance a safe operation?
> 
> Thanks.