Posted to jira@kafka.apache.org by "Satish Duggana (Jira)" <ji...@apache.org> on 2021/09/29 04:53:00 UTC

[jira] [Updated] (KAFKA-13331) Slow reassignments in 2.8 because of large number of UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>) Events

     [ https://issues.apache.org/jira/browse/KAFKA-13331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-13331:
-----------------------------------
    Priority: Critical  (was: Major)

> Slow reassignments in 2.8 because of large number of  UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>) Events
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13331
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13331
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin
>    Affects Versions: 2.8.1, 3.0.0
>            Reporter: GEORGE LI
>            Priority: Critical
>             Fix For: 3.1.0
>
>         Attachments: Screen Shot 2021-09-28 at 12.57.34 PM.png
>
>
> Reassignments are slow on clusters with many brokers (e.g. 80 brokers).
> After investigation, the slowness appears to come from the controller sending an UpdateMetadataRequest to all brokers for every topic partition affected by the reassignment (some optimization may be possible here). For example:
> a reassignment with a batch size of 50 partitions generates about 10k - 20k ControllerEventManager.EventQueueSize, and the p99 EventQueueTimeMs reaches 1M. With a batch size of 100 partitions, EventQueueSize is about 40K and the p99 EventQueueTimeMs is 3M. See the screenshot of the metrics below.
>  !Screen Shot 2021-09-28 at 12.57.34 PM.png! 
> It takes about 10-30 minutes to process a batch of 100 reassignments, and 20-30 seconds for a batch of 1 reassignment, even when the topic partition is almost empty. In version 1.1, reassignment was almost instant.
> Looking at what is in the ControllerEventManager.EventQueue, the majority of events (90%+, depending on how many brokers are in the cluster) are {{UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>)}} events, which were introduced in this commit:
> {code}
> commit 4e431246c31170a7f632da8edfdb9cf4f882f6ef
> Author: Jason Gustafson <ja...@confluent.io>
> Date:   Thu Nov 21 07:41:29 2019 -0800
>     MINOR: Controller should log UpdateMetadata response errors (#7717)
>     
>     Create a controller event for handling UpdateMetadata responses and log a message when a response contains an error.
>     
>     Reviewers: Stanislav Kozlovski <st...@outlook.com>, Ismael Juma <is...@juma.me.uk>
> {code}
> Checking how the ControllerEventManager processes {{UpdateMetadata response}} events, it seems to only check whether there is an error and, if so, log it. Yet each event takes about 3ms - 60ms to dequeue, and because the queue is FIFO, other events wait behind them.
> {code}
>   private def processUpdateMetadataResponseReceived(updateMetadataResponse: UpdateMetadataResponse, brokerId: Int): Unit = {
>     if (!isActive) return
>     if (updateMetadataResponse.error != Errors.NONE) {
>       stateChangeLogger.error(s"Received error ${updateMetadataResponse.error} in UpdateMetadata " +
>         s"response $updateMetadataResponse from broker $brokerId")
>     }
>   }
> {code}
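
A rough back-of-the-envelope check of the figures in the report, as a sketch only. It assumes a simple linear model (drain time = queued events x per-event dequeue cost) for a single-threaded FIFO consumer; the object and method names are hypothetical, and only the queue sizes and the 3ms - 60ms range come from the report.

```scala
// Back-of-the-envelope estimate. The queue sizes and per-event costs come from
// the report; the linear drain-time model is an assumption for illustration.
object QueueDrainEstimate {
  // Seconds needed for a single-threaded FIFO consumer to drain `events`
  // events when each one costs `perEventMs` milliseconds to dequeue.
  def drainSeconds(events: Long, perEventMs: Long): Double =
    events * perEventMs / 1000.0

  def main(args: Array[String]): Unit = {
    val queueSizes = Seq(10000L, 20000L, 40000L) // reported EventQueueSize values
    val perEventMs = Seq(3L, 60L)                // reported dequeue-time range
    for (n <- queueSizes; ms <- perEventMs)
      println(f"$n%6d events at $ms%2d ms each: ${drainSeconds(n, ms)}%.0f s")
  }
}
```

Under this model, 40K queued events take between 120 s and 2400 s (2 to 40 minutes) to drain, which brackets the reported 10-30 minutes for a batch of 100 reassignments.
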
> More sophisticated handling of UpdateMetadata response errors may be added in the future. For the current version, would it be better to check whether the response error code is Errors.NONE before putting the event into the event queue? For example, with the additional check below, reassignment performance improved dramatically on the 80-broker cluster.
> {code}
>   val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
>     controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, topicIds.asJava)
>   sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse) => {
>     val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
>     // Added check: only enqueue the response when it carries an error. With no error
>     // (almost 99.99% of cases), skip adding updateMetadataResponse to the event queue.
>     if (updateMetadataResponse.error != Errors.NONE) {
>       sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse, broker))
>     }
>   })
> {code}
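
The effect of filtering before enqueueing can be illustrated with a small self-contained sketch. It does not use Kafka's classes: `Response`, `ResponseEvent`, and `FilterBeforeEnqueue` are hypothetical stand-ins, and the model of one response per broker per affected partition is a simplification of what the report describes.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Stand-in types for illustration only; these are NOT Kafka's classes.
final case class Response(errorCode: Short, brokerId: Int)
final case class ResponseEvent(response: Response)

object FilterBeforeEnqueue {
  val NoError: Short = 0

  // Enqueue a response event only when the response carries an error,
  // mirroring the check proposed in the report.
  def onResponse(queue: LinkedBlockingQueue[ResponseEvent], r: Response): Unit =
    if (r.errorCode != NoError) queue.put(ResponseEvent(r))

  // Simulate one response per broker for each affected partition (an assumed,
  // simplified model) and return how many events end up in the queue.
  // `failing` is the set of broker ids whose responses carry an error.
  def simulate(brokers: Int, partitions: Int, failing: Set[Int]): Int = {
    val queue = new LinkedBlockingQueue[ResponseEvent]()
    for (_ <- 1 to partitions; b <- 0 until brokers) {
      val code: Short = if (failing(b)) 1.toShort else NoError
      onResponse(queue, Response(code, b))
    }
    queue.size
  }

  def main(args: Array[String]): Unit = {
    println(simulate(brokers = 80, partitions = 50, failing = Set.empty))
    println(simulate(brokers = 80, partitions = 50, failing = Set(7)))
  }
}
```

In this model an unfiltered queue would receive 80 x 50 = 4000 events per round, whereas with the filter only responses from failing brokers are queued, so other controller events no longer wait behind successful responses.
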



--
This message was sent by Atlassian Jira
(v8.3.4#803005)