Posted to dev@kafka.apache.org by "Yiyang Li (JIRA)" <ji...@apache.org> on 2014/08/21 17:51:12 UTC

[jira] [Commented] (KAFKA-1029) Zookeeper leader election stuck in ephemeral node retry loop

    [ https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14105480#comment-14105480 ] 

Yiyang Li commented on KAFKA-1029:
----------------------------------

Hello, I am totally new to Kafka. We are .NET developers testing Kafka with the kafka-net library. Its NuGet package is still in alpha, so a lot of functionality is not implemented yet.

We recently hit this problem after embedding a producer in a service; according to the library, it runs a ResponseTimeoutCheck every 30000 ms. With one of the clients (the others are fine), the following error is logged:

ERROR Closing socket for /10.207.x.x because of error (kafka.network.Processor)
java.io.IOException: An existing connection was forcibly closed by the remote host
        at sun.nio.ch.SocketDispatcher.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:51)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:217)
        at kafka.network.Processor.write(SocketServer.scala:375)
        at kafka.network.Processor.run(SocketServer.scala:247)
        at java.lang.Thread.run(Thread.java:745)
log4j:ERROR Failed to rename [/cygdrive/c/kafka/bin/../logs/server.log] to [/cygdrive/c/kafka/bin/../logs/server.log.2014-08-20-17].

This appears in the output of kafka-server-start.sh.

I have changed the log4j properties to:

log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log

Other background:
Kafka binaries: Scala 2.9.2 - kafka_2.9.2-0.8.1.1
Brokers: 3 brokers on 3 different machines, using the same port on each IP (broker id = 0, 1, 2)
Zookeeper: 2181 port at one of the brokers

I understand that this may be hard to reproduce, as the problem might lie in the incomplete .NET client.

Details of ResponseTimeoutCheck:

https://github.com/YiyangLi/kafka-net/blob/master/src/kafka-net/KafkaConnection.cs (Line 200)

Thanks. 


> Zookeeper leader election stuck in ephemeral node retry loop
> ------------------------------------------------------------
>
>                 Key: KAFKA-1029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1029
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.8.0
>            Reporter: Sam Meder
>            Assignee: Sam Meder
>            Priority: Blocker
>             Fix For: 0.8.0
>
>         Attachments: 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
>   def elect: Boolean = {
>     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
>     val timestamp = SystemTime.milliseconds.toString
>     val electString = ...
>     try {
>       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId,
>         (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it looks like a new registration will be added every elect call - shouldn't it register in startup()?) so can update leaderId to the current leader before the call to create. If that happens then we will continuously get node exists exceptions and the checker function will always return true, i.e. we will never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when calling create, i.e.
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
>         (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the broker that owned the node previously, although I am still not 100% sure if that is sufficient.
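
The race described in the quoted issue can be modeled with a small, self-contained sketch. It is not the Kafka code: parseControllerId below is a hypothetical regex-based stand-in for KafkaController.parseControllerId, and conflictRetries is an assumed, bounded model of the while(true) loop that keeps backing off only while the checker reports "I wrote this node". The stored data is taken from the log line in the issue (brokerid 2), the local broker is 3, and leaderId is assumed to have already been updated to 2 by leaderChangeListener.

```scala
object ElectionCheckerSketch {
  // Hypothetical stand-in for KafkaController.parseControllerId: extracts the
  // integer that follows "brokerid": in the JSON stored at /controller.
  def parseControllerId(controllerString: String): Int = {
    val pattern = "\"brokerid\"\\s*:\\s*(\\d+)".r
    pattern.findFirstMatchIn(controllerString)
      .map(_.group(1).toInt)
      .getOrElse(throw new IllegalArgumentException("no brokerid in: " + controllerString))
  }

  // Same shape as the checker lambda quoted in the issue.
  val checker: (String, Any) => Boolean =
    (controllerString, id) => parseControllerId(controllerString) == id.asInstanceOf[Int]

  // Assumed model of the retry loop: the node already exists with storedData,
  // so every create attempt conflicts. The loop retries while the checker
  // returns true; maxAttempts bounds what would otherwise never terminate.
  def conflictRetries(storedData: String, expectedCaller: Any, maxAttempts: Int): Int = {
    var attempts = 0
    while (attempts < maxAttempts && checker(storedData, expectedCaller)) {
      attempts += 1
    }
    attempts
  }

  def main(args: Array[String]): Unit = {
    val stored = """{ "brokerid":2, "timestamp":"1377587460904", "version":1 }"""
    val brokerId = 3 // the local broker that is trying to become controller
    val leaderId = 2 // assumed to be already overwritten by leaderChangeListener

    // Passing leaderId: the checker matches the stored node on every attempt.
    println("retries with leaderId: " + conflictRetries(stored, leaderId, 5))
    // Passing brokerId: the checker does not match, so the loop is left at once.
    println("retries with brokerId: " + conflictRetries(stored, brokerId, 5))
  }
}
```

With leaderId the model exhausts every allowed attempt, which corresponds to the endless backoff in the logs; with brokerId the checker fails on the first conflict and the loop is not entered. Whether that alone is sufficient is left open in the issue itself.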



--
This message was sent by Atlassian JIRA
(v6.2#6252)