Posted to dev@kafka.apache.org by "Michael Noll (JIRA)" <ji...@apache.org> on 2014/04/29 15:01:17 UTC

[jira] [Comment Edited] (KAFKA-1029) Zookeeper leader election stuck in ephemeral node retry loop

    [ https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13984265#comment-13984265 ] 

Michael Noll edited comment on KAFKA-1029 at 4/29/14 12:59 PM:
---------------------------------------------------------------

Interestingly, I ran into the very same issue while doing basic validation of Kafka 0.8.1.1.  Note that, unlike Jason Rosenberg's case, I was only using code/tools/scripts that ship with Kafka (e.g. the console producer and console consumer).

Here is an example of the log messages, which were repeated indefinitely:

{code}
[2014-04-29 09:48:27,207] INFO conflict in /controller data: {"version":1,"brokerid":0,"timestamp":"1398764901156"} stored data: {"version":1,"brokerid":0,"timestamp":"1398764894941"} (kafka.utils.ZkUtils$)
[2014-04-29 09:48:27,218] INFO I wrote this conflicted ephemeral node [{"version":1,"brokerid":0,"timestamp":"1398764901156"}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}
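
For context on why these two lines repeat forever: they are emitted by the retry helper in {{kafka.utils.ZkUtils}} that the ticket description below refers to ({{createEphemeralPathExpectConflictHandleZKBug}}). The snippet below is only a simplified, self-contained sketch of that back-off loop, with the ZooKeeper client replaced by a stand-in trait ({{EphemeralStore}}, {{NodeExistsException}} and {{createWithRetry}} are made-up names for illustration, not the actual 0.8.1.1 source). It shows why the broker loops on these messages indefinitely if the "is this my own stale node?" checker can never return false:

{code}
// Simplified sketch of the back-off loop behind the log lines above.
// NOT the actual ZkUtils source; names and details are illustrative.
object ConflictedEphemeralNodeSketch {

  class NodeExistsException(val storedData: String) extends RuntimeException

  // Stand-in for the ZooKeeper client: creates an ephemeral node or throws
  // NodeExistsException carrying the data already stored at that path.
  trait EphemeralStore {
    def createEphemeral(path: String, data: String): Unit
  }

  def createWithRetry(store: EphemeralStore,
                      path: String,
                      data: String,
                      expectedCallerData: Any,
                      checker: (String, Any) => Boolean,
                      backoffMs: Long): Unit = {
    while (true) {
      try {
        store.createEphemeral(path, data)
        return                                   // success: we now own the node
      } catch {
        case e: NodeExistsException =>
          if (checker(e.storedData, expectedCallerData)) {
            // The stored data "looks like ours": assume it is a leftover from an
            // expired session, back off, and retry. If the checker can never
            // return false (the race described in this ticket), these two log
            // lines are printed forever.
            println(s"conflict in $path data: $data stored data: ${e.storedData} -- backing off and retrying")
            Thread.sleep(backoffMs)
          } else {
            throw e                              // someone else genuinely owns the node
          }
      }
    }
  }
}
{code}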

*How to reproduce (unsolved)*

Unfortunately, I cannot consistently reproduce the issue, and to be honest I am still not sure what actually triggers the bug.  All I can do is summarize what I did before and around the time the bug was triggered, which I could only observe through errors in the log files and errors displayed after running certain commands.  So yes, it's a bit like shooting in the dark.

Here's an overview of my test setup:

- I deployed Kafka 0.8.1.1 to one machine {{kafka1}} and ZooKeeper 3.4.5 to a second machine {{zookeeper1}}.
- I followed the Kafka 0.8.1 quick start guide to create a topic "test" with 1 partition and a replication factor of 1.
- I sent test messages to the topic "test" via the console producer.
- I read test messages from the topic "test" via the console consumer.
- Apart from producing and consuming a handful of test messages, I also ran some supposedly read-only admin commands such as "describing" the topic and running the consumer offset checker tool.
- At "some" point, Kafka was caught in an indefinite loop complaining about "conflicted ephemeral node".


The following paragraphs list in more detail what I did before the error popped up.

Producer:

{code}
$ sudo su - kafka
$ cd /opt/kafka

# This command returned no topics at this point, i.e. it worked as expected
$ bin/kafka-topics.sh --list --zookeeper zookeeper1

# I created a topic, worked as expected
$ bin/kafka-topics.sh --create --zookeeper zookeeper1 --replication-factor 1 --partitions 1 --topic test

# I requested details of topic "test", worked as expected
$ bin/kafka-topics.sh --zookeeper zookeeper1 --describe --topic test

# I started a console producer and manually sent a handful of test messages, worked as expected (see consumer below)
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{code}

Consumer:

{code}
$ sudo su - kafka
$ cd /opt/kafka

# I started a console consumer, worked as expected (i.e. could read messages sent by producer, see above)
$ bin/kafka-console-consumer.sh --zookeeper zookeeper1 --topic test --from-beginning
{code}

Up to that point, everything worked.  But then the Kafka broker went the way of the dodo.  As I said, I can't pinpoint the cause, and re-running the same commands on a fresh Kafka/ZooKeeper deployment (fresh VMs etc.) didn't consistently trigger the issue as I had hoped.

Here's what I did after the commands above; at some point I did observe the original error described in this JIRA ticket.  Again, at the moment I cannot tell what actually triggered the bug.

Producer:

{code}
# Test-driving the consumer offset checker
$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect zookeeper1
# At this point the consumer group "foo" was not expected to exist yet.
$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect zookeeper1 --broker-info --group foo

#
# I then re-started the console consumer (see below), now configured to use the group id "foo".
#
{code}

Consumer:

{code}
# I re-started the console consumer, now configured to use the group id "foo".
$ bin/kafka-console-consumer.sh --zookeeper zookeeper1 --topic test --from-beginning --group foo
{code}

At this point "describing" the topic gave the following info, indicating that there was a problem (e.g. no leader for partition):

{code}
$ bin/kafka-topics.sh --zookeeper zookeeper1 --describe --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: -1	Replicas: 0	Isr:
{code}
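
A quick way to see what the controller believes at this point is to read the relevant znodes directly with the plain ZooKeeper Java client. The following is only an illustrative sketch (the hostname is taken from the setup above; the {{/brokers/ids}}, {{/controller}} and per-partition {{state}} paths follow Kafka 0.8's ZooKeeper layout as I understand it, so treat them as assumptions). An empty list of live broker ids would line up with the {{Leader: -1}} and empty ISR shown by the describe output:

{code}
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

// Illustrative diagnostic sketch: dump the znodes the controller bases its
// decisions on. Paths and hostname are assumptions based on the setup above.
object ZkPeek {
  def main(args: Array[String]): Unit = {
    val noOpWatcher = new Watcher { override def process(event: WatchedEvent): Unit = () }
    val zk = new ZooKeeper("zookeeper1:2181", 30000, noOpWatcher)
    try {
      val liveBrokerIds = zk.getChildren("/brokers/ids", false)
      val controller    = new String(zk.getData("/controller", false, null), "UTF-8")
      val partitionState =
        new String(zk.getData("/brokers/topics/test/partitions/0/state", false, null), "UTF-8")
      println(s"live broker ids : $liveBrokerIds")   // empty while the broker is stuck
      println(s"controller znode: $controller")
      println(s"partition state : $partitionState")  // shows leader -1 and an empty isr
    } finally {
      zk.close()
    }
  }
}
{code}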

Log files such as {{state-change.log}} showed these error messages:

{code}
[2014-04-29 07:44:47,816] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0) for partition [test,0] in response to UpdateMetadata request sent by controller 0 epoch 1 with correlation id 7 (state.change.logger)
[2014-04-29 07:44:47,818] TRACE Controller 0 epoch 1 received response correlationId 7 for a request sent to broker id:0,host:kafka1,port:9092 (state.change.logger)
[2014-04-29 08:48:06,559] TRACE Controller 0 epoch 1 changed partition [test,0] state from OnlinePartition to OfflinePartition (state.change.logger)
[2014-04-29 08:48:06,561] TRACE Controller 0 epoch 1 started leader election for partition [test,0] (state.change.logger)
[2014-04-29 08:48:06,574] ERROR Controller 0 epoch 1 initiated state change for partition [test,0] from OfflinePartition to OnlinePartition failed (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
        at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
        at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
        at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:433)
        at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:344)
        at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
        at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$
[2014-04-29 09:33:16,161] INFO conflict in /controller data: {"version":1,"brokerid":0,"timestamp":"1398761286651"} stored data: {"version":1,"brokerid":0,"timestamp":"1398761286608"} (kafka.utils.ZkUtils$)
[2014-04-29 09:33:16,163] INFO I wrote this conflicted ephemeral node [{"version":1,"brokerid":0,"timestamp":"1398761286651"}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}
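
The {{NoReplicaOnlineException}} above is the key detail: the controller considers no broker alive ({{Live brokers are: [Set()]}}) even though replica 0 is assigned, so offline-partition leader election has nothing to pick from and the partition stays at {{Leader: -1}}. The sketch below paraphrases that selection step from the exception message and stack trace only; it is not the code in {{PartitionLeaderSelector.scala}}, which also handles the ISR and other cases:

{code}
// Much-simplified sketch of the decision that fails in the stack trace above.
// Illustrative only; the real OfflinePartitionLeaderSelector does more.
object OfflineLeaderSelectionSketch {

  case class NoReplicaOnlineException(msg: String) extends RuntimeException(msg)

  def selectLeader(partition: String,
                   assignedReplicas: Seq[Int],
                   liveBrokerIds: Set[Int]): Int = {
    val liveAssignedReplicas = assignedReplicas.filter(liveBrokerIds.contains)
    if (liveAssignedReplicas.isEmpty)
      // Exactly the state in the log: replicas List(0) assigned, live-broker set
      // empty, so election fails and the partition keeps Leader: -1.
      throw NoReplicaOnlineException(
        s"No replica for partition $partition is alive. " +
        s"Live brokers are: [$liveBrokerIds], Assigned replicas are: [$assignedReplicas]")
    else
      liveAssignedReplicas.head   // sketch: just pick the first live assigned replica
  }

  def main(args: Array[String]): Unit = {
    // Reproduces the logged situation: broker 0 assigned, no brokers live.
    selectLeader("[test,0]", Seq(0), Set.empty[Int])
  }
}
{code}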

I also tried a few admin commands to see what would happen:

{code}
$ bin/kafka-preferred-replica-election.sh --zookeeper zookeeper1 --path-to-json-file /tmp/test.json
Failed to start preferred replica election
kafka.common.AdminCommandFailedException: Admin command failed
	at kafka.admin.PreferredReplicaLeaderElectionCommand.moveLeaderToPreferredReplica(PreferredReplicaLeaderElectionCommand.scala:115)
	at kafka.admin.PreferredReplicaLeaderElectionCommand$.main(PreferredReplicaLeaderElectionCommand.scala:60)
	at kafka.admin.PreferredReplicaLeaderElectionCommand.main(PreferredReplicaLeaderElectionCommand.scala)
Caused by: kafka.admin.AdminOperationException: Preferred replica leader election currently in progress for Set(). Aborting operation
	at kafka.admin.PreferredReplicaLeaderElectionCommand$.writePreferredReplicaElectionData(PreferredReplicaLeaderElectionCommand.scala:101)
	at kafka.admin.PreferredReplicaLeaderElectionCommand.moveLeaderToPreferredReplica(PreferredReplicaLeaderElectionCommand.scala:113)
	... 2 more
{code}
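
One possible explanation for this failure (an assumption based on the error message, not something I verified): the tool refuses to write its election plan if the {{/admin/preferred_replica_election}} znode already exists, and a leftover znode carrying an empty partition set would produce exactly this "currently in progress for Set()" message. A minimal sketch of such a guard, with made-up helper names:

{code}
// Minimal sketch of the guard that could produce the error above.
// Helper names and the exact mechanism are assumptions for illustration.
object PreferredReplicaElectionGuardSketch {

  case class AdminOperationException(msg: String) extends RuntimeException(msg)

  // zkPathExists / partitionsInZnode stand in for reads of
  // /admin/preferred_replica_election in ZooKeeper.
  def writeElectionData(zkPathExists: Boolean,
                        partitionsInZnode: Set[String],
                        requestedPartitions: Set[String]): Unit = {
    if (zkPathExists)
      // A stale znode with an empty partition set would yield
      // "... currently in progress for Set(). Aborting operation".
      throw AdminOperationException(
        s"Preferred replica leader election currently in progress for $partitionsInZnode. Aborting operation")
    // else: create the znode with requestedPartitions and let the controller act on it
  }
}
{code}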



> Zookeeper leader election stuck in ephemeral node retry loop
> ------------------------------------------------------------
>
>                 Key: KAFKA-1029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1029
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.8.0
>            Reporter: Sam Meder
>            Assignee: Sam Meder
>            Priority: Blocker
>             Fix For: 0.8.0
>
>         Attachments: 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
>   def elect: Boolean = {
>     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
>     val timestamp = SystemTime.milliseconds.toString
>     val electString = ...
>     try {
>       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId,
>         (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it looks like a new registration is added on every elect call - shouldn't it be registered in startup()?), so it can update leaderId to the current leader before the call to create. If that happens then we will continuously get node exists exceptions and the checker function will always return true, i.e. we will never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when calling create, i.e.
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
>         (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the broker that owned the node previously, although I am still not 100% sure if that is sufficient.



--
This message was sent by Atlassian JIRA
(v6.2#6252)