You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Onur Karaman (JIRA)" <ji...@apache.org> on 2015/06/06 01:28:00 UTC
[jira] [Updated] (KAFKA-2253) Deadlock in delayed operation purgatory
[ https://issues.apache.org/jira/browse/KAFKA-2253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Onur Karaman updated KAFKA-2253:
--------------------------------
Description:
We hit a deadlock while running brokers with git hash: 9e894aa0173b14d64a900bcf780d6b7809368384
{code}
Found one Java-level deadlock:
=============================
"kafka-request-handler-a":
waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
which is held by "ExpirationReaper-xyz"
"ExpirationReaper-xyz":
waiting to lock monitor 0x00007f4500004e18 (object 0x00000006b0563fe8, a java.util.LinkedList),
which is held by "kafka-request-handler-b"
"kafka-request-handler-b":
waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
which is held by "ExpirationReaper-xyz"
"kafka-request-handler-a":
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:224)
at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
at kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:358)
at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:288)
at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:268)
at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:244)
at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:790)
at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:787)
at scala.collection.immutable.Map$Map4.foreach(Map.scala:181)
at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:787)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:432)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:312)
at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
"ExpirationReaper-xyz":
at kafka.server.DelayedOperationPurgatory$Watchers.watched(DelayedOperation.scala:278)
- waiting to lock <0x00000006b0563fe8> (a java.util.LinkedList)
at kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty$1.apply(DelayedOperation.scala:258)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
at kafka.server.DelayedOperationPurgatory$Watchers.purgeCompleted(DelayedOperation.scala:322)
- locked <0x000000071a86a478> (a java.util.LinkedList)
at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347)
at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:347)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
"kafka-request-handler-b":
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:303)
- locked <0x00000006b0563fe8> (a java.util.LinkedList)
at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:228)
at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:426)
at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:410)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:410)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350)
at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:272)
at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
Found 1 deadlock.
{code}
was:With purgatory usage in the consumer coordinator, it will be common that watcher lists are very short and live only for a short time. So we'd better clean them from the watchersForKey Pool once the list becomes empty in checkAndComplete() calls.
> Deadlock in delayed operation purgatory
> ---------------------------------------
>
> Key: KAFKA-2253
> URL: https://issues.apache.org/jira/browse/KAFKA-2253
> Project: Kafka
> Issue Type: Bug
> Reporter: Mayuresh Gharat
> Assignee: Guozhang Wang
>
> We hit a deadlock while running brokers with git hash: 9e894aa0173b14d64a900bcf780d6b7809368384
> {code}
> Found one Java-level deadlock:
> =============================
> "kafka-request-handler-a":
> waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
> which is held by "ExpirationReaper-xyz"
> "ExpirationReaper-xyz":
> waiting to lock monitor 0x00007f4500004e18 (object 0x00000006b0563fe8, a java.util.LinkedList),
> which is held by "kafka-request-handler-b"
> "kafka-request-handler-b":
> waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
> which is held by "ExpirationReaper-xyz"
> "kafka-request-handler-a":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
> at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
> at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:224)
> at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
> at kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:358)
> at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:288)
> at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
> at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
> at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:268)
> at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:244)
> at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:790)
> at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:787)
> at scala.collection.immutable.Map$Map4.foreach(Map.scala:181)
> at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:787)
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:432)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:312)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> "ExpirationReaper-xyz":
> at kafka.server.DelayedOperationPurgatory$Watchers.watched(DelayedOperation.scala:278)
> - waiting to lock <0x00000006b0563fe8> (a java.util.LinkedList)
> at kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty$1.apply(DelayedOperation.scala:258)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
> at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
> at kafka.server.DelayedOperationPurgatory$Watchers.purgeCompleted(DelayedOperation.scala:322)
> - locked <0x000000071a86a478> (a java.util.LinkedList)
> at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347)
> at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:347)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> "kafka-request-handler-b":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
> at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
> at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:303)
> - locked <0x00000006b0563fe8> (a java.util.LinkedList)
> at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:228)
> at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
> at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:426)
> at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:410)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
> at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:410)
> at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365)
> at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350)
> at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286)
> at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:272)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> Found 1 deadlock.
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)