Posted to dev@kafka.apache.org by "Mayuresh Gharat (JIRA)" <ji...@apache.org> on 2015/06/06 01:14:00 UTC

[jira] [Commented] (KAFKA-2160) DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list

    [ https://issues.apache.org/jira/browse/KAFKA-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14575390#comment-14575390 ] 

Mayuresh Gharat commented on KAFKA-2160:
----------------------------------------

We hit a deadlock while running brokers at git hash 9e894aa0173b14d64a900bcf780d6b7809368384:
{code}
Found one Java-level deadlock:
=============================
"kafka-request-handler-a":
  waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "ExpirationReaper-xyz"
"ExpirationReaper-xyz":
  waiting to lock monitor 0x00007f4500004e18 (object 0x00000006b0563fe8, a java.util.LinkedList),
  which is held by "kafka-request-handler-b"
"kafka-request-handler-b":
  waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "ExpirationReaper-xyz"

"kafka-request-handler-a":
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
        at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
        at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:224)
        at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
        at kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:358)
        at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:288)
        at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
        at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
        at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
        at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:268)
        at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:244)
        at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:790)
        at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:787)
        at scala.collection.immutable.Map$Map4.foreach(Map.scala:181)
        at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:787)
        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:432)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:312)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:745)

"ExpirationReaper-xyz":
        at kafka.server.DelayedOperationPurgatory$Watchers.watched(DelayedOperation.scala:278)
        - waiting to lock <0x00000006b0563fe8> (a java.util.LinkedList)
        at kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty$1.apply(DelayedOperation.scala:258)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
        at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
        at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
        at kafka.server.DelayedOperationPurgatory$Watchers.purgeCompleted(DelayedOperation.scala:322)
        - locked <0x000000071a86a478> (a java.util.LinkedList)
        at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347)
        at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:347)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"kafka-request-handler-b":
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
        at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
        at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
        at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:303)
        - locked <0x00000006b0563fe8> (a java.util.LinkedList)
        at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:228)
        at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
        at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:426)
        at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:410)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:410)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350)
        at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:272)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.
{code}

We'll track this in a separate JIRA:
https://issues.apache.org/jira/browse/KAFKA-2253
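
For anyone reading the dump above: the header describes a wait-for cycle between "ExpirationReaper-xyz" and "kafka-request-handler-b". The reaper appears to hold the ReentrantReadWriteLock in write mode while waiting for the monitor of the LinkedList at 0x00000006b0563fe8, and handler-b holds that monitor while waiting for the same write lock. "kafka-request-handler-a" is only queued behind the write lock for a read lock. Below is a minimal sketch in Java that encodes just those three edges and checks which threads sit on the cycle. The class and method names (WaitForGraphSketch, waitsFor, isOnCycle) are hypothetical and are not part of Kafka; only the thread names come from the dump.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not Kafka code.
public class WaitForGraphSketch {

    // Edges copied from the "Found one Java-level deadlock" header:
    // each thread maps to the thread that holds the lock it is waiting for.
    public static Map<String, String> waitsFor() {
        Map<String, String> g = new LinkedHashMap<>();
        g.put("kafka-request-handler-a", "ExpirationReaper-xyz");    // read lock held in write mode
        g.put("ExpirationReaper-xyz", "kafka-request-handler-b");    // LinkedList monitor
        g.put("kafka-request-handler-b", "ExpirationReaper-xyz");    // write lock
        return g;
    }

    // Returns true if following the wait-for edges from 'start' leads back to 'start'.
    public static boolean isOnCycle(Map<String, String> g, String start) {
        Set<String> seen = new HashSet<>();
        String cur = g.get(start);
        // Stop when the chain ends or when a thread is visited a second time.
        while (cur != null && seen.add(cur)) {
            if (cur.equals(start)) {
                return true;
            }
            cur = g.get(cur);
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> g = waitsFor();
        for (String thread : g.keySet()) {
            System.out.println(thread + " on cycle: " + isOnCycle(g, thread));
        }
    }
}
```

In this sketch the reaper and handler-b are reported as being on the cycle, while handler-a is blocked behind it without being part of it. The two cycle members take the same pair of locks in opposite orders (write lock then list monitor, versus list monitor then write lock). Acquiring them in one consistent order on every path is a common way to break such a cycle, but whether that is the right fix here is left to KAFKA-2253.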

> DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2160
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2160
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>         Attachments: KAFKA-2160.patch, KAFKA-2160.patch, KAFKA-2160_2015-04-30_15:20:14.patch, KAFKA-2160_2015-05-06_16:31:48.patch, KAFKA-2160_2015-05-18_14:07:48.patch
>
>
> With purgatory usage in the consumer coordinator, it will be common for watcher lists to be very short and to live only for a short time. So we should remove them from the watchersForKey Pool once a list becomes empty in checkAndComplete() calls.


