Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2013/05/23 18:10:21 UTC
[jira] [Commented] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error
[ https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13665299#comment-13665299 ]
Jun Rao commented on KAFKA-916:
-------------------------------
Thanks for finding this. The deadlock is introduced because AbstractFetcherManager.removeFetcher() can be called from an AbstractFetcherThread (via ConsumerFetcherThread.handlePartitionsWithErrors() and ConsumerFetcherManager.addPartitionsWithError()), while AbstractFetcherManager.closeAllFetchers() holds the same lock and waits for that AbstractFetcherThread to stop. This only happens for the ConsumerFetcherThread. So one possible fix is to remove the error partitions from the fetcher directly in ConsumerFetcherThread.handlePartitionsWithErrors(), instead of going through ConsumerFetcherManager. This breaks the cycle.
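A minimal Java sketch of the proposed fix, under a heavily simplified model: FixSketch, Fetcher, partitionLock, errorArrives and runDemo are hypothetical names, not Kafka's (Scala) classes or signatures. The fetcher guards its own partition set with its own lock and drops failed partitions under that lock, so a manager that holds mapLock while waiting in shutdown() is never waiting on a thread that needs mapLock. The errorArrives latch exists only to force the error to occur while the manager holds mapLock.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the proposed fix; not Kafka's real classes.
public class FixSketch {

    // Stand-in for a ConsumerFetcherThread that owns its partition set.
    static class Fetcher extends Thread {
        private final Object partitionLock = new Object();
        private final Set<String> partitions = new HashSet<>(); // guarded by partitionLock
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
        final CountDownLatch errorArrives = new CountDownLatch(1); // demo hook only
        private volatile boolean running = true;

        Fetcher(Set<String> initial) {
            partitions.addAll(initial);
        }

        // The fix: drop error partitions under this fetcher's own lock instead
        // of calling back into the manager, which would need mapLock.
        void handlePartitionsWithErrors(Set<String> failed) {
            synchronized (partitionLock) {
                partitions.removeAll(failed);
            }
        }

        Set<String> partitions() {
            synchronized (partitionLock) {
                return new HashSet<>(partitions);
            }
        }

        @Override
        public void run() {
            try {
                errorArrives.await();                      // wait until the manager holds mapLock
                handlePartitionsWithErrors(Set.of("t-0")); // simulated fetch error on t-0
                while (running) {
                    Thread.sleep(1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();                 // plays ShutdownableThread's latch
            }
        }

        // Signal the loop to stop, then wait (bounded, for the demo) for run() to exit.
        boolean shutdown(long timeoutMs) throws InterruptedException {
            running = false;
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    private final Object mapLock = new Object();
    private final Map<String, Fetcher> fetchers = new HashMap<>(); // guarded by mapLock

    void addFetcher(String key, Fetcher fetcher) {
        synchronized (mapLock) {
            fetchers.put(key, fetcher);
        }
        fetcher.start();
    }

    // Same shape as closeAllFetchers(): mapLock stays held while each fetcher stops.
    boolean closeAllFetchers() throws InterruptedException {
        boolean allStopped = true;
        synchronized (mapLock) {
            for (Fetcher fetcher : fetchers.values()) {
                fetcher.errorArrives.countDown(); // an error hits this fetcher mid-shutdown
                allStopped &= fetcher.shutdown(2000);
            }
            fetchers.clear();
        }
        return allStopped;
    }

    // True if shutdown finished and the failed partition was dropped.
    public static boolean runDemo() {
        try {
            FixSketch manager = new FixSketch();
            Fetcher fetcher = new Fetcher(Set.of("t-0", "t-1"));
            manager.addFetcher("broker-1", fetcher);
            boolean stopped = manager.closeAllFetchers();
            return stopped && fetcher.partitions().equals(Set.of("t-1"));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown completed without deadlock: " + runDemo());
    }
}
```

Running main should print `shutdown completed without deadlock: true`: the fetcher handles its error without touching mapLock, so the manager's wait in shutdown() returns normally.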
> Deadlock between fetcher shutdown and handling partitions with error
> --------------------------------------------------------------------
>
> Key: KAFKA-916
> URL: https://issues.apache.org/jira/browse/KAFKA-916
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
>
>
> Here is another consumer deadlock that we encountered. All consumers are
> vulnerable to this during a rebalance if there happen to be partitions in
> error.
> On a rebalance, the fetcher manager closes all fetchers and holds on to the
> fetcher thread map's lock (mapLock in AbstractFetcherManager) while doing so [t1].
> While the fetcher manager is iterating over fetchers to stop them, a fetcher
> that is yet to be stopped hits an error on a partition and proceeds to
> handle partitions with error [t2]. This handling involves looking up the
> fetcher for that partition and then removing the partition from the
> fetcher's set of partitions to consume. This requires grabbing the same map
> lock held in [t1]; since [t1] is in turn waiting for this fetcher to shut
> down, neither thread can make progress, hence the deadlock.
> [t1]
> 2013-05-22_20:23:11.95767 "main" prio=10 tid=0x00007f1b24007800 nid=0x573b waiting on condition [0x00007f1b2bd38000]
> 2013-05-22_20:23:11.95767 java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95767 - parking to wait for <0x00007f1a25780598> (a java.util.concurrent.CountDownLatch$Sync)
> 2013-05-22_20:23:11.95767 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95767 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 2013-05-22_20:23:11.95768 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 2013-05-22_20:23:11.95768 at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> 2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
> 2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
> 2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
> 2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95770 at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 2013-05-22_20:23:11.95770 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 2013-05-22_20:23:11.95771 at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 2013-05-22_20:23:11.95771 at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
> ---> 2013-05-22_20:23:11.95771 - locked <0x00007f1a2ae92510> (a java.lang.Object)
> 2013-05-22_20:23:11.95771 at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
> 2013-05-22_20:23:11.95771 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
> 2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
> 2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
> 2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
> 2013-05-22_20:23:11.95772 at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 2013-05-22_20:23:11.95773 at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> 2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
> 2013-05-22_20:23:11.95773 - locked <0x00007f1a2a29b450> (a java.lang.Object)
> 2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
> 2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:754)
> 2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:74)
> 2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:69)
> 2013-05-22_20:23:11.95774 - locked <0x00007f1a2a69b1d8> (a java.lang.Object)
> 2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher.startWatchingTopicEvents(ZookeeperTopicEventWatcher.scala:46)
> 2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperTopicEventWatcher.<init>(ZookeeperTopicEventWatcher.scala:33)
> 2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:721)
> 2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
> 2013-05-22_20:23:11.95776 at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776 at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776 at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> 2013-05-22_20:23:11.95777 at scala.collection.immutable.List.foreach(List.scala:45)
> 2013-05-22_20:23:11.95777 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95777 at scala.collection.immutable.List.map(List.scala:45)
> 2013-05-22_20:23:11.95777 at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95777 at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> [t2]
> 2013-05-22_20:23:11.87465 "ConsumerFetcherThread-xxxx-1369238724254-cff180ff-0-505" prio=10 tid=0x00007f196401a800 nid=0x717a waiting for monitor entry [0x00007f19bf0ef000]
> 2013-05-22_20:23:11.87466 java.lang.Thread.State: BLOCKED (on object monitor)
> 2013-05-22_20:23:11.87467 at kafka.server.AbstractFetcherManager.removeFetcher(AbstractFetcherManager.scala:57)
> ---> 2013-05-22_20:23:11.87467 - waiting to lock <0x00007f1a2ae92510> (a java.lang.Object)
> 2013-05-22_20:23:11.87468 at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95682 at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95683 at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
> 2013-05-22_20:23:11.95684 at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95684 at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:69)
> 2013-05-22_20:23:11.95684 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:168)
> 2013-05-22_20:23:11.95684 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> 2013-05-22_20:23:11.95684 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 2013-05-22_20:23:11.95686
> 2013-05-22_20:23:11.95686 "main-EventThread" daemon prio=10 tid=0x00007f1b2471d000 nid=0x605a waiting on condition [0x00007f19bedec000]
> 2013-05-22_20:23:11.95686 java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95686 at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95686 - parking to wait for <0x00007f1a2a4426f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2013-05-22_20:23:11.95687 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95687 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> 2013-05-22_20:23:11.95687 at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> 2013-05-22_20:23:11.95687 at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
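The cycle between [t1] and [t2] can be reduced to a few lines of Java. This is a hypothetical minimal model, not Kafka code: DeadlockRepro, reproduce and errorArrives are illustrative names, mapLock stands in for AbstractFetcherManager's map lock, and shutdownLatch plays the CountDownLatch that ShutdownableThread.shutdown() awaits in [t1]. Unlike the real shutdown, the wait is bounded so the demo returns instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical minimal model of the [t1]/[t2] cycle; not Kafka code.
public class DeadlockRepro {
    private static final Object mapLock = new Object(); // stands in for AbstractFetcherManager's mapLock

    // Returns true if [t1]'s wait for the fetcher timed out, i.e. the cycle formed.
    public static boolean reproduce(long timeoutMs) {
        CountDownLatch shutdownLatch = new CountDownLatch(1); // plays ShutdownableThread's latch
        CountDownLatch errorArrives = new CountDownLatch(1);  // demo hook only

        Thread fetcher = new Thread(() -> {
            try {
                errorArrives.await();
                // [t2]: error handling goes through the manager and needs mapLock
                // (removeFetcher), so this thread is BLOCKED while [t1] holds it.
                synchronized (mapLock) {
                    // remove the failed partitions (elided)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown(); // only reached after mapLock was acquired
            }
        }, "ConsumerFetcherThread-demo");
        fetcher.setDaemon(true);
        fetcher.start();

        boolean stopped = false;
        try {
            synchronized (mapLock) {                 // [t1]: closeAllFetchers holds mapLock
                errorArrives.countDown();            // a not-yet-stopped fetcher hits an error
                // Real shutdown waits with no timeout; the bound lets the demo return.
                stopped = shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            }
            fetcher.join(); // once mapLock is released, the fetcher can finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reproducing", e);
        }
        return !stopped;
    }

    public static void main(String[] args) {
        System.out.println("deadlock reproduced: " + reproduce(200));
    }
}
```

Running main prints `deadlock reproduced: true`. Note that java.lang.management.ThreadMXBean.findDeadlockedThreads() only detects cycles through object monitors and ownable synchronizers; [t1] is parked on a CountDownLatch, which has no owner, so the JVM does not flag this cycle by itself. It has to be found by matching lock addresses across the dumps, as the ---> markers on <0x00007f1a2ae92510> do.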
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira