Posted to jira@kafka.apache.org by "Jason Gustafson (Jira)" <ji...@apache.org> on 2021/09/20 16:47:00 UTC
[jira] [Resolved] (KAFKA-13254) Deadlock when expanding ISR
[ https://issues.apache.org/jira/browse/KAFKA-13254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-13254.
-------------------------------------
Resolution: Fixed
> Deadlock when expanding ISR
> ---------------------------
>
> Key: KAFKA-13254
> URL: https://issues.apache.org/jira/browse/KAFKA-13254
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
>
> Found this when debugging downgrade system test failures. The patch for https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here are the jstack details:
> {code}
> "data-plane-kafka-request-handler-4":
> waiting for ownable synchronizer 0x00000000fcc00020, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
> which is held by "data-plane-kafka-request-handler-5"
> "data-plane-kafka-request-handler-5":
> waiting for ownable synchronizer 0x00000000c9161b20, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
> which is held by "data-plane-kafka-request-handler-4"
> "data-plane-kafka-request-handler-4":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000fcc00020> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121)
> at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362)
> at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264)
> at kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59)
> at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907)
> at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421)
> at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340)
> at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340)
> at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown Source)
> at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74)
> at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345)
> at kafka.cluster.Partition.expandIsr(Partition.scala:1312)
> at kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755)
> at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754)
> at kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672)
> at kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806)
> at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown Source)
> at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
> at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
> at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42)
> at kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790)
> at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025)
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:173)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(Thread.java:748)
> "data-plane-kafka-request-handler-5":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000c9161b20> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
> at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
> at kafka.cluster.Partition.fetchOffsetSnapshot(Partition.scala:1183)
> at kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch.scala:96)
> at kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch.scala:89)
> at kafka.server.DelayedFetch$$Lambda$1115/1987378797.apply(Unknown Source)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
> at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:89)
> at kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121)
> at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362)
> at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264)
> at kafka.server.ReplicaManager.$anonfun$appendRecords$6(ReplicaManager.scala:622)
> at kafka.server.ReplicaManager$$Lambda$1150/40125541.apply(Unknown Source)
> at scala.collection.mutable.HashMap$Node.foreach(HashMap.scala:627)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:489)
> at kafka.server.ReplicaManager.$anonfun$appendRecords$5(ReplicaManager.scala:611)
> at kafka.server.ReplicaManager$$Lambda$1134/1761219075.apply$mcV$sp(Unknown Source)
> at kafka.server.ActionQueue.tryCompleteActions(ActionQueue.scala:49)
> at kafka.server.ReplicaManager.tryCompleteActions(ReplicaManager.scala:569)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:245)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Basically, handler-4 holds the leaderIsrUpdateLock write lock (taken in maybeExpandIsr) and is trying to acquire a delayed operation's lock in DelayedOperation.safeTryComplete in order to complete it. Meanwhile handler-5 already holds that operation's lock and, inside DelayedFetch.tryComplete, is trying to acquire the leaderIsrUpdateLock read lock in fetchOffsetSnapshot. Each thread is waiting for the lock the other holds, so neither can make progress.
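> A minimal, self-contained sketch of the same lock-ordering inversion (hypothetical names: stateLock stands in for leaderIsrUpdateLock, opLock for the per-operation lock in DelayedOperation; this is not the Kafka code itself). Running it hangs at the final joins, just like the two handlers above:
> {code}
> import java.util.concurrent.CountDownLatch
> import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
>
> object IsrDeadlockSketch {
>   // Stands in for Partition.leaderIsrUpdateLock.
>   val stateLock = new ReentrantReadWriteLock()
>   // Stands in for the lock taken by DelayedOperation.safeTryComplete.
>   val opLock = new ReentrantLock()
>   // Ensures each thread holds its first lock before requesting the second.
>   val bothHeld = new CountDownLatch(2)
>
>   def main(args: Array[String]): Unit = {
>     // "handler-4": the expandIsr path. Holds the write lock, then tries to
>     // complete a watched delayed operation, which needs that operation's lock.
>     val r4: Runnable = () => {
>       stateLock.writeLock().lock()
>       bothHeld.countDown(); bothHeld.await()
>       opLock.lock()                 // blocks: handler-5 holds opLock
>       opLock.unlock()
>       stateLock.writeLock().unlock()
>     }
>     // "handler-5": the purgatory path. Holds the operation's lock inside
>     // safeTryComplete, then tryComplete reads partition state under the read lock.
>     val r5: Runnable = () => {
>       opLock.lock()
>       bothHeld.countDown(); bothHeld.await()
>       stateLock.readLock().lock()   // blocks: handler-4 holds the write lock
>       stateLock.readLock().unlock()
>       opLock.unlock()
>     }
>     val t4 = new Thread(r4, "handler-4")
>     val t5 = new Thread(r5, "handler-5")
>     t4.start(); t5.start()
>     t4.join(); t5.join()            // never returns: each waits on the other's lock
>   }
> }
> {code}
> The standard cures for this shape of deadlock are a single global acquisition order for the two locks, or deferring completion of delayed operations until after the write lock has been released.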
> Note that this does not affect 3.0 or any other released version.
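> As an aside, the cycle that jstack reported above can also be confirmed programmatically from inside the JVM; a small sketch against the standard ThreadMXBean API (not Kafka code):
> {code}
> import java.lang.management.ManagementFactory
>
> object DeadlockCheck {
>   def main(args: Array[String]): Unit = {
>     val mx = ManagementFactory.getThreadMXBean
>     // Unlike findMonitorDeadlockedThreads, this also covers ownable
>     // synchronizers (ReentrantLock / ReentrantReadWriteLock); null if none.
>     val ids = mx.findDeadlockedThreads()
>     if (ids != null) {
>       mx.getThreadInfo(ids, true, true).filter(_ != null).foreach { info =>
>         println(s"${info.getThreadName} waiting on ${info.getLockName} held by ${info.getLockOwnerName}")
>       }
>     }
>   }
> }
> {code}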
--
This message was sent by Atlassian Jira
(v8.3.4#803005)