Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2013/05/23 18:10:21 UTC

[jira] [Commented] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error

    [ https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13665299#comment-13665299 ] 

Jun Rao commented on KAFKA-916:
-------------------------------

Thanks for finding this. The deadlock is introduced because AbstractFetcherManager.removeFetcher() can be called from AbstractFetcherThread, while AbstractFetcherManager.closeAllFetchers() can wait for an AbstractFetcherThread to stop. This only happens for ConsumerFetcherThread. So, one possible fix is to remove the error partitions from the fetcher directly in ConsumerFetcherThread.handlePartitionsWithErrors(), instead of going through ConsumerFetcherManager. This will break the cycle.
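The proposed fix can be sketched roughly as follows. This is a hypothetical, heavily simplified model, not Kafka's actual classes (the names mirror the real ones, the bodies are illustrative): the key point is that handlePartitionsWithErrors() touches only the fetcher thread's own state, so the error path never needs the manager's mapLock and the lock cycle is broken.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified sketch of the proposed fix; not the actual
// Kafka code.
class FetcherThread {
    private final Set<String> partitions = new HashSet<>();

    FetcherThread(Set<String> initial) { partitions.addAll(initial); }

    // Old, deadlock-prone path: call back into the manager
    // (removeFetcher()), which blocks on mapLock while
    // closeAllFetchers() holds mapLock and waits for this thread
    // to stop. New path: prune local state only.
    synchronized void handlePartitionsWithErrors(Set<String> errored) {
        partitions.removeAll(errored);
    }

    synchronized Set<String> currentPartitions() {
        return new HashSet<>(partitions);
    }
}

class FetcherManager {
    private final Object mapLock = new Object();
    private final Map<String, FetcherThread> fetchers = new HashMap<>();

    void addFetcher(String id, FetcherThread t) {
        synchronized (mapLock) { fetchers.put(id, t); }
    }

    // Safe now: no fetcher's error path ever re-enters mapLock.
    void closeAllFetchers() {
        synchronized (mapLock) { fetchers.clear(); }
    }
}
```

In this shape, closeAllFetchers() can hold mapLock for as long as it likes: a fetcher handling errored partitions never tries to acquire it.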
                
> Deadlock between fetcher shutdown and handling partitions with error
> --------------------------------------------------------------------
>
>                 Key: KAFKA-916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-916
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>             Fix For: 0.8
>
>
> Here is another consumer deadlock that we encountered. All consumers are
> vulnerable to this during a rebalance if there happen to be partitions in
> error.
> On a rebalance, the fetcher manager closes all fetchers while holding the
> fetcher thread map's lock (mapLock in AbstractFetcherManager). [t1]
> While the fetcher manager is iterating over the fetchers to stop them, a
> fetcher that has yet to be stopped hits an error on a partition and
> proceeds to handle partitions with error [t2]. This handling involves
> looking up the fetcher for that partition and then removing the partition
> from that fetcher's set of partitions to consume. That requires grabbing
> the same map lock already held by [t1], hence the deadlock.
> [t1]
> 2013-05-22_20:23:11.95767 "main" prio=10 tid=0x00007f1b24007800 nid=0x573b waiting on condition [0x00007f1b2bd38000]
> 2013-05-22_20:23:11.95767    java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95767 	at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95767 	- parking to wait for  <0x00007f1a25780598> (a java.util.concurrent.CountDownLatch$Sync)
> 2013-05-22_20:23:11.95767 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95767 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-22_20:23:11.95768 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 2013-05-22_20:23:11.95768 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 2013-05-22_20:23:11.95768 	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 2013-05-22_20:23:11.95768 	at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> 2013-05-22_20:23:11.95769 	at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
> 2013-05-22_20:23:11.95769 	at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
> 2013-05-22_20:23:11.95769 	at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
> 2013-05-22_20:23:11.95769 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95769 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95770 	at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 2013-05-22_20:23:11.95770 	at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 2013-05-22_20:23:11.95770 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 2013-05-22_20:23:11.95770 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 2013-05-22_20:23:11.95771 	at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 2013-05-22_20:23:11.95771 	at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
> ---> 2013-05-22_20:23:11.95771 	- locked <0x00007f1a2ae92510> (a java.lang.Object)
> 2013-05-22_20:23:11.95771 	at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
> 2013-05-22_20:23:11.95771 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
> 2013-05-22_20:23:11.95772 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
> 2013-05-22_20:23:11.95772 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
> 2013-05-22_20:23:11.95772 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
> 2013-05-22_20:23:11.95772 	at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 2013-05-22_20:23:11.95773 	at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> 2013-05-22_20:23:11.95773 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
> 2013-05-22_20:23:11.95773 	- locked <0x00007f1a2a29b450> (a java.lang.Object)
> 2013-05-22_20:23:11.95773 	at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
> 2013-05-22_20:23:11.95774 	at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:754)
> 2013-05-22_20:23:11.95774 	at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:74)
> 2013-05-22_20:23:11.95774 	at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:69)
> 2013-05-22_20:23:11.95774 	- locked <0x00007f1a2a69b1d8> (a java.lang.Object)
> 2013-05-22_20:23:11.95774 	at kafka.consumer.ZookeeperTopicEventWatcher.startWatchingTopicEvents(ZookeeperTopicEventWatcher.scala:46)
> 2013-05-22_20:23:11.95775 	at kafka.consumer.ZookeeperTopicEventWatcher.<init>(ZookeeperTopicEventWatcher.scala:33)
> 2013-05-22_20:23:11.95775 	at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:721)
> 2013-05-22_20:23:11.95775 	at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
> 2013-05-22_20:23:11.95776 	at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776 	at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776 	at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> 2013-05-22_20:23:11.95777 	at scala.collection.immutable.List.foreach(List.scala:45)
> 2013-05-22_20:23:11.95777 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95777 	at scala.collection.immutable.List.map(List.scala:45)
> 2013-05-22_20:23:11.95777 	at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95777 	at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> [t2]
> 2013-05-22_20:23:11.87465 "ConsumerFetcherThread-xxxx-1369238724254-cff180ff-0-505" prio=10 tid=0x00007f196401a800 nid=0x717a waiting for monitor entry [0x00007f19bf0ef000]
> 2013-05-22_20:23:11.87466    java.lang.Thread.State: BLOCKED (on object monitor)
> 2013-05-22_20:23:11.87467 	at kafka.server.AbstractFetcherManager.removeFetcher(AbstractFetcherManager.scala:57)
> ---> 2013-05-22_20:23:11.87467 	- waiting to lock <0x00007f1a2ae92510> (a java.lang.Object)
> 2013-05-22_20:23:11.87468 	at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95682 	at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95683 	at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
> 2013-05-22_20:23:11.95684 	at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95684 	at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:69)
> 2013-05-22_20:23:11.95684 	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:168)
> 2013-05-22_20:23:11.95684 	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> 2013-05-22_20:23:11.95684 	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 2013-05-22_20:23:11.95686 
> 2013-05-22_20:23:11.95686 "main-EventThread" daemon prio=10 tid=0x00007f1b2471d000 nid=0x605a waiting on condition [0x00007f19bedec000]
> 2013-05-22_20:23:11.95686    java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95686 	at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95686 	- parking to wait for  <0x00007f1a2a4426f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2013-05-22_20:23:11.95687 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95687 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> 2013-05-22_20:23:11.95687 	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> 2013-05-22_20:23:11.95687 	at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira