Posted to jira@kafka.apache.org by "Rajini Sivaram (JIRA)" <ji...@apache.org> on 2018/11/16 11:17:00 UTC

[jira] [Resolved] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start/close fetchers

     [ https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7576.
-----------------------------------
    Resolution: Fixed
      Reviewer: Jason Gustafson

> Dynamic update of replica fetcher threads may fail to start/close fetchers
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7576
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.1, 2.0.1, 2.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.1.2, 2.1.1, 2.0.2
>
>
> KAFKA-6051 moved ReplicaFetcherBlockingSend shutdown earlier in the shutdown sequence of ReplicaFetcherThread. As a result, shutdown of replica fetchers can now throw an exception, because the Selector may be closed on one thread while data is being written to it on another. KAFKA-7464 changed this behaviour for 2.0.1 and 2.1.0: the exception during close() is now logged and not propagated, to avoid exceptions during broker shutdown.
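>
> For illustration, the underlying problem is a plain java.nio thread-safety issue: ByteBuffer is not thread-safe, and the SSL close path repositions buffers that the write path may still be using. A minimal sketch (not Kafka code; all names are hypothetical) that can reproduce the same IllegalArgumentException from Buffer.position:
> {code:scala}
> import java.nio.ByteBuffer
>
> object SharedBufferRace {
>   def main(args: Array[String]): Unit = {
>     val buf = ByteBuffer.allocate(1024)
>
>     // Stand-in for the replica fetcher thread writing through its Selector.
>     val writer = new Thread(() => {
>       try {
>         var i = 0
>         while (i < 10000000) { buf.clear(); buf.position(512); i += 1 }
>       } catch {
>         // Same failure mode as Buffer.position in the stack trace below:
>         // the closing thread shrank the limit between clear() and position().
>         case e: IllegalArgumentException => println(s"write path failed: $e")
>       }
>     })
>     writer.start()
>
>     // Stand-in for the thread closing the connection: flip() sets the
>     // limit to the current position, racing with the writer above.
>     for (_ <- 1 to 10000000) buf.flip()
>     writer.join()
>   }
> }
> {code}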
> When a config update notification for `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads. Existing fetchers are shut down before new ones are created. This migration is performed on the thread processing the ZK change notification. Shutting down the Selector of an existing fetcher is not safe, since the replica fetcher thread may be processing data using the same Selector at that time.
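>
> A simplified sketch of that migration flow (hypothetical types and names, not the actual AbstractFetcherManager code), with the unsafe step marked:
> {code:scala}
> import scala.collection.mutable
>
> trait Fetcher {
>   def partitions: Set[String]
>   def shutdown(): Unit // closes the fetcher's network Selector
> }
>
> class FetcherPool(var numFetchers: Int) {
>   private val fetchers = mutable.Map[Int, Fetcher]()
>
>   def addPartitions(partitions: Set[String]): Unit = {
>     // create fetcher threads lazily, hashing partitions over numFetchers
>   }
>
>   // Runs on the thread processing the ZK change notification.
>   def resize(newSize: Int): Unit = {
>     if (newSize != numFetchers) {
>       val migrating = fetchers.values.flatMap(_.partitions).toSet
>       // Unsafe step: shutdown() closes a Selector that the fetcher
>       // thread itself may still be writing through.
>       fetchers.values.foreach(_.shutdown())
>       fetchers.clear()
>       numFetchers = newSize
>       addPartitions(migrating) // never reached if shutdown() threw
>     }
>   }
> }
> {code}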
> Without the fix from KAFKA-7464, if shutdown encounters an exception, another config update or a broker restart is required to restart the replica fetchers after the dynamic config update.
> Exception stack trace:
> {code:java}
> java.lang.IllegalArgumentException
>         at java.nio.Buffer.position(Buffer.java:244)
>         at sun.nio.ch.IOUtil.write(IOUtil.java:68)
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>         at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
>         at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
>         at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
>         at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
>         at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
>         at org.apache.kafka.common.network.Selector.close(Selector.java:736)
>         at org.apache.kafka.common.network.Selector.close(Selector.java:698)
>         at org.apache.kafka.common.network.Selector.close(Selector.java:314)
>         at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
>         at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
>         at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
>         at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
>         at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
>         at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>         at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
>         at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
>         at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
>         at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
>         at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
>         at scala.collection.immutable.List.foreach(List.scala:392)
>         at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
> <SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with creating replica fetchers during a dynamic update. But even on those branches, we should clean up the Selector to avoid a resource leak in the dynamic config update case (discarding the exception may be sufficient when the broker is shutting down).
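>
> One possible shape of that cleanup (a sketch of the idea, not the committed fix): always attempt to close the Selector so its sockets and buffers are released, but log rather than propagate any exception, so neither broker shutdown nor a dynamic resize is aborted part-way:
> {code:scala}
> object QuietClose {
>   // Works for any closeable resource, e.g. the fetcher's Selector.
>   def closeQuietly(closeable: AutoCloseable, context: String): Unit = {
>     try closeable.close()
>     catch {
>       case e: Throwable =>
>         // Discarding the exception is acceptable here; leaking the
>         // Selector (sockets, direct buffers) is not.
>         println(s"Exception while closing $context: $e")
>     }
>   }
> }
> {code}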



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)