Posted to jira@kafka.apache.org by "Rajini Sivaram (Jira)" <ji...@apache.org> on 2020/04/16 16:15:00 UTC

[jira] [Resolved] (KAFKA-9796) Broker shutdown could be stuck forever under certain conditions

     [ https://issues.apache.org/jira/browse/KAFKA-9796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-9796.
-----------------------------------
    Fix Version/s: 2.6.0
         Reviewer: Rajini Sivaram
       Resolution: Fixed

> Broker shutdown could be stuck forever under certain conditions
> ---------------------------------------------------------------
>
>                 Key: KAFKA-9796
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9796
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: David Jacot
>            Assignee: David Jacot
>            Priority: Major
>             Fix For: 2.6.0
>
>
> During broker initialisation, the Acceptor threads are started early so that the bound port is known, while the processors are not started until the end of the initialisation sequence. We have found that the shutdown of a broker can get stuck forever under the following conditions:
>  - the shutdown procedure is started before the processors are started;
>  - the `newConnections` queues of the processors are full; and
>  - an extra new connection has been accepted but can't be queued up in a processor.
> For instance, this could happen if a `NodeExistsException` is raised when the broker tries to register itself in ZK.
> When the above conditions hold, the shutdown procedure triggers the shutdown of the acceptor threads and waits until they have stopped (first thread dump below). If an acceptor has a pending connection which can't be queued up in a processor, it waits until space is made in the queue to accept the new connection (second thread dump below). As the processors are not started, the new connection queues are never drained, so the acceptor thread is never released.
> *Shutdown waits for the acceptor to shut down*
> {noformat}
> "main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s tid=0x00007f625001c800 nid=0x272 waiting on condition  [0x00007f6257ca4000]
>    java.lang.Thread.State: WAITING (parking)
> 	at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
> 	- parking to wait for  <0x0000000689a61800> (a java.util.concurrent.CountDownLatch$Sync)
> 	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.5/AbstractQueuedSynchronizer.java:885)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1039)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1345)
> 	at java.util.concurrent.CountDownLatch.await(java.base@11.0.5/CountDownLatch.java:232)
> 	at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
> 	at kafka.network.Acceptor.shutdown(SocketServer.scala:521)
> 	at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
> 	at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
> 	at kafka.network.SocketServer$$Lambda$604/0x0000000840540840.apply(Unknown Source)
> 	at scala.collection.Iterator.foreach(Iterator.scala:941)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:941)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> 	at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
> 	at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
> 	- locked <0x0000000689a61ac0> (a kafka.network.SocketServer)
> 	at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806)
> 	at kafka.server.KafkaServer$$Lambda$602/0x000000084052b040.apply$mcV$sp(Unknown Source)
> 	at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
> 	at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806)
> 	at kafka.server.KafkaServer.startup(KafkaServer.scala:522)
> 	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
> 	at kafka.Kafka$.main(Kafka.scala:82)
> 	at kafka.Kafka.main(Kafka.scala)
> {noformat}
> *Acceptor waits on processor to accept the new connection*
> {noformat}
> "data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54 prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s tid=0x00007f62523b5000 nid=0x2ca waiting on condition  [0x00007f6157130000]
>    java.lang.Thread.State: WAITING (parking)
> 	at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
> 	- parking to wait for  <0x0000000689a7cad8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.5/AbstractQueuedSynchronizer.java:2081)
> 	at java.util.concurrent.ArrayBlockingQueue.put(java.base@11.0.5/ArrayBlockingQueue.java:367)
> 	at kafka.network.Processor.accept(SocketServer.scala:1020)
> 	at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:639)
> 	at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:566)
> 	at kafka.network.Acceptor.run(SocketServer.scala:550)
> 	at java.lang.Thread.run(java.base@11.0.5/Thread.java:834)
> {noformat}
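
The interaction visible in the two thread dumps (a shutdown thread parked on a `CountDownLatch` while the acceptor is parked in `ArrayBlockingQueue.put`) can be reproduced in isolation. The following is a minimal sketch only, not Kafka's code and not necessarily how the fix was implemented: `SketchAcceptor`, `shutdownCompletes`, and the timeout values are hypothetical names and numbers chosen for illustration. It contrasts a blocking `put` with a timed `offer` that re-checks a running flag. Unlike the real shutdown path, the sketch waits on the latch with a bound so that the demonstration terminates.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AcceptorShutdownSketch {

    // Hypothetical stand-in for an acceptor thread; not Kafka's actual class.
    static final class SketchAcceptor implements Runnable {
        private final BlockingQueue<String> newConnections;
        private final boolean blockingPut;
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);

        SketchAcceptor(BlockingQueue<String> newConnections, boolean blockingPut) {
            this.newConnections = newConnections;
            this.blockingPut = blockingPut;
        }

        @Override
        public void run() {
            try {
                if (blockingPut) {
                    // Parks until a consumer frees a slot; the running flag is never re-checked.
                    newConnections.put("extra-connection");
                } else {
                    // Timed offer: wake up periodically and re-check the running flag.
                    while (running.get()
                            && !newConnections.offer("extra-connection", 50, TimeUnit.MILLISECONDS)) {
                        // Queue is still full; try again.
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();
            }
        }

        // Clears the running flag and waits for run() to finish. The wait is bounded
        // here only so the sketch terminates; an unbounded await would hang forever.
        boolean shutdown(long timeoutMs) throws InterruptedException {
            running.set(false);
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    // Returns true if the acceptor stopped within the bounded wait.
    static boolean shutdownCompletes(boolean blockingPut) {
        try {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
            queue.put("queued-connection"); // Queue is now full and nothing drains it.
            SketchAcceptor acceptor = new SketchAcceptor(queue, blockingPut);
            Thread thread = new Thread(acceptor, "sketch-acceptor");
            thread.setDaemon(true);
            thread.start();
            Thread.sleep(100); // Give the acceptor time to reach the full queue.
            boolean stopped = acceptor.shutdown(500);
            thread.interrupt(); // Release the thread if it is still parked in put().
            return stopped;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking put: shutdown completed = " + shutdownCompletes(true));
        System.out.println("timed offer:  shutdown completed = " + shutdownCompletes(false));
    }
}
```

With the blocking `put`, the acceptor never observes the shutdown request because nothing drains the queue, which mirrors the stuck state described above. The timed `offer` is one possible way to let the thread notice the cleared flag; draining or closing pending connections during shutdown would be another.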



--
This message was sent by Atlassian Jira
(v8.3.4#803005)