You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Rajiv Kurian <ra...@signalfuse.com> on 2014/11/22 20:23:48 UTC

Kafka restart takes a long time

A 3 node kafka broker cluster went down yesterday (all nodes) and I just
noticed it this morning. When I restarted it this morning, I see a lengthy
list of messages like this:

Loading log 'mytopic-partitionNum"
Recovering unflushed segment 'some number' of in log mytopic-partitionNum.
Completed load of log mytopic-partitionNum with log end offset someOffset

It's been going on for more than 30 minutes since I restarted the broker. I
have quite a few partitions (over 1000) but I still wouldn't expect it to
take such a long time.

Any ideas on how I should investigate the problem?

Thanks!

Re: Kafka restart takes a long time

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Thanks every one. I"ll try to clean up the disk space and try again.

On Sun, Nov 23, 2014 at 8:47 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> Rajiv,
>
> So, any time a broker's disk fills up, it will shut itself down immediately
> (it will do this in response to any IO error on writing to disk).
> Unfortunately, this means that the node will not be able to do any
> housecleaning before shutdown, which is an 'unclean' shutdown.  This means
> that when it restarts, it needs to reset the data to the last known
> checkpoint.  If the partition is replicated, and it can restore it from
> another broker, it will try to do that (but it doesn't sound like it can do
> that in your case, since all the other nodes are down too).
>
> There is a fix coming in 0.8.2 that will allow a broker to restore multiple
> partitions in parallel (but the current behavior in 0.8.1.1 and prior is to
> restore partitions 1 by 1). See:
> https://issues.apache.org/jira/browse/KAFKA-1414.  This fix should speed
> things up greatly when you have a large number of partitions.
>
> If a disk is full, the broker will refuse to even start up (or will fail
> immediately on the first write attempt and shut itself down).  So,
> generally, in this event, you need to clear some disk space before trying
> to restart the server.
>
> The bottom line is that you don't want any of your brokers to run out of
> disk space (thus you need to have good monitoring/alerting for advance
> warning on this).  Kafka doesn't attempt to detect if it's about to run out
> of space and die, so you have to manage that and guard against it outside
> of kafka.
>
> Jason
>
> On Sat, Nov 22, 2014 at 5:27 PM, Harsha <ka...@harsha.io> wrote:
>
> > It might logs check your kafka logs dir (server logs) . Kafka can
> > produce lot of logs in a quick time make sure thats whats in play here.
> > -Harsha
> >
> > On Sat, Nov 22, 2014, at 01:37 PM, Rajiv Kurian wrote:
> > > Actually see a bunch of errors. One of the brokers is out of space and
> > > this
> > > might be causing everything to spin out of control.
> > >
> > > Some logs:
> > >
> > > On *broker 1* (the one that has run out of space):
> > >
> > > 2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13          ]
> > > [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-1-13],
> Disk
> > > error while replicating data.
> > >
> > > kafka.common.KafkaStorageException: I/O exception in append to log
> > > 'mytopic-633'
> > >
> > >         at kafka.log.Log.append(Log.scala:283)
> > >
> > >         at
> > >
> >
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> > >
> > >         at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> > >
> > >         at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >
> > >         at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> > >
> > >         at kafka.utils.Utils$.inLock(Utils.scala:538)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> > >
> > >         at
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > >
> > >         at
> > >         kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > >
> > > Caused by: java.io.IOException: No space left on device
> > >
> > >         at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> > >
> > >         at
> > >         sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
> > >
> > >         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> > >
> > >         at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> > >
> > >         at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
> > >
> > >         at
> > >
> >
> kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132)
> > >
> > >         at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
> > >
> > >         at kafka.log.LogSegment.append(LogSegment.scala:80)
> > >
> > >         at kafka.log.Log.append(Log.scala:269)
> > >
> > >         ... 13 more
> > >
> > > 2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13          ]
> > > [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-2-13],
> > > Error
> > > getting offset for partition [myTopic,0] to broker 13
> > >
> > > java.io.IOException: No space left on device
> > >
> > >         at java.io.FileOutputStream.writeBytes(Native Method)
> > >
> > >         at java.io.FileOutputStream.write(FileOutputStream.java:345)
> > >
> > >         at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> > >
> > >         at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> > >
> > >         at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> > >
> > >         at
> java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
> > >
> > >         at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:129)
> > >
> > >         at java.io.BufferedWriter.write(BufferedWriter.java:230)
> > >
> > >         at java.io.Writer.write(Writer.java:157)
> > >
> > >         at
> > >
> >
> kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:50)
> > >
> > >         at
> > >
> >
> kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:49)
> > >
> > >         at
> > >
> >
> scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
> > >
> > >         at
> > >
> >
> scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
> > >
> > >         at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> > >
> > >         at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >
> > >         at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >
> > >         at
> > > scala.collection.immutable.MapLike$$anon$2.foreach(MapLike.scala:106)
> > >
> > >         at
> > > kafka.server.OffsetCheckpoint.liftedTree1$1(OffsetCheckpoint.scala:49)
> > >
> > >         at
> kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:39)
> > >
> > >         at
> > >
> >
> kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:234)
> > >
> > >         at
> > >
> >
> kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:231)
> > >
> > >         at
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > >
> > >         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > >
> > >         at
> > >
> kafka.log.LogManager.checkpointRecoveryPointOffsets(LogManager.scala:231)
> > >
> > >         at kafka.log.LogManager.truncateTo(LogManager.scala:204)
> > >
> > >         at
> > >
> >
> kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:84)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:144)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> > >
> > >         at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> > >
> > >         at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >
> > >         at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> > >
> > >         at kafka.utils.Utils$.inLock(Utils.scala:538)
> > >
> > >         at
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> > >
> > >         at
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > >
> > >         at
> > >         kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > >
> > >
> > > *On broker 2:*
> > >
> > > 2014-11-22T21:20:19.629Z ERROR [request-expiration-task            ]
> > > [kafka.server.KafkaApis              ]: [KafkaApi-12] Error when
> > > processing
> > > fetch request for partition [myTopic,265] offset 415659 from follower
> > > with
> > > correlation id 0
> > >
> > > kafka.common.OffsetOutOfRangeException: Request for offset 415659 but
> we
> > > only have log segments in the range 0 to 410453.
> > >
> > >         at kafka.log.Log.read(Log.scala:377)
> > >
> > >         at
> > >
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:401)
> > >
> > >         at
> > >
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:347)
> > >
> > >         at
> > >
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:343)
> > >
> > >         at
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >
> > >         at
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >
> > >         at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> > >
> > >         at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >
> > >         at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >
> > >         at
> > > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > >
> > >         at scala.collection.immutable.HashMap.map(HashMap.scala:35)
> > >
> > >         at
> > >
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:343)
> > >
> > >         at
> > >
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:704)
> > >
> > >         at
> > >
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:686)
> > >
> > >         at
> > >
> >
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
> > >
> > >
> > > *On broker 3*:
> > >
> > >
> > > 2014-11-22T21:26:48.216Z INFO  [kafka-request-handler-3            ]
> > > [fka.controller.PartitionStateMachine]: [Partition state machine on
> > > Controller 13]: Invoking state change to OnlinePartition for partitions
> > > [myTopic,122]
> > >
> > > 2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3            ]
> > > [state.change.logger                 ]: Controller 13 epoch 132
> > > encountered
> > > error while electing leader for partition [myTopic,122] due to: No
> other
> > > replicas in ISR 13 for [myTopic,122] besides shutting down brokers 13.
> > >
> > > 2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3            ]
> > > [state.change.logger                 ]: Controller 13 epoch 132
> initiated
> > > state change for partition [myTopic,122] from OnlinePartition to
> > > OnlinePartition failed
> > >
> > > kafka.common.StateChangeFailedException: encountered error while
> electing
> > > leader for partition [myTopic,122] due to: No other replicas in ISR 13
> > > for
> > > [myTopic,122] besides shutting down brokers 13.
> > >
> > >         at
> > >
> >
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:344)
> > >
> > >         at
> > >
> >
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:171)
> > >
> > >         at
> > >
> >
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
> > >
> > >         at
> > >
> >
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
> > >
> > >         at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
> > >
> > >         at
> > >
> >
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
> > >
> > >         at
> > >
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:227)
> > >
> > >         at
> > >
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:223)
> > >
> > >         at scala.Option.foreach(Option.scala:121)
> > >
> > >         at
> > >
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:223)
> > >
> > >         at
> > >
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:219)
> > >
> > >         at
> > > scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)
> > >
> > >         at
> > >
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> > >
> > >         at
> > >
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> > >
> > >         at
> > >
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> > >
> > >         at
> > >
> >
> kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:219)
> > >
> > >         at
> > >
> >
> kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:140)
> > >
> > >         at kafka.server.KafkaApis.handle(KafkaApis.scala:77)
> > >
> > >         at
> > > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >
> > >         at java.lang.Thread.run(Thread.java:744)
> > >
> > > Caused by: kafka.common.StateChangeFailedException: No other replicas
> in
> > > ISR 13 for [myTopic,122] besides shutting down brokers 13
> > >
> > >         at
> > >
> >
> kafka.controller.ControlledShutdownLeaderSelector.selectLeader(PartitionLeaderSelector.scala:181)
> > >
> > >         at
> > >
> >
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:320)
> > >
> > >         ... 19 more
> > >
> > > On Sat, Nov 22, 2014 at 1:17 PM, Harsha <ka...@harsha.io> wrote:
> > >
> > > > Rajiv ,
> > > >           which version of kafka are you using and do you see any
> > errors
> > > >           when the server goes down after sending few messages.
> > > > -Harsha
> > > >
> > > > On Sat, Nov 22, 2014, at 01:05 PM, Rajiv Kurian wrote:
> > > > > The brokers also seem unavailable while this is going on.  Each of
> > these
> > > > > log messages takes 2-3 seconds so at about  1200 partitions it
> takes
> > up
> > > > > quite a bit of time. Ultimately it does recover though but sadly it
> > goes
> > > > > down soon enough after I start sending it messages.
> > > > >
> > > > > On Sat, Nov 22, 2014 at 11:23 AM, Rajiv Kurian <
> rajiv@signalfuse.com
> > >
> > > > > wrote:
> > > > >
> > > > > > A 3 node kafka broker cluster went down yesterday (all nodes)
> and I
> > > > just
> > > > > > noticed it this morning. When I restarted it this morning, I see
> a
> > > > lengthy
> > > > > > list of messages like this:
> > > > > >
> > > > > > Loading log 'mytopic-partitionNum"
> > > > > > Recovering unflushed segment 'some number' of in log
> > > > mytopic-partitionNum.
> > > > > > Completed load of log mytopic-partitionNum with log end offset
> > > > someOffset
> > > > > >
> > > > > > It's been going on for more than 30 minutes since I restarted the
> > > > broker.
> > > > > > I have quite a few partitions (over 1000) but I still wouldn't
> > expect
> > > > it to
> > > > > > take such a long time.
> > > > > >
> > > > > > Any ideas on how I should investigate the problem?
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > >
> >
>

Re: Kafka restart takes a long time

Posted by Jason Rosenberg <jb...@squareup.com>.
Rajiv,

So, any time a broker's disk fills up, it will shut itself down immediately
(it will do this in response to any IO error on writing to disk).
Unfortunately, this means that the node will not be able to do any
housecleaning before shutdown, which is an 'unclean' shutdown.  This means
that when it restarts, it needs to reset the data to the last known
checkpoint.  If the partition is replicated, and it can restore it from
another broker, it will try to do that (but it doesn't sound like it can do
that in your case, since all the other nodes are down too).

There is a fix coming in 0.8.2 that will allow a broker to restore multiple
partitions in parallel (but the current behavior in 0.8.1.1 and prior is to
restore partitions 1 by 1). See:
https://issues.apache.org/jira/browse/KAFKA-1414.  This fix should speed
things up greatly when you have a large number of partitions.

If a disk is full, the broker will refuse to even start up (or will fail
immediately on the first write attempt and shut itself down).  So,
generally, in this event, you need to clear some disk space before trying
to restart the server.

The bottom line is that you don't want any of your brokers to run out of
disk space (thus you need to have good monitoring/alerting for advance
warning on this).  Kafka doesn't attempt to detect if it's about to run out
of space and die, so you have to manage that and guard against it outside
of kafka.

Jason

On Sat, Nov 22, 2014 at 5:27 PM, Harsha <ka...@harsha.io> wrote:

> It might logs check your kafka logs dir (server logs) . Kafka can
> produce lot of logs in a quick time make sure thats whats in play here.
> -Harsha
>
> On Sat, Nov 22, 2014, at 01:37 PM, Rajiv Kurian wrote:
> > Actually see a bunch of errors. One of the brokers is out of space and
> > this
> > might be causing everything to spin out of control.
> >
> > Some logs:
> >
> > On *broker 1* (the one that has run out of space):
> >
> > 2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13          ]
> > [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-1-13], Disk
> > error while replicating data.
> >
> > kafka.common.KafkaStorageException: I/O exception in append to log
> > 'mytopic-633'
> >
> >         at kafka.log.Log.append(Log.scala:283)
> >
> >         at
> >
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> >
> >         at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >
> >         at kafka.utils.Utils$.inLock(Utils.scala:538)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> >
> >         at
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >
> >         at
> >         kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> > Caused by: java.io.IOException: No space left on device
> >
> >         at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> >
> >         at
> >         sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
> >
> >         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> >
> >         at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> >
> >         at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
> >
> >         at
> >
> kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132)
> >
> >         at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
> >
> >         at kafka.log.LogSegment.append(LogSegment.scala:80)
> >
> >         at kafka.log.Log.append(Log.scala:269)
> >
> >         ... 13 more
> >
> > 2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13          ]
> > [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-2-13],
> > Error
> > getting offset for partition [myTopic,0] to broker 13
> >
> > java.io.IOException: No space left on device
> >
> >         at java.io.FileOutputStream.writeBytes(Native Method)
> >
> >         at java.io.FileOutputStream.write(FileOutputStream.java:345)
> >
> >         at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> >
> >         at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> >
> >         at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> >
> >         at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
> >
> >         at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:129)
> >
> >         at java.io.BufferedWriter.write(BufferedWriter.java:230)
> >
> >         at java.io.Writer.write(Writer.java:157)
> >
> >         at
> >
> kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:50)
> >
> >         at
> >
> kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:49)
> >
> >         at
> >
> scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
> >
> >         at
> >
> scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
> >
> >         at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.immutable.MapLike$$anon$2.foreach(MapLike.scala:106)
> >
> >         at
> > kafka.server.OffsetCheckpoint.liftedTree1$1(OffsetCheckpoint.scala:49)
> >
> >         at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:39)
> >
> >         at
> >
> kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:234)
> >
> >         at
> >
> kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:231)
> >
> >         at
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> >
> >         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> >
> >         at
> > kafka.log.LogManager.checkpointRecoveryPointOffsets(LogManager.scala:231)
> >
> >         at kafka.log.LogManager.truncateTo(LogManager.scala:204)
> >
> >         at
> >
> kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:84)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:144)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> >
> >         at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >
> >         at kafka.utils.Utils$.inLock(Utils.scala:538)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> >
> >         at
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >
> >         at
> >         kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> >
> > *On broker 2:*
> >
> > 2014-11-22T21:20:19.629Z ERROR [request-expiration-task            ]
> > [kafka.server.KafkaApis              ]: [KafkaApi-12] Error when
> > processing
> > fetch request for partition [myTopic,265] offset 415659 from follower
> > with
> > correlation id 0
> >
> > kafka.common.OffsetOutOfRangeException: Request for offset 415659 but we
> > only have log segments in the range 0 to 410453.
> >
> >         at kafka.log.Log.read(Log.scala:377)
> >
> >         at
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:401)
> >
> >         at
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:347)
> >
> >         at
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:343)
> >
> >         at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> >
> >         at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> >
> >         at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> >
> >         at scala.collection.immutable.HashMap.map(HashMap.scala:35)
> >
> >         at
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:343)
> >
> >         at
> > kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:704)
> >
> >         at
> > kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:686)
> >
> >         at
> >
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
> >
> >
> > *On broker 3*:
> >
> >
> > 2014-11-22T21:26:48.216Z INFO  [kafka-request-handler-3            ]
> > [fka.controller.PartitionStateMachine]: [Partition state machine on
> > Controller 13]: Invoking state change to OnlinePartition for partitions
> > [myTopic,122]
> >
> > 2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3            ]
> > [state.change.logger                 ]: Controller 13 epoch 132
> > encountered
> > error while electing leader for partition [myTopic,122] due to: No other
> > replicas in ISR 13 for [myTopic,122] besides shutting down brokers 13.
> >
> > 2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3            ]
> > [state.change.logger                 ]: Controller 13 epoch 132 initiated
> > state change for partition [myTopic,122] from OnlinePartition to
> > OnlinePartition failed
> >
> > kafka.common.StateChangeFailedException: encountered error while electing
> > leader for partition [myTopic,122] due to: No other replicas in ISR 13
> > for
> > [myTopic,122] besides shutting down brokers 13.
> >
> >         at
> >
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:344)
> >
> >         at
> >
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:171)
> >
> >         at
> >
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
> >
> >         at
> >
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
> >
> >         at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
> >
> >         at
> >
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
> >
> >         at
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:227)
> >
> >         at
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:223)
> >
> >         at scala.Option.foreach(Option.scala:121)
> >
> >         at
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:223)
> >
> >         at
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:219)
> >
> >         at
> > scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)
> >
> >         at
> > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> >
> >         at
> > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> >
> >         at
> > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> >
> >         at
> >
> kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:219)
> >
> >         at
> >
> kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:140)
> >
> >         at kafka.server.KafkaApis.handle(KafkaApis.scala:77)
> >
> >         at
> > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> >
> >         at java.lang.Thread.run(Thread.java:744)
> >
> > Caused by: kafka.common.StateChangeFailedException: No other replicas in
> > ISR 13 for [myTopic,122] besides shutting down brokers 13
> >
> >         at
> >
> kafka.controller.ControlledShutdownLeaderSelector.selectLeader(PartitionLeaderSelector.scala:181)
> >
> >         at
> >
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:320)
> >
> >         ... 19 more
> >
> > On Sat, Nov 22, 2014 at 1:17 PM, Harsha <ka...@harsha.io> wrote:
> >
> > > Rajiv ,
> > >           which version of kafka are you using and do you see any
> errors
> > >           when the server goes down after sending few messages.
> > > -Harsha
> > >
> > > On Sat, Nov 22, 2014, at 01:05 PM, Rajiv Kurian wrote:
> > > > The brokers also seem unavailable while this is going on.  Each of
> these
> > > > log messages takes 2-3 seconds so at about  1200 partitions it takes
> up
> > > > quite a bit of time. Ultimately it does recover though but sadly it
> goes
> > > > down soon enough after I start sending it messages.
> > > >
> > > > On Sat, Nov 22, 2014 at 11:23 AM, Rajiv Kurian <rajiv@signalfuse.com
> >
> > > > wrote:
> > > >
> > > > > A 3 node kafka broker cluster went down yesterday (all nodes) and I
> > > just
> > > > > noticed it this morning. When I restarted it this morning, I see a
> > > lengthy
> > > > > list of messages like this:
> > > > >
> > > > > Loading log 'mytopic-partitionNum"
> > > > > Recovering unflushed segment 'some number' of in log
> > > mytopic-partitionNum.
> > > > > Completed load of log mytopic-partitionNum with log end offset
> > > someOffset
> > > > >
> > > > > It's been going on for more than 30 minutes since I restarted the
> > > broker.
> > > > > I have quite a few partitions (over 1000) but I still wouldn't
> expect
> > > it to
> > > > > take such a long time.
> > > > >
> > > > > Any ideas on how I should investigate the problem?
> > > > >
> > > > > Thanks!
> > > > >
> > >
>

Re: Kafka restart takes a long time

Posted by Harsha <ka...@harsha.io>.
It might logs check your kafka logs dir (server logs) . Kafka can
produce lot of logs in a quick time make sure thats whats in play here.
-Harsha

On Sat, Nov 22, 2014, at 01:37 PM, Rajiv Kurian wrote:
> Actually see a bunch of errors. One of the brokers is out of space and
> this
> might be causing everything to spin out of control.
> 
> Some logs:
> 
> On *broker 1* (the one that has run out of space):
> 
> 2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13          ]
> [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-1-13], Disk
> error while replicating data.
> 
> kafka.common.KafkaStorageException: I/O exception in append to log
> 'mytopic-633'
> 
>         at kafka.log.Log.append(Log.scala:283)
> 
>         at
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52)
> 
>         at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> 
>         at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> 
>         at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> 
>         at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> 
>         at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> 
>         at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> 
>         at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> 
>         at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> 
>         at kafka.utils.Utils$.inLock(Utils.scala:538)
> 
>         at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> 
>         at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> 
>         at
>         kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 
> Caused by: java.io.IOException: No space left on device
> 
>         at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> 
>         at
>         sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
> 
>         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> 
>         at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> 
>         at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
> 
>         at
> kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132)
> 
>         at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
> 
>         at kafka.log.LogSegment.append(LogSegment.scala:80)
> 
>         at kafka.log.Log.append(Log.scala:269)
> 
>         ... 13 more
> 
> 2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13          ]
> [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-2-13],
> Error
> getting offset for partition [myTopic,0] to broker 13
> 
> java.io.IOException: No space left on device
> 
>         at java.io.FileOutputStream.writeBytes(Native Method)
> 
>         at java.io.FileOutputStream.write(FileOutputStream.java:345)
> 
>         at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> 
>         at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> 
>         at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> 
>         at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
> 
>         at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:129)
> 
>         at java.io.BufferedWriter.write(BufferedWriter.java:230)
> 
>         at java.io.Writer.write(Writer.java:157)
> 
>         at
> kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:50)
> 
>         at
> kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:49)
> 
>         at
> scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
> 
>         at
> scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
> 
>         at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> 
>         at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> 
>         at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> 
>         at
> scala.collection.immutable.MapLike$$anon$2.foreach(MapLike.scala:106)
> 
>         at
> kafka.server.OffsetCheckpoint.liftedTree1$1(OffsetCheckpoint.scala:49)
> 
>         at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:39)
> 
>         at
> kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:234)
> 
>         at
> kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:231)
> 
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> 
>         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> 
>         at
> kafka.log.LogManager.checkpointRecoveryPointOffsets(LogManager.scala:231)
> 
>         at kafka.log.LogManager.truncateTo(LogManager.scala:204)
> 
>         at
> kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:84)
> 
>         at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:144)
> 
>         at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> 
>         at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> 
>         at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> 
>         at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> 
>         at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> 
>         at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> 
>         at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> 
>         at kafka.utils.Utils$.inLock(Utils.scala:538)
> 
>         at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> 
>         at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> 
>         at
>         kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 
> 
> *On broker 2:*
> 
> 2014-11-22T21:20:19.629Z ERROR [request-expiration-task            ]
> [kafka.server.KafkaApis              ]: [KafkaApi-12] Error when
> processing
> fetch request for partition [myTopic,265] offset 415659 from follower
> with
> correlation id 0
> 
> kafka.common.OffsetOutOfRangeException: Request for offset 415659 but we
> only have log segments in the range 0 to 410453.
> 
>         at kafka.log.Log.read(Log.scala:377)
> 
>         at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:401)
> 
>         at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:347)
> 
>         at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:343)
> 
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 
>         at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> 
>         at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> 
>         at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> 
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> 
>         at scala.collection.immutable.HashMap.map(HashMap.scala:35)
> 
>         at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:343)
> 
>         at
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:704)
> 
>         at
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:686)
> 
>         at
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
> 
> 
> *On broker 3*:
> 
> 
> 2014-11-22T21:26:48.216Z INFO  [kafka-request-handler-3            ]
> [fka.controller.PartitionStateMachine]: [Partition state machine on
> Controller 13]: Invoking state change to OnlinePartition for partitions
> [myTopic,122]
> 
> 2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3            ]
> [state.change.logger                 ]: Controller 13 epoch 132
> encountered
> error while electing leader for partition [myTopic,122] due to: No other
> replicas in ISR 13 for [myTopic,122] besides shutting down brokers 13.
> 
> 2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3            ]
> [state.change.logger                 ]: Controller 13 epoch 132 initiated
> state change for partition [myTopic,122] from OnlinePartition to
> OnlinePartition failed
> 
> kafka.common.StateChangeFailedException: encountered error while electing
> leader for partition [myTopic,122] due to: No other replicas in ISR 13
> for
> [myTopic,122] besides shutting down brokers 13.
> 
>         at
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:344)
> 
>         at
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:171)
> 
>         at
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
> 
>         at
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
> 
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
> 
>         at
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
> 
>         at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:227)
> 
>         at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:223)
> 
>         at scala.Option.foreach(Option.scala:121)
> 
>         at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:223)
> 
>         at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:219)
> 
>         at
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)
> 
>         at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> 
>         at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> 
>         at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> 
>         at
> kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:219)
> 
>         at
> kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:140)
> 
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:77)
> 
>         at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> 
>         at java.lang.Thread.run(Thread.java:744)
> 
> Caused by: kafka.common.StateChangeFailedException: No other replicas in
> ISR 13 for [myTopic,122] besides shutting down brokers 13
> 
>         at
> kafka.controller.ControlledShutdownLeaderSelector.selectLeader(PartitionLeaderSelector.scala:181)
> 
>         at
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:320)
> 
>         ... 19 more
> 
> On Sat, Nov 22, 2014 at 1:17 PM, Harsha <ka...@harsha.io> wrote:
> 
> > Rajiv ,
> >           which version of kafka are you using and do you see any errors
> >           when the server goes down after sending few messages.
> > -Harsha
> >
> > On Sat, Nov 22, 2014, at 01:05 PM, Rajiv Kurian wrote:
> > > The brokers also seem unavailable while this is going on.  Each of these
> > > log messages takes 2-3 seconds so at about  1200 partitions it takes up
> > > quite a bit of time. Ultimately it does recover though but sadly it goes
> > > down soon enough after I start sending it messages.
> > >
> > > On Sat, Nov 22, 2014 at 11:23 AM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > >
> > > > A 3 node kafka broker cluster went down yesterday (all nodes) and I
> > just
> > > > noticed it this morning. When I restarted it this morning, I see a
> > lengthy
> > > > list of messages like this:
> > > >
> > > > Loading log 'mytopic-partitionNum"
> > > > Recovering unflushed segment 'some number' of in log
> > mytopic-partitionNum.
> > > > Completed load of log mytopic-partitionNum with log end offset
> > someOffset
> > > >
> > > > It's been going on for more than 30 minutes since I restarted the
> > broker.
> > > > I have quite a few partitions (over 1000) but I still wouldn't expect
> > it to
> > > > take such a long time.
> > > >
> > > > Any ideas on how I should investigate the problem?
> > > >
> > > > Thanks!
> > > >
> >

Re: Kafka restart takes a long time

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Actually see a bunch of errors. One of the brokers is out of space and this
might be causing everything to spin out of control.

Some logs:

On *broker 1* (the one that has run out of space):

2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13          ]
[kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-1-13], Disk
error while replicating data.

kafka.common.KafkaStorageException: I/O exception in append to log
'mytopic-633'

        at kafka.log.Log.append(Log.scala:283)

        at
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52)

        at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)

        at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)

        at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)

        at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)

        at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)

        at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)

        at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)

        at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)

        at kafka.utils.Utils$.inLock(Utils.scala:538)

        at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)

        at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Caused by: java.io.IOException: No space left on device

        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)

        at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)

        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)

        at sun.nio.ch.IOUtil.write(IOUtil.java:65)

        at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)

        at
kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132)

        at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)

        at kafka.log.LogSegment.append(LogSegment.scala:80)

        at kafka.log.Log.append(Log.scala:269)

        ... 13 more

2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13          ]
[kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-2-13], Error
getting offset for partition [myTopic,0] to broker 13

java.io.IOException: No space left on device

        at java.io.FileOutputStream.writeBytes(Native Method)

        at java.io.FileOutputStream.write(FileOutputStream.java:345)

        at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)

        at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)

        at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)

        at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)

        at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:129)

        at java.io.BufferedWriter.write(BufferedWriter.java:230)

        at java.io.Writer.write(Writer.java:157)

        at
kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:50)

        at
kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:49)

        at
scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)

        at
scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)

        at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)

        at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)

        at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)

        at
scala.collection.immutable.MapLike$$anon$2.foreach(MapLike.scala:106)

        at
kafka.server.OffsetCheckpoint.liftedTree1$1(OffsetCheckpoint.scala:49)

        at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:39)

        at
kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:234)

        at
kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:231)

        at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)

        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)

        at
kafka.log.LogManager.checkpointRecoveryPointOffsets(LogManager.scala:231)

        at kafka.log.LogManager.truncateTo(LogManager.scala:204)

        at
kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:84)

        at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:144)

        at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)

        at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)

        at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)

        at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)

        at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)

        at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)

        at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)

        at kafka.utils.Utils$.inLock(Utils.scala:538)

        at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)

        at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)


*On broker 2:*

2014-11-22T21:20:19.629Z ERROR [request-expiration-task            ]
[kafka.server.KafkaApis              ]: [KafkaApi-12] Error when processing
fetch request for partition [myTopic,265] offset 415659 from follower with
correlation id 0

kafka.common.OffsetOutOfRangeException: Request for offset 415659 but we
only have log segments in the range 0 to 410453.

        at kafka.log.Log.read(Log.scala:377)

        at
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:401)

        at
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:347)

        at
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:343)

        at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)

        at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)

        at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)

        at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)

        at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)

        at
scala.collection.TraversableLike$class.map(TraversableLike.scala:206)

        at scala.collection.immutable.HashMap.map(HashMap.scala:35)

        at
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:343)

        at
kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:704)

        at
kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:686)

        at
kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)


*On broker 3*:


2014-11-22T21:26:48.216Z INFO  [kafka-request-handler-3            ]
[fka.controller.PartitionStateMachine]: [Partition state machine on
Controller 13]: Invoking state change to OnlinePartition for partitions
[myTopic,122]

2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3            ]
[state.change.logger                 ]: Controller 13 epoch 132 encountered
error while electing leader for partition [myTopic,122] due to: No other
replicas in ISR 13 for [myTopic,122] besides shutting down brokers 13.

2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3            ]
[state.change.logger                 ]: Controller 13 epoch 132 initiated
state change for partition [myTopic,122] from OnlinePartition to
OnlinePartition failed

kafka.common.StateChangeFailedException: encountered error while electing
leader for partition [myTopic,122] due to: No other replicas in ISR 13 for
[myTopic,122] besides shutting down brokers 13.

        at
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:344)

        at
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:171)

        at
kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)

        at
kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)

        at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)

        at
kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)

        at
kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:227)

        at
kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:223)

        at scala.Option.foreach(Option.scala:121)

        at
kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:223)

        at
kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:219)

        at
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)

        at
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)

        at
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)

        at
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)

        at
kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:219)

        at
kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:140)

        at kafka.server.KafkaApis.handle(KafkaApis.scala:77)

        at
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)

        at java.lang.Thread.run(Thread.java:744)

Caused by: kafka.common.StateChangeFailedException: No other replicas in
ISR 13 for [myTopic,122] besides shutting down brokers 13

        at
kafka.controller.ControlledShutdownLeaderSelector.selectLeader(PartitionLeaderSelector.scala:181)

        at
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:320)

        ... 19 more

On Sat, Nov 22, 2014 at 1:17 PM, Harsha <ka...@harsha.io> wrote:

> Rajiv ,
>           which version of kafka are you using and do you see any errors
>           when the server goes down after sending few messages.
> -Harsha
>
> On Sat, Nov 22, 2014, at 01:05 PM, Rajiv Kurian wrote:
> > The brokers also seem unavailable while this is going on.  Each of these
> > log messages takes 2-3 seconds so at about  1200 partitions it takes up
> > quite a bit of time. Ultimately it does recover though but sadly it goes
> > down soon enough after I start sending it messages.
> >
> > On Sat, Nov 22, 2014 at 11:23 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > A 3 node kafka broker cluster went down yesterday (all nodes) and I
> just
> > > noticed it this morning. When I restarted it this morning, I see a
> lengthy
> > > list of messages like this:
> > >
> > > Loading log 'mytopic-partitionNum"
> > > Recovering unflushed segment 'some number' of in log
> mytopic-partitionNum.
> > > Completed load of log mytopic-partitionNum with log end offset
> someOffset
> > >
> > > It's been going on for more than 30 minutes since I restarted the
> broker.
> > > I have quite a few partitions (over 1000) but I still wouldn't expect
> it to
> > > take such a long time.
> > >
> > > Any ideas on how I should investigate the problem?
> > >
> > > Thanks!
> > >
>

Re: Kafka restart takes a long time

Posted by Harsha <ka...@harsha.io>.
Rajiv ,
          which version of kafka are you using and do you see any errors
          when the server goes down after sending few messages.
-Harsha

On Sat, Nov 22, 2014, at 01:05 PM, Rajiv Kurian wrote:
> The brokers also seem unavailable while this is going on.  Each of these
> log messages takes 2-3 seconds so at about  1200 partitions it takes up
> quite a bit of time. Ultimately it does recover though but sadly it goes
> down soon enough after I start sending it messages.
> 
> On Sat, Nov 22, 2014 at 11:23 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
> 
> > A 3 node kafka broker cluster went down yesterday (all nodes) and I just
> > noticed it this morning. When I restarted it this morning, I see a lengthy
> > list of messages like this:
> >
> > Loading log 'mytopic-partitionNum"
> > Recovering unflushed segment 'some number' of in log mytopic-partitionNum.
> > Completed load of log mytopic-partitionNum with log end offset someOffset
> >
> > It's been going on for more than 30 minutes since I restarted the broker.
> > I have quite a few partitions (over 1000) but I still wouldn't expect it to
> > take such a long time.
> >
> > Any ideas on how I should investigate the problem?
> >
> > Thanks!
> >

Re: Kafka restart takes a long time

Posted by Rajiv Kurian <ra...@signalfuse.com>.
The brokers also seem unavailable while this is going on.  Each of these
log messages takes 2-3 seconds so at about  1200 partitions it takes up
quite a bit of time. Ultimately it does recover though but sadly it goes
down soon enough after I start sending it messages.

On Sat, Nov 22, 2014 at 11:23 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> A 3 node kafka broker cluster went down yesterday (all nodes) and I just
> noticed it this morning. When I restarted it this morning, I see a lengthy
> list of messages like this:
>
> Loading log 'mytopic-partitionNum"
> Recovering unflushed segment 'some number' of in log mytopic-partitionNum.
> Completed load of log mytopic-partitionNum with log end offset someOffset
>
> It's been going on for more than 30 minutes since I restarted the broker.
> I have quite a few partitions (over 1000) but I still wouldn't expect it to
> take such a long time.
>
> Any ideas on how I should investigate the problem?
>
> Thanks!
>