Posted to jira@kafka.apache.org by "Jason Gustafson (Jira)" <ji...@apache.org> on 2021/11/09 21:46:00 UTC

[jira] [Updated] (KAFKA-13417) Dynamic thread pool re-configurations may not get processed

     [ https://issues.apache.org/jira/browse/KAFKA-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-13417:
------------------------------------
    Description: 
`DynamicBrokerConfig.updateCurrentConfig` contains the following logic, which updates the current configuration and then lets each affected `BrokerReconfigurable` process the update:
{code}
    val oldConfig = currentConfig
    val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
    if (newConfig ne currentConfig) {
      currentConfig = newConfig
      kafkaConfig.updateCurrentConfig(newConfig)

      // Process BrokerReconfigurable updates after current config is updated
      brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
    }
{code}

The problem is that `currentConfig` is initialized to `kafkaConfig` itself, so on the first update `oldConfig` and `kafkaConfig` are the same object. The call to `kafkaConfig.updateCurrentConfig(newConfig)` therefore mutates `oldConfig` as well: by the time the reconfigurables run, `oldConfig` already reports the new values. This matters because some `reconfigure` implementations only apply a new configuration if a value in `oldConfig` differs from the corresponding value in `newConfig`. For example, here is the logic to resize thread pools dynamically:

{code}
  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
    if (newConfig.numIoThreads != oldConfig.numIoThreads)
      server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
    if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
      server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
    if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
      server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
    if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
      server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
    if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
      server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
  }
{code}
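The skip can be reproduced without a broker. This is a hypothetical, simplified model rather than Kafka code: `ThreadPoolConfig` and `PoolResizer` are illustrative names, and resizes are only recorded, not applied. It shows that a diff-based `reconfigure` does nothing when both arguments report the same values, which is what happens once `oldConfig` has been mutated to match `newConfig`.

{code}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the thread pool settings; not a Kafka class.
final case class ThreadPoolConfig(numIoThreads: Int, numNetworkThreads: Int, backgroundThreads: Int)

// Hypothetical model of a diff-based reconfigurable such as DynamicThreadPool.
final class PoolResizer {
  // Each resize request is recorded as (poolName, newSize) so the effect is observable.
  val resizes: ArrayBuffer[(String, Int)] = ArrayBuffer.empty

  def reconfigure(oldConfig: ThreadPoolConfig, newConfig: ThreadPoolConfig): Unit = {
    // Only act when a value appears to have changed between old and new.
    if (newConfig.numIoThreads != oldConfig.numIoThreads)
      resizes += ("io" -> newConfig.numIoThreads)
    if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
      resizes += ("network" -> newConfig.numNetworkThreads)
    if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
      resizes += ("background" -> newConfig.backgroundThreads)
  }
}
{code}

Calling `reconfigure` with two configs that both report 16 I/O threads records no resize, even if the pool is actually still running with its original size; only a genuine difference between the arguments triggers a resize.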

As a result, the first dynamic update is not applied: `oldConfig` and `newConfig` report the same values, so diff-based reconfigurables such as the one above skip the change. I believe subsequent updates would work correctly, because after the first update `currentConfig` no longer refers to `kafkaConfig`. Besides the `DynamicThreadPool` configurations, the dynamic update of the unclean leader election config may also be affected by this bug.

NOTE: This bug only affects KRaft, which is missing the call to `DynamicBrokerConfig.initialize()`.
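The aliasing, and why the missing `initialize()` call matters, can be sketched with a small model. This is not Kafka's code: `SimpleConfig` and `SimpleDynamicConfig` are hypothetical stand-ins that keep only the behavior the description relies on (after `updateCurrentConfig`, lookups on the original config are redirected to the newer one). The `initialize()` here assumes, as the NOTE implies, that the real `DynamicBrokerConfig.initialize()` gives `currentConfig` its own instance so that it no longer aliases `kafkaConfig`.

{code}
// Hypothetical stand-in for KafkaConfig: after updateCurrentConfig, lookups
// on this instance are redirected to the newer config.
final class SimpleConfig(props: Map[String, Int]) {
  private var current: SimpleConfig = this

  def updateCurrentConfig(newConfig: SimpleConfig): Unit = current = newConfig

  def numIoThreads: Int =
    if (current eq this) props("num.io.threads") else current.numIoThreads

  // Independent instance built from the original properties (assumed to be
  // called before any update, as in the initialize() sketch below).
  def copy(): SimpleConfig = new SimpleConfig(props)
}

// Hypothetical stand-in for DynamicBrokerConfig, keeping only updateCurrentConfig's flow.
final class SimpleDynamicConfig(kafkaConfig: SimpleConfig) {
  // Without initialize(), currentConfig is the very same object as kafkaConfig.
  private var currentConfig: SimpleConfig = kafkaConfig

  // Assumed analogue of DynamicBrokerConfig.initialize(): break the aliasing.
  def initialize(): Unit = currentConfig = kafkaConfig.copy()

  /** Applies an update and returns the (old, new) numIoThreads a reconfigurable would see. */
  def update(newProps: Map[String, Int]): (Int, Int) = {
    val oldConfig = currentConfig
    val newConfig = new SimpleConfig(newProps)
    currentConfig = newConfig
    // When oldConfig eq kafkaConfig, this also redirects what oldConfig reports.
    kafkaConfig.updateCurrentConfig(newConfig)
    (oldConfig.numIoThreads, newConfig.numIoThreads)
  }
}
{code}

Starting from 8 I/O threads without `initialize()`, an update to 16 reports `(16, 16)`, so a diff-based reconfigurable sees no change; a second update to 32 reports `(16, 32)`, matching the expectation that later updates work. With `initialize()` called first, the update to 16 reports `(8, 16)`.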

  was:
Same as the description above, without the final NOTE paragraph.


> Dynamic thread pool re-configurations may not get processed
> -----------------------------------------------------------
>
>                 Key: KAFKA-13417
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13417
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major
>


