Posted to jira@kafka.apache.org by "Viktor Somogyi-Vass (Jira)" <ji...@apache.org> on 2023/02/02 13:25:00 UTC

[jira] [Assigned] (KAFKA-14667) Delayed leader election operation gets stuck in purgatory

     [ https://issues.apache.org/jira/browse/KAFKA-14667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Viktor Somogyi-Vass reassigned KAFKA-14667:
-------------------------------------------

    Assignee: Viktor Somogyi-Vass

> Delayed leader election operation gets stuck in purgatory
> ---------------------------------------------------------
>
>                 Key: KAFKA-14667
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14667
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.1.1
>            Reporter: Daniel Urban
>            Assignee: Viktor Somogyi-Vass
>            Priority: Major
>
> This was observed with Kafka 3.1.1, but I believe that the latest versions are also affected.
> In the Cruise Control project, there is an integration test: com.linkedin.kafka.cruisecontrol.executor.ExecutorTest#testReplicaReassignmentProgressWithThrottle
> On our infrastructure, this test fails roughly once in every 20 runs with a timeout: the triggered preferred leader election never completes. After some investigation, it turns out that:
>  # The admin client never gets a response from the broker.
>  # The leadership change is executed successfully.
>  # The ElectLeader purgatory never gets an update for the relevant topic partition.
> A few relevant lines from a failed run (this test uses an embedded cluster, logs are mixed):
> CC successfully sends a preferred election request to the controller (broker 0), topic1-0 needs a leadership change from broker 0 to broker 1:
> {code:java}
> 2023-02-01 01:20:26.028 [controller-event-thread] DEBUG kafka.controller.KafkaController - [Controller id=0] Waiting for any successful result for election type (PREFERRED) by AdminClientTriggered for partitions: Map(topic1-0 -> Right(1), topic0-0 -> Left(ApiError(error=ELECTION_NOT_NEEDED, message=Leader election not needed for topic partition.)))
> 2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1) {code}
> The delayed operation for the leader election is triggered twice in quick succession (yes, the same millisecond in both log lines):
> {code:java}
> 2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1)
> 2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1){code}
> Shortly after (a few ms later, based on the logs), broker 0 receives an UpdateMetadataRequest from the controller (itself) and processes it:
> {code:java}
> 2023-02-01 01:20:26.033 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Sending UPDATE_METADATA request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19) and timeout 30000 to node 0: UpdateMetadataRequestData(controllerId=0, controllerEpoch=1, brokerEpoch=25, ungroupedPartitionStates=[], topicStates=[UpdateMetadataTopicState(topicName='topic1', topicId=gkFP8VnkSGyEf_LBBZSowQ, partitionStates=[UpdateMetadataPartitionState(topicName='topic1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=2, isr=[0, 1], zkVersion=2, replicas=[1, 0], offlineReplicas=[])])], liveBrokers=[UpdateMetadataBroker(id=1, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=40236, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=0, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=42556, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null)])
> 2023-02-01 01:20:26.035 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Received UPDATE_METADATA response from node 0 for request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19): UpdateMetadataResponseData(errorCode=0)
> 2023-02-01 01:20:26.035 [data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0] DEBUG kafka.request.logger - Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":6,"requestApiVersion":7,"correlationId":19,"clientId":"0","requestApiKeyName":"UPDATE_METADATA"},"request":{"controllerId":0,"controllerEpoch":1,"brokerEpoch":25,"topicStates":[{"topicName":"topic1","topicId":"gkFP8VnkSGyEf_LBBZSowQ","partitionStates":[{"partitionIndex":0,"controllerEpoch":1,"leader":1,"leaderEpoch":2,"isr":[0,1],"zkVersion":2,"replicas":[1,0],"offlineReplicas":[]}]}],"liveBrokers":[{"id":1,"endpoints":[{"port":40236,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null},{"id":0,"endpoints":[{"port":42556,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null}]},"response":{"errorCode":0},"connection":"127.0.0.1:42556-127.0.0.1:55952-0","totalTimeMs":1.904,"requestQueueTimeMs":0.108,"localTimeMs":0.788,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.842,"sendTimeMs":0.164,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}} {code}
> The update metadata request should trigger an update on the ElectLeader purgatory, and we should see a log line like this: "Request key X unblocked Y ElectLeader."
> In the failing test, this last line never appears. In successful tests, it appears.
>  
> I believe that kafka.server.KafkaApis#handleUpdateMetadataRequest, kafka.server.ReplicaManager#hasDelayedElectionOperations and kafka.server.DelayedOperationPurgatory#tryCompleteElseWatch have a concurrency issue.
> handleUpdateMetadataRequest calls hasDelayedElectionOperations which doesn't lock on the state of the purgatory:
> {code:scala}
> if (replicaManager.hasDelayedElectionOperations) {
>   updateMetadataRequest.partitionStates.forEach { partitionState =>
>     val tp = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
>     replicaManager.tryCompleteElection(TopicPartitionOperationKey(tp))
>   }
> } {code}
> Since the "Request key X unblocked Y ElectLeader." log never appears in the failed run, but the request processing finishes (so it is not a deadlock in the request handler), it is safe to assume that handleUpdateMetadataRequest never enters the body of the if statement, i.e. hasDelayedElectionOperations returns false.
> I don't have an exact scenario for how this can happen (a concurrent metadata update and a delayed elect leader operation are not "syncing" up), but this definitely looks like a concurrency problem.
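
The kind of check-then-act race suspected above can be modeled with a toy example. This is only a sketch of one hypothetical interleaving, not Kafka's code: ToyPurgatory, watch, publishCount, onMetadataUpdate and simulate are invented names, and the assumption that the "has delayed operations" check reads a counter that is published in a separate step from the watcher registration is not confirmed by the report. The two threads are replayed sequentially so that the interleaving is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of a check-then-act race; names do not map to Kafka classes.
class PurgatoryRaceSketch {

    static class ToyPurgatory {
        private final Map<String, Integer> waiting = new HashMap<>();  // partition -> expected leader
        private final Map<String, Integer> metadata = new HashMap<>(); // partition -> current leader
        private int delayedCount = 0; // ASSUMPTION: this is what the unlocked emptiness check reads
        private boolean completed = false;

        // Step A ("controller" side): start watching the partition.
        void watch(String partition, int expectedLeader) {
            waiting.put(partition, expectedLeader);
        }

        // Step B ("controller" side): make the operation visible to the emptiness check.
        void publishCount() {
            delayedCount++;
        }

        // "Request handler" side: apply the metadata update, then complete
        // waiting operations only if the emptiness check reports any.
        void onMetadataUpdate(String partition, int leader) {
            metadata.put(partition, leader);
            if (delayedCount > 0) {       // check ...
                tryComplete(partition);   // ... then act
            }
        }

        void tryComplete(String partition) {
            Integer expected = waiting.get(partition);
            if (expected != null && expected.equals(metadata.get(partition))) {
                waiting.remove(partition);
                delayedCount--;
                completed = true;
            }
        }

        boolean isCompleted() {
            return completed;
        }
    }

    // Replays one of two orderings and reports whether the operation completed.
    static boolean simulate(boolean updateArrivesBetweenSteps) {
        ToyPurgatory purgatory = new ToyPurgatory();
        purgatory.watch("topic1-0", 1);                 // step A
        if (updateArrivesBetweenSteps) {
            purgatory.onMetadataUpdate("topic1-0", 1);  // sees delayedCount == 0, skips completion
            purgatory.publishCount();                   // step B arrives too late
        } else {
            purgatory.publishCount();                   // step B
            purgatory.onMetadataUpdate("topic1-0", 1);  // sees delayedCount == 1, completes
        }
        return purgatory.isCompleted();
    }

    public static void main(String[] args) {
        System.out.println("update after step B:     completed=" + simulate(false));
        System.out.println("update between A and B:  completed=" + simulate(true));
    }
}
```

In this model the operation is left waiting whenever the metadata update lands between steps A and B, because nothing re-checks the partition afterwards. Under the same assumptions, either performing the emptiness check under the same lock as the registration, or calling tryComplete unconditionally, would close the window.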



--
This message was sent by Atlassian Jira
(v8.20.10#820010)