Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/02 23:08:00 UTC

[jira] [Commented] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start/close fetchers

    [ https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673801#comment-16673801 ] 

ASF GitHub Bot commented on KAFKA-7576:
---------------------------------------

rajinisivaram opened a new pull request #5875: KAFKA-7576: Fix shutdown of replica fetcher threads
URL: https://github.com/apache/kafka/pull/5875
 
 
   ReplicaFetcherThread.shutdown attempts to close the fetcher's Selector while the thread is running. This is unsafe and can result in `Selector.close()` failing with an exception. The exception is caught and logged at debug level, but it can lead to a socket leak if the shutdown is due to a dynamic config update rather than broker shutdown. This PR changes the shutdown logic to close the Selector only after the replica fetcher thread has shut down, using a wakeup() and a flag to terminate blocking sends first.
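
   The ordering the PR aims for can be illustrated with a small, self-contained sketch. The names below (`FakeSelector`, `FetcherThreadSketch`, `awaitShutdownAndCloseSelector`) are illustrative stand-ins, not the actual `ReplicaFetcherThread` / `ReplicaFetcherBlockingSend` code: a flag plus wakeup() stops the run loop first, and the Selector is closed only after the thread has exited, so the close cannot race with an in-flight write.
   
   ```scala
   import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
   import java.util.concurrent.atomic.AtomicBoolean
   
   // Stand-in for the fetcher's network selector: poll() blocks until data or wakeup(),
   // close() releases the underlying socket resources. The real code goes through
   // ReplicaFetcherBlockingSend and org.apache.kafka.common.network.Selector.
   class FakeSelector {
     private val events = new LinkedBlockingQueue[String]()
     def poll(timeoutMs: Long): Unit = events.poll(timeoutMs, TimeUnit.MILLISECONDS)
     def wakeup(): Unit = events.offer("wakeup")
     def close(): Unit = println("selector closed after the fetcher thread exited")
   }
   
   // Sketch of the intended ordering: signal shutdown and wake up any blocking send,
   // join the thread, and only then close the selector.
   class FetcherThreadSketch(selector: FakeSelector) extends Thread("replica-fetcher-sketch") {
     private val running = new AtomicBoolean(true)
     private val shutdownLatch = new CountDownLatch(1)
   
     override def run(): Unit =
       try {
         while (running.get()) selector.poll(100L) // may block on network I/O
       } finally shutdownLatch.countDown()
   
     def initiateShutdown(): Unit = {
       running.set(false) // flag checked by the run loop
       selector.wakeup()  // abort a blocking send/poll without closing the selector
     }
   
     def awaitShutdownAndCloseSelector(): Unit = {
       shutdownLatch.await() // the run loop has exited ...
       selector.close()      // ... so no other thread can still be using the selector
     }
   }
   
   object ShutdownOrderingDemo extends App {
     val selector = new FakeSelector
     val fetcher  = new FetcherThreadSketch(selector)
     fetcher.start()
     fetcher.initiateShutdown()
     fetcher.awaitShutdownAndCloseSelector()
   }
   ```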
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Dynamic update of replica fetcher threads may fail to start/close fetchers
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7576
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.1, 2.0.1, 2.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.1.2, 2.1.1, 2.0.2
>
>
> KAFKA-6051 moved ReplicaFetcherBlockingSend shutdown earlier in the shutdown sequence of ReplicaFetcherThread. As a result, shutdown of replica fetchers can now throw an exception, because the Selector may be closed on one thread while data is being written to it on another. KAFKA-7464 changed this behaviour for 2.0.1 and 2.1.0: the exception during close() is now logged and not propagated, to avoid exceptions during broker shutdown.
> When a config update notification for `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads. Existing fetchers are shut down before new ones are created. This migration is performed on the thread processing the ZK change notification. Shutting down the Selector of an existing fetcher is not safe, since the replica fetcher thread may still be processing data using the same Selector.
> Without the fix from KAFKA-7464, if the shutdown encounters an exception, another config update or a broker restart is required to restart the replica fetchers after the dynamic config update.
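> The call path that goes wrong can be sketched as follows (a simplified illustration with made-up signatures, not the actual kafka.server code): the Selector is closed from within initiateShutdown() on the ZK notification thread, while the fetcher thread may still be writing through the same Selector, which is the race visible in the stack trace below.
> {code:scala}
> // Simplified sketch of the pre-fix ordering; names and signatures are illustrative only.
> trait FetcherLike {
>   def initiateShutdown(): Unit // before the fix this also closed the fetcher's Selector
>   def awaitShutdown(): Unit    // joins the fetcher thread
> }
>
> object UnsafeResizeSketch {
>   // Runs on the ZK change-notification thread when num.replica.fetchers is updated.
>   def migrateFetchers(existing: Seq[FetcherLike],
>                       createFetcher: Int => FetcherLike,
>                       newSize: Int): Seq[FetcherLike] = {
>     existing.foreach { f =>
>       f.initiateShutdown() // Selector.close() inside here can race with a write on the fetcher thread
>       f.awaitShutdown()
>     }
>     // Only reached if no shutdown threw; otherwise the new fetchers are never created.
>     (0 until newSize).map(createFetcher)
>   }
> }
> {code}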
> Exception stack trace:
> {code:java}
> java.lang.IllegalArgumentException
>         at java.nio.Buffer.position(Buffer.java:244)
>         at sun.nio.ch.IOUtil.write(IOUtil.java:68)
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>         at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
>         at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
>         at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
>         at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
>         at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
>         at org.apache.kafka.common.network.Selector.close(Selector.java:736)
>         at org.apache.kafka.common.network.Selector.close(Selector.java:698)
>         at org.apache.kafka.common.network.Selector.close(Selector.java:314)
>         at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
>         at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
>         at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
>         at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
>         at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
>         at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>         at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
>         at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
>         at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
>         at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
>         at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
>         at scala.collection.immutable.List.foreach(List.scala:392)
>         at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
> <SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with creation of replica fetchers during a dynamic update. But even for those branches, we should clean up the Selector to avoid a resource leak in the dynamic config update case (discarding the exception may be sufficient when the broker is shut down).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)