Posted to dev@kafka.apache.org by "Joel Koshy (JIRA)" <ji...@apache.org> on 2013/09/03 19:29:52 UTC

[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

    [ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13756807#comment-13756807 ] 

Joel Koshy commented on KAFKA-937:
----------------------------------

The delta patch slipped through the cracks. We hit that issue recently: a network glitch caused the leader-finder-thread to hit an exception while adding fetchers, and the thread exited:

{code}
leader-finder-thread], Error due to 
java.net.ConnectException: Connection timed out
        at sun.nio.ch.Net.connect(Native Method)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:129)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
        at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:144)
        at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
        at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:180)
        at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:80)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:95)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:92)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:92)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}



+1 on kafka-937_delta with one minor comment: change the log message to indicate that the thread will attempt to look up the leader again and add fetchers. Right now it just says "failed to add".
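The behavior being asked for can be sketched as follows. This is not the contents of kafka-937_delta.patch. It is a minimal, single-threaded sketch that assumes the leader finder keeps a lock-protected set of partitions still needing a leader. FetcherAdder, noLeaderPartitions, doWork, pending and warnings are all hypothetical names. An exception from adding a fetcher, such as the ConnectException in the trace above, is caught and logged with the retry intent, and the partition is re-queued. The exception never escapes to kill the thread.

```java
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

public class LeaderFinderRetrySketch {
    /** Hypothetical hook standing in for addFetcher; it may throw, as in the trace above. */
    interface FetcherAdder {
        void addFetcher(String partition) throws Exception;
    }

    private final ReentrantLock lock = new ReentrantLock();
    // Hypothetical stand-in for the set of partitions that still need a leader/fetcher.
    private final Set<String> noLeaderPartitions = new HashSet<>();
    private final List<String> warnings = new ArrayList<>();

    public LeaderFinderRetrySketch(Set<String> initial) {
        noLeaderPartitions.addAll(initial);
    }

    /** One pass of the leader-finder loop; an exception never escapes and kills the thread. */
    public void doWork(FetcherAdder adder) {
        Set<String> toAdd;
        lock.lock();
        try {
            toAdd = new HashSet<>(noLeaderPartitions);
            noLeaderPartitions.clear();
        } finally {
            lock.unlock();
        }
        for (String partition : toAdd) {
            try {
                adder.addFetcher(partition);
            } catch (Exception e) {
                // Say what happens next, not just "failed to add".
                warnings.add("Failed to add fetcher for " + partition
                        + "; will look up the leader again and retry (" + e.getMessage() + ")");
                lock.lock();
                try {
                    noLeaderPartitions.add(partition); // re-queue for the next pass
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public Set<String> pending() {
        lock.lock();
        try {
            return new HashSet<>(noLeaderPartitions);
        } finally {
            lock.unlock();
        }
    }

    public List<String> warnings() {
        return new ArrayList<>(warnings);
    }

    public static void main(String[] args) {
        LeaderFinderRetrySketch finder = new LeaderFinderRetrySketch(Set.of("topic-0", "topic-1"));
        // First pass: the broker for topic-1 is unreachable.
        finder.doWork(p -> {
            if (p.equals("topic-1")) throw new ConnectException("Connection timed out");
        });
        System.out.println(finder.pending()); // [topic-1]
        // Second pass: the network has recovered.
        finder.doWork(p -> { });
        System.out.println(finder.pending()); // []
    }
}
```

The fetcher is added outside the lock in this sketch. That way a slow or failing network call, such as the offset lookup in handleOffsetOutOfRange, does not block other threads that need the lock.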
                
> ConsumerFetcherThread can deadlock
> ----------------------------------
>
>                 Key: KAFKA-937
>                 URL: https://issues.apache.org/jira/browse/KAFKA-937
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Jun Rao
>             Fix For: 0.8
>
>         Attachments: kafka-937_ConsumerOffsetChecker.patch, kafka-937_delta.patch, kafka-937.patch
>
>
> We have the following access pattern that can introduce a deadlock:
> AbstractFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherManager.addPartitionsWithError() waits for the lock ->
> LeaderFinderThread, while holding the lock, calls AbstractFetcherManager.shutdownIdleFetcherThreads() ->
> AbstractFetcherManager calls fetcher.shutdown, which must wait until AbstractFetcherThread.processPartitionsWithError() completes.
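A common way to break a cycle like the one above is to never wait for a fetcher to finish while holding the lock that the fetcher itself may need. This is not necessarily what kafka-937.patch does. The sketch below assumes a single manager lock and uses Thread.join() as a stand-in for fetcher.shutdown. FetcherShutdownSketch, startFetcher, shutdownFetchers and the latch-driven fetcher are all hypothetical. The manager snapshots its fetchers under the lock and releases the lock before joining them, so a fetcher blocked in addPartitionsWithError() can acquire the lock and exit.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class FetcherShutdownSketch {
    private final ReentrantLock managerLock = new ReentrantLock();
    private final Set<String> partitionsWithError = new HashSet<>();
    private final List<Thread> fetchers = new ArrayList<>();

    /** Called from a fetcher thread; needs managerLock, like addPartitionsWithError(). */
    void addPartitionsWithError(Set<String> partitions) {
        managerLock.lock();
        try {
            partitionsWithError.addAll(partitions);
        } finally {
            managerLock.unlock();
        }
    }

    /** Starts a fetcher that reports an errored partition once released by the latch. */
    void startFetcher(CountDownLatch release) {
        Thread fetcher = new Thread(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            addPartitionsWithError(Set.of("topic-0"));
        }, "fetcher-0");
        managerLock.lock();
        try {
            fetchers.add(fetcher);
        } finally {
            managerLock.unlock();
        }
        fetcher.start();
    }

    /**
     * Snapshot the fetchers under the lock, then release it before joining them.
     * Calling join() while still holding managerLock would reproduce the cycle:
     * the fetcher would block in addPartitionsWithError() and never finish.
     */
    void shutdownFetchers(CountDownLatch release) {
        List<Thread> toStop;
        managerLock.lock();
        try {
            toStop = new ArrayList<>(fetchers);
            fetchers.clear();
            release.countDown(); // the fetcher may now contend for managerLock
        } finally {
            managerLock.unlock();
        }
        for (Thread t : toStop) {
            try {
                t.join(); // safe: this thread no longer holds managerLock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    Set<String> partitionsWithError() {
        managerLock.lock();
        try {
            return new HashSet<>(partitionsWithError);
        } finally {
            managerLock.unlock();
        }
    }

    public static void main(String[] args) {
        FetcherShutdownSketch manager = new FetcherShutdownSketch();
        CountDownLatch release = new CountDownLatch(1);
        manager.startFetcher(release);
        manager.shutdownFetchers(release);
        System.out.println("shut down cleanly; errors = " + manager.partitionsWithError());
    }
}
```

The latch is released while the manager still holds the lock. This deliberately lets the fetcher race for the lock, as in the reported access pattern. Moving the join() loop inside the locked region would hang whenever the fetcher reaches addPartitionsWithError() first.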
