Posted to dev@kafka.apache.org by "Divij Vaidya (Jira)" <ji...@apache.org> on 2023/12/29 09:56:00 UTC
[jira] [Resolved] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values
[ https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Divij Vaidya resolved KAFKA-16015.
----------------------------------
Resolution: Fixed
> kafka-leader-election timeout values always overwritten by default values
> --------------------------------------------------------------------------
>
> Key: KAFKA-16015
> URL: https://issues.apache.org/jira/browse/KAFKA-16015
> Project: Kafka
> Issue Type: Bug
> Components: admin, tools
> Affects Versions: 3.5.1, 3.6.1
> Reporter: Sergio Troiano
> Assignee: Sergio Troiano
> Priority: Minor
> Fix For: 3.7.0, 3.8.0
>
>
> Using *kafka-leader-election.sh*, I was getting random timeouts like these:
> {code:java}
> Error completing leader election (PREFERRED) for partition: sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlModelTrainingSamples-18: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlPartitionMetricSamples-8: org.apache.kafka.common.errors.TimeoutException: The request timed out. {code}
> These timeouts were raised on the client side, since the controller always completed all of the leader elections.
> One pattern I noticed was that the timeouts were always raised after about 15 seconds.
>
> So I checked and found that this command has an option to pass configuration properties:
> {code:java}
> Option Description
> ------ -----------
> --admin.config <String: config file> Configuration properties files to pass
> to the admin client {code}
> I created such a file to increase the values of *request.timeout.ms* and *default.api.timeout.ms*. Even after increasing these values I got the same result: the timeouts kept happening, as if the new values had no effect.
> So I checked the source code and came across a bug: no matter what values we pass for the timeouts, the default values ALWAYS overwrite them.
>
> This is the code on the [3.6 branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]:
> {code:java}
> object LeaderElectionCommand extends Logging {
>   def main(args: Array[String]): Unit = {
>     run(args, 30.second)
>   }
>
>   def run(args: Array[String], timeout: Duration): Unit = {
>     val commandOptions = new LeaderElectionCommandOptions(args)
>     CommandLineUtils.maybePrintHelpOrVersion(
>       commandOptions,
>       "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
>     )
>
>     validate(commandOptions)
>
>     val electionType = commandOptions.options.valueOf(commandOptions.electionType)
>
>     val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
>       parseReplicaElectionData(Utils.readFileAsString(path))
>     }
>
>     val singleTopicPartition = (
>       Option(commandOptions.options.valueOf(commandOptions.topic)),
>       Option(commandOptions.options.valueOf(commandOptions.partition))
>     ) match {
>       case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
>       case _ => None
>     }
>
>     /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
>      * The validate function should be checking that this option is required if the --topic and --path-to-json-file
>      * are not specified.
>      */
>     val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)
>
>     val adminClient = {
>       val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
>         Utils.loadProps(config)
>       }.getOrElse(new Properties())
>
>       props.setProperty(
>         AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
>         commandOptions.options.valueOf(commandOptions.bootstrapServer)
>       )
>       props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
>       props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
>
>       Admin.create(props)
>     }
> {code}
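> As we can see, the default timeout is 30 seconds and the request timeout is half of that (30000 ms / 2 = 15000 ms), which matches the 15-second timeouts I observed.
> The code also shows how the custom values loaded from the config file are overwritten by these defaults: the two setProperty calls for the timeouts run after Utils.loadProps has read the file.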
>
>
> The proposed fix is simple: use the default values only when the timeouts are not defined in the config file, for example:
> {code:java}
> if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
> props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
> }
> if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
> props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
> } {code}
>
> I tested this change, and I am now able to modify the timeouts and have my application capture the result of the command correctly.
>
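The overwrite and the proposed guard can be reproduced without a cluster. The following is a minimal, self-contained Scala sketch, assuming a plain java.util.Properties stands in for the admin client configuration; the object LeaderElectionTimeoutSketch and the helpers loadProps, applyTimeoutsBuggy and applyTimeoutsFixed are hypothetical names, not part of Kafka. The two keys are the string values of AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG and AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, copied here to avoid a kafka-clients dependency.

```scala
import java.io.StringReader
import java.util.Properties
import scala.concurrent.duration._

// Hypothetical sketch: mimics the timeout handling in LeaderElectionCommand
// using a plain java.util.Properties instead of a real Kafka Admin client.
object LeaderElectionTimeoutSketch {
  // String values of AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG and
  // AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG (copied, no kafka-clients dependency).
  val DefaultApiTimeoutMs = "default.api.timeout.ms"
  val RequestTimeoutMs = "request.timeout.ms"

  // Hypothetical helper: parses the contents of an --admin.config file,
  // similar in spirit to Utils.loadProps(path).
  def loadProps(contents: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(contents))
    props
  }

  // 3.6 behaviour: the defaults are written unconditionally,
  // so any timeout value read from the config file is replaced.
  def applyTimeoutsBuggy(props: Properties, timeout: Duration): Properties = {
    props.setProperty(DefaultApiTimeoutMs, timeout.toMillis.toString)
    props.setProperty(RequestTimeoutMs, (timeout.toMillis / 2).toString)
    props
  }

  // Proposed behaviour: the defaults are only used for keys
  // that the config file did not define.
  def applyTimeoutsFixed(props: Properties, timeout: Duration): Properties = {
    if (!props.containsKey(DefaultApiTimeoutMs))
      props.setProperty(DefaultApiTimeoutMs, timeout.toMillis.toString)
    if (!props.containsKey(RequestTimeoutMs))
      props.setProperty(RequestTimeoutMs, (timeout.toMillis / 2).toString)
    props
  }

  def main(args: Array[String]): Unit = {
    // Example admin.config contents with both timeouts raised.
    val adminConfig =
      """default.api.timeout.ms=120000
        |request.timeout.ms=60000
        |""".stripMargin

    val buggy = applyTimeoutsBuggy(loadProps(adminConfig), 30.seconds)
    val fixed = applyTimeoutsFixed(loadProps(adminConfig), 30.seconds)
    val noFile = applyTimeoutsFixed(new Properties(), 30.seconds)

    println(s"buggy:   request.timeout.ms=${buggy.getProperty(RequestTimeoutMs)}")  // 15000
    println(s"fixed:   request.timeout.ms=${fixed.getProperty(RequestTimeoutMs)}")  // 60000
    println(s"no file: request.timeout.ms=${noFile.getProperty(RequestTimeoutMs)}") // 15000
  }
}
```

With the 30-second default, the unconditional version turns a user-supplied request.timeout.ms of 60000 into 15000, while the guarded version keeps 60000 and still falls back to 30000 / 15000 when no config file is given.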
--
This message was sent by Atlassian Jira
(v8.20.10#820010)