You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2016/05/17 13:52:13 UTC

[jira] [Updated] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

     [ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-1305:
-------------------------------
    Fix Version/s:     (was: 0.10.1.0)

> Controller can hang on controlled shutdown with auto leader balance enabled
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-1305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1305
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>            Assignee: Sriharsha Chintalapani
>            Priority: Blocker
>             Fix For: 0.8.2.0
>
>         Attachments: KAFKA-1305.patch, KAFKA-1305.patch, KAFKA-1305_2014-10-13_07:30:45.patch
>
>
> This is relatively easy to reproduce especially when doing a rolling bounce.
> What happened here is as follows:
> 1. The previous controller was bounced and broker 265 became the new controller.
> 2. I went on to do a controlled shutdown of broker 265 (the new controller).
> 3. In the mean time the automatically scheduled preferred replica leader election process started doing its thing and starts sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers).  (t@113 below).
> 4. While that's happening, the controlled shutdown process on 265 succeeds and proceeds to deregister itself from ZooKeeper and shuts down the socket server.
> 5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to.  However, that removal cannot take place (t@18 below) because preferred replica leader election task owns the controller lock.)
> 6. So the request thread to broker 265 gets into infinite retries.
> 7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).
> Relevant portions from the thread-dump:
> "Controller-265-to-broker-265-send-thread" - Thread t@113
>    java.lang.Thread.State: TIMED_WAITING
> 	at java.lang.Thread.sleep(Native Method)
> 	at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
> 	at kafka.utils.Utils$.swallow(Utils.scala:167)
> 	at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> 	at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
> 	at kafka.utils.Logging$class.swallow(Logging.scala:94)
> 	at kafka.utils.Utils$.swallow(Utils.scala:46)
> 	at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
> 	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> 	- locked java.lang.Object@6dbf14a7
> 	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>    Locked ownable synchronizers:
> 	- None
> ...
> "Thread-4" - Thread t@17
>    java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
> 	at sun.misc.Unsafe.park(Native Method)
> 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> 	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> 	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> 	at kafka.utils.Utils$.inLock(Utils.scala:536)
> 	at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
> 	at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
> 	at kafka.utils.Utils$.swallow(Utils.scala:167)
> 	at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> 	at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
> 	at kafka.utils.Logging$class.swallow(Logging.scala:94)
> 	at kafka.utils.Utils$.swallow(Utils.scala:46)
> 	at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
> 	at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
> 	at kafka.Kafka$$anon$1.run(Kafka.scala:42)
> ...
> "kafka-scheduler-0" - Thread t@117
>    java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
> 	at sun.misc.Unsafe.park(Native Method)
> 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> 	at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> 	at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
> 	- locked java.lang.Object@578b748f
> 	at kafka.controller.KafkaController.sendRequest(KafkaController.scala:657)
> 	at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
> 	at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 	at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 	at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 	at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282)
> 	at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126)
> 	at kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612)
> 	at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119)
> 	at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
> 	at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
> 	at kafka.utils.Utils$.inLock(Utils.scala:538)
> 	at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111)
> 	at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 	at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 	at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 	at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109)
> 	at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088)
> 	at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> 	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> 	at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088)
> 	at kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323)
> 	at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> 	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
>    Locked ownable synchronizers:
> 	- locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840
> 	- locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530
> ...
> "ZkClient-EventThread-18-/kafka-shadow" - Thread t@18
>    java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
> 	at sun.misc.Unsafe.park(Native Method)
> 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> 	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> 	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> 	at kafka.utils.Utils$.inLock(Utils.scala:536)
> 	at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328)
> 	at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
> 	at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)