Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2013/06/09 17:46:19 UTC

[jira] [Updated] (KAFKA-937) ConsumerFetcherThread can deadlock

     [ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-937:
--------------------------

    Attachment: kafka-937.patch

Attached a patch. The fix is to make sure that the fetcher thread never gets blocked, no matter what other threads such as the LeaderFinderThread do. Specifically, LeaderFinderThread no longer holds the lock when calling addFetcher() or shutdownIdleFetcherThreads(). This way ConsumerFetcherManager.addPartitionsWithError() never gets blocked, which in turn means that the ConsumerFetcherThread never gets blocked and can complete the shutdown if required.
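
To illustrate the locking change described above, here is a minimal Java sketch; it is not the code in kafka-937.patch. The class FetcherManagerSketch, its fields, and leaderFinderIteration() are hypothetical stand-ins; only the method names addPartitionsWithError(), addFetcher() and shutdownIdleFetcherThreads() come from this issue. The idea is to snapshot the shared state under the lock, release the lock, and only then make the calls that may wait on a fetcher thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for ConsumerFetcherManager; not the real Kafka class.
class FetcherManagerSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Set<String> partitionsWithoutLeader = new HashSet<>();
    // Records addFetcher() calls; touched only by the leader finder in this sketch.
    final List<String> added = new ArrayList<>();

    // Called from the fetcher thread: a short critical section that never
    // waits on another thread while the lock is held.
    void addPartitionsWithError(List<String> partitions) {
        lock.lock();
        try {
            partitionsWithoutLeader.addAll(partitions);
        } finally {
            lock.unlock();
        }
    }

    // One pass of a hypothetical leader finder loop.
    void leaderFinderIteration() {
        List<String> snapshot;
        lock.lock();
        try {
            // Copy the shared state while holding the lock ...
            snapshot = new ArrayList<>(partitionsWithoutLeader);
            partitionsWithoutLeader.clear();
        } finally {
            lock.unlock();
        }
        // ... and make the potentially blocking calls after releasing it.
        for (String partition : snapshot) {
            addFetcher(partition);
        }
        shutdownIdleFetcherThreads();
    }

    void addFetcher(String partition) {
        requireLockNotHeld();
        added.add(partition);
    }

    void shutdownIdleFetcherThreads() {
        // A real implementation would wait here for idle fetcher threads to exit.
        requireLockNotHeld();
    }

    private void requireLockNotHeld() {
        if (lock.isHeldByCurrentThread()) {
            throw new IllegalStateException("manager lock must not be held here");
        }
    }

    public static void main(String[] args) {
        FetcherManagerSketch manager = new FetcherManagerSketch();
        manager.addPartitionsWithError(Arrays.asList("topic-0", "topic-1"));
        manager.leaderFinderIteration();
        System.out.println("fetchers added for: " + manager.added);
    }
}
```

With this shape, a fetcher thread calling addPartitionsWithError() only ever contends for a lock that is held for a few in-memory operations, so a concurrent shutdown can always make progress.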

Double-checked other paths and don't see any other potential deadlocks.

Also fixed a potential socket leak through SimpleConsumer. When we shut down a fetcher, we first interrupt the fetcher thread and close the SimpleConsumer. However, after that, it is possible for the fetcher thread to make another fetch request on the SimpleConsumer, which establishes the socket connection again. Added a fix in SimpleConsumer so that after it is closed, no new socket connections will be established and the fetch call will get a ClosedChannelException instead.
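
The close-then-fetch behavior described above could look roughly like the following Java sketch. ClosableConsumerSketch, its fields, and the string-based fetch() are assumptions made for illustration; the real SimpleConsumer and its API differ. Only the idea is taken from this comment: once closed, a later fetch must not reconnect and gets a ClosedChannelException instead.

```java
import java.nio.channels.ClosedChannelException;

// Hypothetical stand-in for SimpleConsumer; the socket is simulated by a flag.
class ClosableConsumerSketch {
    private final Object lock = new Object();
    private boolean closed = false;
    private boolean connected = false;
    // Number of (simulated) socket connections opened so far.
    int connectCount = 0;

    // Must be called with the lock held.
    private void connect() throws ClosedChannelException {
        if (closed) {
            // After close(), refuse to open a new connection.
            throw new ClosedChannelException();
        }
        connected = true;
        connectCount++;
    }

    String fetch(String request) throws ClosedChannelException {
        synchronized (lock) {
            if (!connected) {
                connect(); // lazily (re)connect, unless already closed
            }
            return "response-to-" + request;
        }
    }

    void close() {
        synchronized (lock) {
            connected = false;
            closed = true;
        }
    }

    public static void main(String[] args) {
        ClosableConsumerSketch consumer = new ClosableConsumerSketch();
        try {
            System.out.println(consumer.fetch("first"));
            consumer.close();
            consumer.fetch("second"); // late fetch from the interrupted thread
        } catch (ClosedChannelException e) {
            System.out.println("fetch after close rejected, connections opened: "
                    + consumer.connectCount);
        }
    }
}
```

Keeping the closed flag and the connection state under the same monitor means a late fetch either completes on the old connection before close() runs or fails cleanly afterwards; it cannot open a socket that nobody will close.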
                
> ConsumerFetcherThread can deadlock
> ----------------------------------
>
>                 Key: KAFKA-937
>                 URL: https://issues.apache.org/jira/browse/KAFKA-937
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Jun Rao
>         Attachments: kafka-937.patch
>
>
> We have the following access pattern that can introduce a deadlock.
> AbstractFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherThread.processPartitionsWithError() -> 
> ConsumerFetcherManager.addPartitionsWithError() wait for lock ->
> LeaderFinderThread holding lock while calling AbstractFetcherManager.shutdownIdleFetcherThreads() ->
> AbstractFetcherManager calling fetcher.shutdown, which needs to wait until AbstractFetcherThread.processPartitionsWithError() completes.
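
The cycle in the quoted description can be reproduced in miniature with the Java sketch below. DeadlockCycleSketch and everything in it are hypothetical; a bounded join() stands in for fetcher.shutdown so the demo terminates instead of hanging. One thread plays the fetcher calling addPartitionsWithError(), while the calling thread plays the LeaderFinderThread, which holds the manager lock while waiting for the fetcher to finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical miniature of the access pattern; not Kafka code.
class DeadlockCycleSketch {

    // Returns true if the fetcher could not finish while the lock was held.
    static boolean fetcherStuckWhileLockHeld() {
        ReentrantLock managerLock = new ReentrantLock();
        CountDownLatch lockTaken = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                lockTaken.await();
            } catch (InterruptedException e) {
                return;
            }
            // Stand-in for addPartitionsWithError(): needs the manager lock.
            managerLock.lock();
            managerLock.unlock();
        });
        fetcher.start();

        // Stand-in for LeaderFinderThread: take the lock, then "shut down" the fetcher.
        managerLock.lock();
        try {
            lockTaken.countDown();
            // Stand-in for fetcher.shutdown: wait for the thread to exit.
            // An unbounded wait here would never return.
            join(fetcher, 500);
            return fetcher.isAlive();
        } finally {
            managerLock.unlock();
            join(fetcher, 0); // 0 means wait without a timeout; safe once unlocked
        }
    }

    private static void join(Thread thread, long millis) {
        try {
            thread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fetcher stuck while lock held: " + fetcherStuckWhileLockHeld());
    }
}
```

The fetcher cannot exit until it gets the lock, and the lock holder does not release the lock until the fetcher exits, which is why the wait has to be moved outside the locked region.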

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira