Posted to dev@kafka.apache.org by "Divij Vaidya (Jira)" <ji...@apache.org> on 2023/12/29 09:56:00 UTC

[jira] [Resolved] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

     [ https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Divij Vaidya resolved KAFKA-16015.
----------------------------------
    Resolution: Fixed

> kafka-leader-election timeout values always overwritten by default values 
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-16015
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16015
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin, tools
>    Affects Versions: 3.5.1, 3.6.1
>            Reporter: Sergio Troiano
>            Assignee: Sergio Troiano
>            Priority: Minor
>             Fix For: 3.7.0, 3.8.0
>
>
> Using *kafka-leader-election.sh*, I was getting random timeouts like these:
> {code:java}
> Error completing leader election (PREFERRED) for partition: sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlModelTrainingSamples-18: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlPartitionMetricSamples-8: org.apache.kafka.common.errors.TimeoutException: The request timed out. {code}
> These timeouts were raised on the client side, even though the controller always completed all of the leader elections.
> One pattern I noticed was that the timeouts were always raised after about 15 seconds.
>  
> So I checked, and this command has an option to pass configuration properties to the admin client:
> {code:java}
> Option                                  Description
> ------                                  -----------
> --admin.config <String: config file>    Configuration properties files to pass
>                                           to the admin client {code}
> I created the file to increase the values of *request.timeout.ms* and *default.api.timeout.ms*. Even after increasing these values I got the same result: the timeouts kept happening, as if the new values had no effect.
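For illustration, here is a minimal Scala sketch of the kind of properties file passed via --admin.config. The object name AdminConfigFileSketch and the timeout values used with it (for example 60000 and 120000 ms) are hypothetical; the report does not say which values were used. The file uses the standard java.util.Properties key=value format, which is what Utils.loadProps reads.

```scala
import java.io.{FileReader, FileWriter}
import java.nio.file.{Files, Path}
import java.util.Properties

// Hypothetical helper: writes and reads an --admin.config style file.
// Any timeout values passed to it are illustrative, not taken from the report.
object AdminConfigFileSketch {
  // Admin client property names (the string values of
  // AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG and DEFAULT_API_TIMEOUT_MS_CONFIG).
  val RequestTimeoutKey = "request.timeout.ms"
  val DefaultApiTimeoutKey = "default.api.timeout.ms"

  // Write both timeouts as key=value lines into a temporary .properties file.
  def writeConfig(requestTimeoutMs: Long, defaultApiTimeoutMs: Long): Path = {
    val path = Files.createTempFile("admin", ".properties")
    val writer = new FileWriter(path.toFile)
    try {
      writer.write(s"$RequestTimeoutKey=$requestTimeoutMs\n")
      writer.write(s"$DefaultApiTimeoutKey=$defaultApiTimeoutMs\n")
    } finally writer.close()
    path
  }

  // Read the file back with java.util.Properties, the format the tool loads.
  def load(path: Path): Properties = {
    val props = new Properties()
    val reader = new FileReader(path.toFile)
    try props.load(reader) finally reader.close()
    props
  }
}
```

The path returned by writeConfig is what would be handed to the tool as --admin.config <path>.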
> So I checked the source code and came across a bug: no matter what values we pass for the timeouts, the default values ALWAYS overwrite them.
>  
> This is the code on the [3.6 branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]:
> {code:java}
> object LeaderElectionCommand extends Logging {
>   def main(args: Array[String]): Unit = {
>     run(args, 30.second)
>   }
>
>   def run(args: Array[String], timeout: Duration): Unit = {
>     val commandOptions = new LeaderElectionCommandOptions(args)
>     CommandLineUtils.maybePrintHelpOrVersion(
>       commandOptions,
>       "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
>     )
>
>     validate(commandOptions)
>
>     val electionType = commandOptions.options.valueOf(commandOptions.electionType)
>
>     val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path  =>
>       parseReplicaElectionData(Utils.readFileAsString(path))
>     }
>
>     val singleTopicPartition = (
>       Option(commandOptions.options.valueOf(commandOptions.topic)),
>       Option(commandOptions.options.valueOf(commandOptions.partition))
>     ) match {
>       case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
>       case _ => None
>     }
>
>     /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
>      * The validate function should be checking that this option is required if the --topic and --path-to-json-file
>      * are not specified.
>      */
>     val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)
>
>     val adminClient = {
>       val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
>         Utils.loadProps(config)
>       }.getOrElse(new Properties())
>
>       props.setProperty(
>         AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
>         commandOptions.options.valueOf(commandOptions.bootstrapServer)
>       )
>       props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
>       props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
>
>       Admin.create(props)
>     }
> {code}
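> As we can see, the default timeout is 30 seconds ({{run(args, 30.second)}}), and the request timeout is half of that, 30 / 2 = 15 seconds, which explains why the timeouts always appeared after about 15 seconds.
> The code also shows why the custom values from the config file have no effect: the file is loaded first, and the two {{setProperty}} calls for the timeouts then unconditionally overwrite them with the defaults.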
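The overwrite can be reproduced without Kafka at all. This is a minimal sketch, assuming plain java.util.Properties as a stand-in for the loaded admin config. OverwriteRepro and buggyProps are hypothetical names. It mirrors the ordering in the 3.6 code: user values first, then both defaults written unconditionally.

```scala
import java.util.Properties
import scala.concurrent.duration.Duration

// Reproduction of the 3.6 ordering with plain java.util.Properties (no Kafka needed).
// OverwriteRepro and buggyProps are hypothetical names used only for illustration.
object OverwriteRepro {
  val DefaultApiTimeoutKey = "default.api.timeout.ms"
  val RequestTimeoutKey = "request.timeout.ms"

  def buggyProps(userProps: Properties, timeout: Duration): Properties = {
    val props = new Properties()
    props.putAll(userProps) // stands in for Utils.loadProps(config)
    // The tool's defaults are then written unconditionally, replacing the user's values.
    props.setProperty(DefaultApiTimeoutKey, timeout.toMillis.toString)
    props.setProperty(RequestTimeoutKey, (timeout.toMillis / 2).toString)
    props
  }
}
```

With a 30-second timeout, any user-supplied values end up replaced by 30000 ms (default.api.timeout.ms) and 15000 ms (request.timeout.ms). The latter is the 15-second request timeout seen in the errors.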
>  
>  
> The fix is simple: use the default values only when the timeouts are not defined in the config file, for example:
> {code:java}
>       if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
>           props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
>       }
>       if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
>           props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
>       } {code}
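The proposed check can be exercised the same way. The following is a sketch of the fallback logic only, assuming plain java.util.Properties. TimeoutDefaults and withTimeoutDefaults are hypothetical names, not Kafka APIs.

```scala
import java.util.Properties
import scala.concurrent.duration.Duration

// Sketch of the proposed fix: fall back to the tool's defaults only for keys
// the --admin.config file did not set. Names here are hypothetical.
object TimeoutDefaults {
  val DefaultApiTimeoutKey = "default.api.timeout.ms"
  val RequestTimeoutKey = "request.timeout.ms"

  def withTimeoutDefaults(userProps: Properties, timeout: Duration): Properties = {
    val props = new Properties()
    props.putAll(userProps) // stands in for Utils.loadProps(config)
    // Each key is checked independently, matching the containsKey checks above.
    if (!props.containsKey(DefaultApiTimeoutKey))
      props.setProperty(DefaultApiTimeoutKey, timeout.toMillis.toString)
    if (!props.containsKey(RequestTimeoutKey))
      props.setProperty(RequestTimeoutKey, (timeout.toMillis / 2).toString)
    props
  }
}
```

Because each key is checked on its own, setting only one of the two timeouts in the file keeps that value, while the other still falls back to the tool's default.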
>  
> I tested this change, and I can now modify the timeouts and have my application catch the result of the command properly.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)