Posted to jira@kafka.apache.org by "Rajini Sivaram (JIRA)" <ji...@apache.org> on 2018/10/31 19:21:00 UTC
[jira] [Created] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start fetchers
Rajini Sivaram created KAFKA-7576:
-------------------------------------
Summary: Dynamic update of replica fetcher threads may fail to start fetchers
Key: KAFKA-7576
URL: https://issues.apache.org/jira/browse/KAFKA-7576
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 2.0.0, 1.1.1, 1.0.2, 2.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
When a config update notification for `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads. Existing fetchers are shut down before new ones are created. This migration is performed on the thread processing the ZK change notification. Shutting down the Selector of an existing fetcher from this thread is not safe, since the replica fetcher thread may be processing data with the same Selector at the time.
After the failure, another config update or a broker restart is required to restart the replica fetchers.
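To illustrate the ordering problem described above, here is a minimal Java sketch of one safe shutdown sequence: signal the worker to stop, wait for it to exit, and only then close the resource it was using. This is an illustration under assumptions, not the Kafka code or necessarily the fix adopted for this issue. `FetcherShutdownSketch`, `Connection`, and `Worker` are hypothetical names standing in for the fetcher thread and its Selector.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class FetcherShutdownSketch {

    // Hypothetical stand-in for a network resource (such as a Selector)
    // that must not be closed while another thread is still using it.
    static final class Connection {
        private volatile boolean closed = false;

        void poll() {
            if (closed) {
                throw new IllegalStateException("poll() called after close()");
            }
        }

        void close() {
            closed = true;
        }
    }

    // Hypothetical worker thread that owns a Connection and polls it in a loop.
    static final class Worker extends Thread {
        private final Connection connection = new Connection();
        private final AtomicBoolean running = new AtomicBoolean(true);
        final AtomicReference<Throwable> failure = new AtomicReference<>();

        @Override
        public void run() {
            try {
                while (running.get()) {
                    connection.poll();
                    Thread.yield();
                }
            } catch (Throwable t) {
                // Record any error raised while using the connection.
                failure.set(t);
            }
        }

        // Safe ordering: stop the loop, wait for the thread to exit,
        // and only then close the connection from the calling thread.
        void shutdown() throws InterruptedException {
            running.set(false);
            join();
            connection.close();
        }
    }

    // Starts a worker, shuts it down in the safe order, and reports
    // whether the worker exited without recording a failure.
    static boolean shutdownCleanly() {
        Worker worker = new Worker();
        worker.start();
        try {
            Thread.sleep(50); // let the worker poll for a short while
            worker.shutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return worker.failure.get() == null && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + shutdownCleanly());
    }
}
```

Calling `connection.close()` before `join()` would reproduce the hazard in miniature: the worker could still be inside `poll()` when the resource is closed underneath it.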
Exception stack trace:
{code:java}
java.lang.IllegalArgumentException
at java.nio.Buffer.position(Buffer.java:244)
at sun.nio.ch.IOUtil.write(IOUtil.java:68)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
at org.apache.kafka.common.network.Selector.close(Selector.java:736)
at org.apache.kafka.common.network.Selector.close(Selector.java:698)
at org.apache.kafka.common.network.Selector.close(Selector.java:314)
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
at scala.collection.immutable.List.foreach(List.scala:392)
at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
<SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
{code}