Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/08 16:47:00 UTC

[jira] [Commented] (KAFKA-6517) ZooKeeperClient holds a lock while waiting for responses, blocking shutdown

    [ https://issues.apache.org/jira/browse/KAFKA-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357199#comment-16357199 ] 

ASF GitHub Bot commented on KAFKA-6517:
---------------------------------------

rajinisivaram opened a new pull request #4551: KAFKA-6517: Avoid deadlock in ZooKeeperClient during session expiry
URL: https://github.com/apache/kafka/pull/4551
 
 
   `ZooKeeperClient` acquires `initializationLock#writeLock` to establish a new connection while processing a session expiry WatchEvent. `ZooKeeperClient#handleRequests` acquires `initializationLock#readLock`, which allows multiple batches of requests to be processed concurrently but prevents reconnection while requests are being processed. At the moment, `handleRequests` holds the readLock throughout the method, even while waiting for responses and for inflight requests to complete. But responses cannot be delivered if the event thread is blocked on the writeLock to process a session expiry event, which results in a deadlock. During broker shutdown, the shutdown thread is also blocked: it needs the readLock to perform `ZooKeeperClient#unregisterStateChangeHandler`, and it cannot acquire that lock if a session expiry occurred earlier, because it gets queued behind the event handler thread waiting for the writeLock.
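
The queuing behaviour described above can be reproduced outside Kafka with a plain `ReentrantReadWriteLock`. The following is a minimal sketch, not Kafka code: the class name `LockQueueDemo`, the thread roles, and the 200 ms timeout are assumptions chosen for illustration. One thread holds the read lock while awaiting a latch (as `handleRequests` does), a second thread queues for the write lock (as the event thread does), and a third caller then tries to take the read lock (as the shutdown thread does).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockQueueDemo {

    /** Returns true if a late reader manages to acquire the read lock within 200 ms. */
    static boolean lateReaderGetsLock() throws InterruptedException {
        // Default (nonfair) lock, matching the NonfairSync seen in the stack traces.
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch response = new CountDownLatch(1);      // stands in for a pending response
        CountDownLatch readerHolding = new CountDownLatch(1); // signals that the read lock is held

        // Role of the request thread: holds the read lock while waiting for a response.
        Thread requester = new Thread(() -> {
            lock.readLock().lock();
            try {
                readerHolding.countDown();
                response.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.readLock().unlock();
            }
        });

        // Role of the event thread: needs the write lock to handle session expiry.
        // In the real scenario this same thread would also deliver the response.
        Thread eventThread = new Thread(() -> {
            lock.writeLock().lock();
            lock.writeLock().unlock();
        });

        requester.start();
        readerHolding.await();
        eventThread.start();
        // Wait until the writer is actually parked in the lock's wait queue.
        while (!lock.hasQueuedThread(eventThread)) {
            Thread.sleep(5);
        }

        // Role of the shutdown thread: a new reader arriving after the writer has queued.
        boolean acquired = lock.readLock().tryLock(200, TimeUnit.MILLISECONDS);
        if (acquired) {
            lock.readLock().unlock();
        }

        // Break the cycle so the demo terminates; the real deadlock has no such escape.
        response.countDown();
        requester.join();
        eventThread.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("late reader acquired lock: " + lateReaderGetsLock());
    }
}
```

In this sketch the late reader is expected to time out even though only a read lock is held, because a nonfair `ReentrantReadWriteLock` makes new readers wait when a writer is at the head of the queue.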
   
   This PR fixes the issue by limiting locking in `ZooKeeperClient#handleRequests` to just the non-blocking send, so that session expiry handling doesn't get blocked.
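
As a rough illustration of that locking pattern (a sketch under stated assumptions, not the code in this PR): the class `NarrowLockSketch`, the string-based requests, and the `sendAsync` helper below are hypothetical stand-ins, and a single-threaded executor plays the role of the ZooKeeper event thread. The read lock is held only around each non-blocking send, and the wait for responses happens with no lock held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

public class NarrowLockSketch {
    private final ReentrantReadWriteLock initializationLock = new ReentrantReadWriteLock();
    // Hypothetical stand-in for the client's single event thread.
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();

    /** Hypothetical non-blocking send: the callback runs later on the event thread. */
    private void sendAsync(String request, Consumer<String> callback) {
        eventThread.submit(() -> callback.accept("response:" + request));
    }

    List<String> handleRequests(List<String> requests) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(requests.size());
        Queue<String> responses = new ConcurrentLinkedQueue<>();
        for (String request : requests) {
            // Hold the read lock only for the non-blocking send.
            initializationLock.readLock().lock();
            try {
                sendAsync(request, response -> {
                    responses.add(response);
                    latch.countDown();
                });
            } finally {
                initializationLock.readLock().unlock();
            }
        }
        // Wait for responses without holding the lock, so the event thread
        // can take the write lock for session expiry in the meantime.
        latch.await();
        return new ArrayList<>(responses);
    }

    /** Stand-in for session expiry handling on the event thread, under the write lock. */
    Future<?> expireSession() {
        return eventThread.submit(() -> {
            initializationLock.writeLock().lock();
            try {
                // A real client would re-establish its connection here.
            } finally {
                initializationLock.writeLock().unlock();
            }
        });
    }

    void close() {
        eventThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        NarrowLockSketch client = new NarrowLockSketch();
        System.out.println(client.handleRequests(List.of("a", "b")));
        client.expireSession().get();
        client.close();
    }
}
```

Because no thread waits on the latch while holding the read lock, the write-lock request in `expireSession` is never stuck behind a reader that is itself waiting for the event thread.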
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



> ZooKeeperClient holds a lock while waiting for responses, blocking shutdown
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6517
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6517
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> Stack traces from a local test run that was deadlocked because shutdown couldn't acquire the lock:
>  # kafka-scheduler-7: acquired read lock in kafka.zookeeper.ZooKeeperClient.handleRequests
>  # Test worker-EventThread waiting for write lock to process SessionExpired in kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process
>  # ForkJoinPool-1-worker-11 processing KafkaServer.shutdown is queued behind the EventThread in 2), waiting to acquire read lock for kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler
> Stack traces of the relevant threads:
> {quote}
> "kafka-scheduler-7" daemon prio=5 tid=0x00007fade918d800 nid=0xd317 waiting on condition [0x000070000b371000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000007e4c6e698> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
>         at kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:146)
>         at kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:126)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
>         at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:125)
>         at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1432)
>         at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1425)
>         at kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:583)
>         at kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33)
>         at kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:665)
>         at kafka.cluster.Partition$$anonfun$4.apply$mcZ$sp(Partition.scala:509)
>         at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
>         at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>         at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:499)
>         at kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)
>         at kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)
> ......
> "Test worker-EventThread" daemon prio=5 tid=0x00007fade90cf800 nid=0xef13 waiting on condition [0x000070000a23f000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x0000000781847620> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
>         at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>         at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:355)
>         at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>         at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
>  
> "ForkJoinPool-1-worker-11" daemon prio=5 tid=0x00007fade9a83000 nid=0x17907 waiting on condition [0x0000700011eaf000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x0000000781847620> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
>         at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
>         at kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler(ZooKeeperClient.scala:295)
>         at kafka.zk.KafkaZkClient.unregisterStateChangeHandler(KafkaZkClient.scala:1217)
>         at kafka.common.ZkNodeChangeNotificationListener.close(ZkNodeChangeNotificationListener.scala:68)
>         at kafka.server.DynamicConfigManager.shutdown(DynamicConfigManager.scala:181)
>         at kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:552)
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:552)
> {quote}


