Posted to users@kafka.apache.org by Henri Pihkala <he...@streamr.com> on 2015/01/06 23:17:02 UTC

Topic in bad state after controller to broker messaging fails

Hi,

I’m hitting a strange problem using 0.8.2-beta and just a single Kafka broker on CentOS 6.5.

A percentage of my topic creation attempts fail at random, leaving the new topic in a state where it cannot be used because of “partition doesn’t exist” errors, as seen in server.log below.

In controller.log, it looks like the controller fails to send either the “become-leader LeaderAndIsr request” or the “UpdateMetadata request” to the broker (which is in fact the same Kafka instance) because a socket read fails for an unknown reason.
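For reference, the EOFException message in controller.log (“Received -1 when reading from channel”) corresponds to a read call returning -1, which is how Java NIO signals end-of-stream: the other end has closed the connection. The minimal sketch below reproduces that return value. It assumes a java.nio.channels.Pipe is an acceptable stand-in for the controller-to-broker socket, to keep the example self-contained; EofDemo and readAfterPeerClose are illustrative names only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class EofDemo {
    // Closes the writing end of a pipe, then reads from the reading end.
    // NIO channels report end-of-stream as a return value of -1, not as an exception.
    static int readAfterPeerClose() {
        try {
            Pipe pipe = Pipe.open();
            pipe.sink().close(); // the "peer" closes its end of the connection
            try (Pipe.SourceChannel source = pipe.source()) {
                ByteBuffer buf = ByteBuffer.allocate(4);
                return source.read(buf); // -1: no more data will ever arrive
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAfterPeerClose()); // prints -1
    }
}
```

So even with a single instance, a -1 suggests the broker end of that connection was closed; the log alone does not say why.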

My questions:

(1) Is the bad topic state a result of the message not making it from the controller to the broker?

(2) Any idea why the socket read might randomly fail? It can’t be a network issue since we’re running a single instance.

(3) Shouldn’t the controller try to resend the message?
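The behaviour asked about in question (3), reconnecting and then resending the same request instead of dropping it, can be sketched as follows. This is a hypothetical illustration, not Kafka’s RequestSendThread: BrokerConnection, RetryingSender, maxAttempts and backoffMillis are made-up names.

```java
import java.io.IOException;

public class RetryingSender {
    /** Hypothetical connection abstraction (not a Kafka API). */
    interface BrokerConnection {
        void reconnect() throws IOException;
        String sendAndReceive(String request) throws IOException;
    }

    private final BrokerConnection conn;
    private final int maxAttempts;
    private final long backoffMillis;

    RetryingSender(BrokerConnection conn, int maxAttempts, long backoffMillis) {
        this.conn = conn;
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    /** Sends a request; after any I/O failure, reconnects and resends the SAME request. */
    String send(String request) {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return conn.sendAndReceive(request);
            } catch (IOException e) {
                last = e; // e.g. EOFException or ClosedChannelException
            }
            if (attempt == maxAttempts) {
                break; // no point reconnecting after the final attempt
            }
            try {
                Thread.sleep(backoffMillis);
                conn.reconnect();
            } catch (IOException e) {
                last = e; // the next attempt will fail again and be counted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new IllegalStateException("gave up on request: " + request, last);
    }
}
```

This sketch gives up after maxAttempts. Where a dropped request leaves the broker out of sync with the controller, as in this thread, retrying until the connection recovers or the thread is shut down may be the more appropriate choice.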



controller.log

[2015-01-06 21:31:10,304] INFO [Controller 0]: New topic creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController)

[2015-01-06 21:31:10,304] INFO [Controller 0]: New partition creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController)

[2015-01-06 21:31:10,304] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,308] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)

[2015-01-06 21:31:10,308] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,308] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] are: [List(0)] (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,309] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,501] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)

[2015-01-06 21:31:10,502] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:dev.unifina,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:381)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

[2015-01-06 21:31:10,505] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:16;ClientId:id_0-host_dev.unifina-port_9092;AliveBrokers:id:0,host:dev.unifina,port:9092;PartitionState:[09b1ebac-7036-49fc-aa61-7852808ca241,0] -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:dev.unifina,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
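To check whether other topic creations lost requests the same way, one could scan controller.log for ERROR lines like the one above and pull out the request name and correlation id. The sketch below is a hypothetical helper: its regular expression is fitted only to the single ERROR line quoted here and may need adjusting for other Kafka versions or log layouts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FailedSendScanner {
    /** One failed controller-to-broker request extracted from an ERROR line. */
    static final class FailedSend {
        final String timestamp;
        final int controllerId;
        final int controllerEpoch;
        final String requestName;
        final int correlationId;

        FailedSend(String timestamp, int controllerId, int controllerEpoch,
                   String requestName, int correlationId) {
            this.timestamp = timestamp;
            this.controllerId = controllerId;
            this.controllerEpoch = controllerEpoch;
            this.requestName = requestName;
            this.correlationId = correlationId;
        }
    }

    // Fitted to the ERROR line format shown in controller.log above (an assumption for other versions).
    private static final Pattern FAILED_SEND = Pattern.compile(
            "^\\[([^\\]]+)\\] ERROR .*Controller (\\d+) epoch (\\d+) failed to send request "
            + "Name:(\\w+);.*?CorrelationId:(\\d+);");

    /** Returns one FailedSend per matching log line; stack-trace lines are ignored. */
    static List<FailedSend> scan(List<String> lines) {
        List<FailedSend> failures = new ArrayList<>();
        for (String line : lines) {
            Matcher m = FAILED_SEND.matcher(line);
            if (m.find()) {
                failures.add(new FailedSend(m.group(1),
                        Integer.parseInt(m.group(2)), Integer.parseInt(m.group(3)),
                        m.group(4), Integer.parseInt(m.group(5))));
            }
        }
        return failures;
    }
}
```

For example, FailedSendScanner.scan(Files.readAllLines(Paths.get("controller.log"))) (the path is an assumption) lists the lost requests, whose correlation ids can then be matched against the “sending … with correlationId” lines in state-change.log.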




state-change.log

[2015-01-06 21:31:10,306] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] state from NonExistentPartition to NewPartition with assigned replicas 0 (state.change.logger)

[2015-01-06 21:31:10,308] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NonExistentReplica to NewReplica (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewPartition to OnlinePartition with leader 0 (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 16 to broker 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 16 to broker 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewReplica to OnlineReplica (state.change.logger)
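Question (1) comes down to the controller’s view and the broker’s view drifting apart. In state-change.log the controller moves the partition to OnlinePartition and the replica to OnlineReplica at 21:31:10,501, the same millisecond in which it logs the outgoing requests, while the send failures only appear in controller.log at ,502 and ,505. The toy model below (hypothetical names, not Kafka code) shows how recording a state change without confirmed delivery yields the mismatch seen further down: metadata listing Leader: 0 and Isr: 0 while the broker says the partition doesn’t exist.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class ControllerBrokerDivergence {
    enum PartitionState { NON_EXISTENT, NEW, ONLINE }

    // Controller-side view: roughly what --describe reports in this toy model.
    final Map<String, PartitionState> controllerView = new HashMap<>();
    // Broker-side view: partitions the broker has actually been told it leads.
    final Set<String> brokerPartitions = new HashSet<>();

    /** Walks a new partition through the transitions seen in state-change.log.
     *  `delivered` decides whether the become-leader request reaches the broker. */
    void createPartition(String partition, Predicate<String> delivered) {
        controllerView.put(partition, PartitionState.NEW);
        // In this model (mirroring the log order), the controller records ONLINE
        // when it queues the request, before knowing whether the send succeeds.
        controllerView.put(partition, PartitionState.ONLINE);
        if (delivered.test(partition)) {
            brokerPartitions.add(partition);
        }
        // Deliberately no retry: a dropped request leaves the two views out of sync.
    }

    /** True when both sides agree on whether the partition is usable. */
    boolean consistent(String partition) {
        boolean controllerOnline =
                controllerView.getOrDefault(partition, PartitionState.NON_EXISTENT) == PartitionState.ONLINE;
        return controllerOnline == brokerPartitions.contains(partition);
    }
}
```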




server.log

[2015-01-06 22:01:48,137] WARN [KafkaApi-0] Offset request with correlation id 0 from client console-consumer-34042-ConsumerFetcherThread-console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-0-0 on partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] failed due to Partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] doesn't exist on 0 (kafka.server.KafkaApis)

[2015-01-06 22:01:48,140] INFO Closing socket connection to /192.168.10.21. (kafka.network.Processor)

... etc etc


describe topic:

$ bin/kafka-topics.sh --zookeeper dev.unifina:2181 --describe --topic 09b1ebac-7036-49fc-aa61-7852808ca241
Topic:09b1ebac-7036-49fc-aa61-7852808ca241      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: 09b1ebac-7036-49fc-aa61-7852808ca241     Partition: 0    Leader: 0       Replicas: 0     Isr: 0


attempt to consume from the topic using the console consumer:

[2015-01-06 22:01:47,928] WARN [console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-leader-finder-thread], Failed to add leader for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.UnknownTopicOrPartitionException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
        at java.lang.Class.newInstance0(Class.java:372)
        at java.lang.Class.newInstance(Class.java:325)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:168)
        at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
        at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:180)
        at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:175)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:175)
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


Thanks for your help!

Best regards
Henri


Re: Topic in bad state after controller to broker messaging fails

Posted by Jun Rao <ju...@confluent.io>.
Yes, typically, a bad topic state is the result of communication issues
between the controller and the broker. You may be hitting KAFKA-1738. Could
you try 0.8.2.0 RC1 or the latest 0.8.2 branch? The issue has been fixed
there.

Thanks,

Jun
