Posted to dev@kafka.apache.org by "Onur Karaman (JIRA)" <ji...@apache.org> on 2015/06/06 01:28:00 UTC

[jira] [Updated] (KAFKA-2253) Deadlock in delayed operation purgatory

     [ https://issues.apache.org/jira/browse/KAFKA-2253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Onur Karaman updated KAFKA-2253:
--------------------------------
    Description: 
We hit a deadlock while running brokers with git hash: 9e894aa0173b14d64a900bcf780d6b7809368384
{code}
Found one Java-level deadlock:
=============================
"kafka-request-handler-a":
  waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "ExpirationReaper-xyz"
"ExpirationReaper-xyz":
  waiting to lock monitor 0x00007f4500004e18 (object 0x00000006b0563fe8, a java.util.LinkedList),
  which is held by "kafka-request-handler-b"
"kafka-request-handler-b":
  waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "ExpirationReaper-xyz"

"kafka-request-handler-a":
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
        at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
        at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:224)
        at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
        at kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:358)
        at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:288)
        at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
        at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
        at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
        at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:268)
        at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:244)
        at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:790)
        at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:787)
        at scala.collection.immutable.Map$Map4.foreach(Map.scala:181)
        at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:787)
        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:432)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:312)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:745)

"ExpirationReaper-xyz":
        at kafka.server.DelayedOperationPurgatory$Watchers.watched(DelayedOperation.scala:278)
        - waiting to lock <0x00000006b0563fe8> (a java.util.LinkedList)
        at kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty$1.apply(DelayedOperation.scala:258)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
        at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
        at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
        at kafka.server.DelayedOperationPurgatory$Watchers.purgeCompleted(DelayedOperation.scala:322)
        - locked <0x000000071a86a478> (a java.util.LinkedList)
        at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347)
        at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:347)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"kafka-request-handler-b":
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
        at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
        at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
        at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:303)
        - locked <0x00000006b0563fe8> (a java.util.LinkedList)
        at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:228)
        at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
        at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:426)
        at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:410)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:410)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350)
        at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:272)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.
{code}
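
For illustration, here is a minimal, self-contained Scala sketch of the inconsistent lock ordering the dump above shows. This is not the actual purgatory code; the object and method names (LockOrderSketch, reaperPath, handlerPath) are made up. The reaper-like path takes the purgatory's ReentrantReadWriteLock write lock and then a watcher list's monitor, while the request-handler-like path takes the watcher list's monitor and then asks for the write lock. Run concurrently, the two paths can park against each other in the same circular wait as the trace.

{code}
import java.util.LinkedList
import java.util.concurrent.locks.ReentrantReadWriteLock

object LockOrderSketch {
  private val removeWatchersLock = new ReentrantReadWriteLock()
  private val watcherList = new LinkedList[String]()

  // Reaper-like path: write lock first, then the watcher list's monitor.
  def reaperPath(): Unit = {
    val wl = removeWatchersLock.writeLock()
    wl.lock()
    try {
      watcherList.synchronized {
        if (watcherList.isEmpty) () // "remove the key"
      }
    } finally {
      wl.unlock()
    }
  }

  // Request-handler-like path: watcher list's monitor first, then the write lock.
  def handlerPath(): Unit = {
    watcherList.synchronized {
      val wl = removeWatchersLock.writeLock()
      wl.lock()
      try {
        if (watcherList.isEmpty) () // "remove the key"
      } finally {
        wl.unlock()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val t1 = new Thread(new Runnable { def run(): Unit = (1 to 100000).foreach(_ => reaperPath()) })
    val t2 = new Thread(new Runnable { def run(): Unit = (1 to 100000).foreach(_ => handlerPath()) })
    t1.start(); t2.start()
    t1.join(); t2.join() // will often hang: same circular wait as in the dump above
  }
}
{code}

In general, a fix has to make both paths agree on a single acquisition order, or avoid asking for the read-write lock while a watcher list's monitor is still held.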

  was:With purgatory usage in the consumer coordinator, it will be common for watcher lists to be very short and short-lived, so we should clean them out of the watchersForKey pool once a list becomes empty in checkAndComplete() calls.
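
As a rough sketch of that cleanup idea (simplified stand-in types and names, not DelayedOperationPurgatory's real implementation): checkAndComplete() completes what it can and drops a key from the pool as soon as its watcher list drains, so short-lived consumer-coordinator keys do not pile up.

{code}
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified pool: `Op` stands in for a delayed operation and
// `tryComplete` for its completion check.
class PurgatorySketch[K, Op] {
  private val watchersForKey = TrieMap.empty[K, ListBuffer[Op]]

  def watch(key: K, op: Op): Unit =
    watchersForKey.getOrElseUpdate(key, ListBuffer.empty[Op]) += op

  // Complete whatever can be completed now; if the watcher list for this key
  // has drained, remove the key from the pool entirely.
  def checkAndComplete(key: K)(tryComplete: Op => Boolean): Int = {
    watchersForKey.get(key) match {
      case None => 0
      case Some(ops) =>
        val completed = ops.filter(tryComplete)
        ops --= completed
        if (ops.isEmpty) watchersForKey.remove(key)
        completed.size
    }
  }
}
{code}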


> Deadlock in delayed operation purgatory
> ---------------------------------------
>
>                 Key: KAFKA-2253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2253
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Mayuresh Gharat
>            Assignee: Guozhang Wang
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)