Posted to jira@kafka.apache.org by "Rajini Sivaram (JIRA)" <ji...@apache.org> on 2018/10/31 19:21:00 UTC

[jira] [Created] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start fetchers

Rajini Sivaram created KAFKA-7576:
-------------------------------------

             Summary: Dynamic update of replica fetcher threads may fail to start fetchers
                 Key: KAFKA-7576
                 URL: https://issues.apache.org/jira/browse/KAFKA-7576
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.0.0, 1.1.1, 1.0.2, 2.1.0
            Reporter: Rajini Sivaram
            Assignee: Rajini Sivaram


When a config update notification for `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads. Existing fetchers are shut down before new ones are created. This migration is performed on the thread that processes ZK change notifications. Shutting down the Selector of an existing fetcher from this thread is not safe, since the replica fetcher thread may be processing data at the same time using the same Selector.

After the failure, another config update or a broker restart is required to restart the replica fetchers.

Exception stack trace:
{code:java}
java.lang.IllegalArgumentException
        at java.nio.Buffer.position(Buffer.java:244)
        at sun.nio.ch.IOUtil.write(IOUtil.java:68)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
        at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
        at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
        at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
        at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
        at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
        at org.apache.kafka.common.network.Selector.close(Selector.java:736)
        at org.apache.kafka.common.network.Selector.close(Selector.java:698)
        at org.apache.kafka.common.network.Selector.close(Selector.java:314)
        at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
        at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
        at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
        at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
        at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
        at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
        at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
        at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
        at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
        at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
        at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
<SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
{code}
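
The race described above (one thread closing a network resource while its owning thread may still be using it) can be avoided in general by having the controlling thread only signal shutdown and leaving the close to the owning thread. The following is a minimal sketch of that pattern, not the fix adopted in Kafka. `FakeClient` and `Fetcher` are hypothetical stand-ins and do not correspond to Kafka classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class FetcherShutdownSketch {

    /** Hypothetical stand-in for a network client that is not thread-safe. */
    static final class FakeClient {
        private volatile Thread closedBy;

        void poll() throws InterruptedException {
            if (closedBy != null) {
                throw new IllegalStateException("poll after close");
            }
            Thread.sleep(1); // simulate blocking I/O
        }

        void close() {
            closedBy = Thread.currentThread();
        }

        Thread closedBy() {
            return closedBy;
        }
    }

    /** Hypothetical fetcher thread that owns its client for its whole lifetime. */
    static final class Fetcher extends Thread {
        private final FakeClient client = new FakeClient();
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                while (running.get()) {
                    client.poll();
                }
            } catch (InterruptedException e) {
                // Interrupted to speed up shutdown; fall through to cleanup.
            } finally {
                client.close(); // closed by the only thread that ever uses it
                done.countDown();
            }
        }

        /** Called from another thread: only signals, never touches the client. */
        void initiateShutdown() {
            running.set(false);
            interrupt();
        }

        /** Blocks until the fetcher thread has closed its client, or the timeout expires. */
        boolean awaitShutdown(long timeoutMs) throws InterruptedException {
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        }

        boolean clientClosedByOwner() {
            return client.closedBy() == this;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Fetcher fetcher = new Fetcher();
        fetcher.start();
        Thread.sleep(20); // let the fetcher poll a few times
        fetcher.initiateShutdown();
        boolean stopped = fetcher.awaitShutdown(5000);
        System.out.println("stopped=" + stopped
                + " closedByOwner=" + fetcher.clientClosedByOwner());
    }
}
```

In this sketch the thread that resizes the pool would call `initiateShutdown()` and then `awaitShutdown(...)` on each old fetcher before creating new ones, so the client is never closed while a poll is in flight.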


