Posted to jira@kafka.apache.org by "Chia-Ping Tsai (Jira)" <ji...@apache.org> on 2023/02/20 13:59:00 UTC

[jira] [Assigned] (KAFKA-14295) FetchMessageConversionsPerSec meter not recorded

     [ https://issues.apache.org/jira/browse/KAFKA-14295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-14295:
--------------------------------------

    Assignee: Chia-Ping Tsai

> FetchMessageConversionsPerSec meter not recorded
> ------------------------------------------------
>
>                 Key: KAFKA-14295
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14295
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: David Mao
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>
> The broker topic metric FetchMessageConversionsPerSec is not recorded when a fetch response requires message conversion.
> The cause is that KafkaApis passes in a callback that only handles a MultiRecordsSend:
> {code:java}
> def updateConversionStats(send: Send): Unit = {
>   send match {
>     case send: MultiRecordsSend if send.recordConversionStats != null =>
>       send.recordConversionStats.asScala.toMap.foreach {
>         case (tp, stats) => updateRecordConversionStats(request, tp, stats)
>       }
>     case _ =>
>   }
> } {code}
> However, the SocketServer invokes this callback with a NetworkSend:
> {code:java}
> selector.completedSends.forEach { send =>
>   try {
>     val response = inflightResponses.remove(send.destinationId).getOrElse {
>       throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
>     }
>     updateRequestMetrics(response)
>     // Invoke send completion callback
>     response.onComplete.foreach(onComplete => onComplete(send))
> ...{code}
> Note that Selector.completedSends returns a collection of NetworkSend, so the MultiRecordsSend case never matches and the conversion stats are silently dropped.
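
The type mismatch described in the report can be reproduced in isolation. The following is a minimal sketch, not Kafka code: `Send`, `MultiRecordsSend` and `NetworkSend` here are simplified stand-ins, the field name `payload` and the helper `updateConversionStatsUnwrapped` are hypothetical, and the unwrapping variant is only one conceivable way to reach the inner send, not necessarily the fix adopted for this issue.

```scala
// Simplified stand-ins for illustration only; these are NOT Kafka's real classes.
sealed trait Send
final case class MultiRecordsSend(recordConversionStats: Map[String, Int]) extends Send
// Assumption: the network-level send wraps the original send (field name is hypothetical).
final case class NetworkSend(destinationId: String, payload: Send) extends Send

object ConversionStatsSketch {
  // Counts how many per-partition stats entries the callback actually recorded.
  var recorded: Int = 0

  // Same shape as the callback quoted in the report: only a MultiRecordsSend matches.
  def updateConversionStats(send: Send): Unit = send match {
    case m: MultiRecordsSend if m.recordConversionStats != null =>
      m.recordConversionStats.foreach { case (_, _) => recorded += 1 }
    case _ => // any other Send type falls through and nothing is recorded
  }

  // Hypothetical variant that looks inside the wrapper before matching.
  def updateConversionStatsUnwrapped(send: Send): Unit = send match {
    case NetworkSend(_, inner) => updateConversionStats(inner)
    case other                 => updateConversionStats(other)
  }

  def main(args: Array[String]): Unit = {
    val wrapped = NetworkSend("conn-1", MultiRecordsSend(Map("topic-0" -> 3)))

    updateConversionStats(wrapped)
    println(s"recorded after wrapped call: $recorded") // prints 0

    updateConversionStatsUnwrapped(wrapped)
    println(s"recorded after unwrapped call: $recorded") // prints 1
  }
}
```

Passing the wrapper straight to the callback hits the `case _` branch, which matches the symptom in the report: no exception is raised, the meter simply never moves.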



--
This message was sent by Atlassian Jira
(v8.20.10#820010)