You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/02/26 07:16:59 UTC

[kafka] branch trunk updated: Kafka-14743: update request metrics after callback (#13297)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ae672574065 Kafka-14743: update request metrics after callback (#13297)
ae672574065 is described below

commit ae6725740651cc76280840af3a657f9bb9522e37
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Sun Feb 26 15:16:51 2023 +0800

    Kafka-14743: update request metrics after callback (#13297)
    
    Currently, the kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Fetch metric is never updated because the request metrics are recorded BEFORE the messageConversions metric values are updated. As a result, even when the messageConversions metric values are updated, the request metrics never reflect the update. This patch fixes it by updating the request metrics after the callback has completed, so that the messageConversions metric values are recorded correctly.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Divij Vaidya <di...@amazon.com>
---
 .../main/scala/kafka/network/SocketServer.scala    |  7 +++---
 .../FetchRequestDownConversionConfigTest.scala     | 25 +++++++++++++++++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 10 ++++++---
 3 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index dca6c108374..6807a2a82fc 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1175,10 +1175,11 @@ private[kafka] class Processor(
         val response = inflightResponses.remove(send.destinationId).getOrElse {
           throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
         }
-        updateRequestMetrics(response)
-
-        // Invoke send completion callback
+        
+        // Invoke send completion callback, and then update request metrics, since some
+        // request metrics may be updated during the callback
         response.onComplete.foreach(onComplete => onComplete(send))
+        updateRequestMetrics(response)
 
         // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
         // it will be unmuted immediately. If the channel has been throttled, it will unmuted only if the throttling
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index 3f8ba8e099a..d01228c1d13 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -18,6 +18,7 @@ package kafka.server
 
 import java.util
 import java.util.{Optional, Properties}
+import kafka.network.RequestMetrics.{MessageConversionsTimeMs, TemporaryMemoryBytes}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.config.TopicConfig
@@ -163,7 +164,12 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
   }
 
   def testV1Fetch(isFollowerFetch: Boolean): Unit = {
+    val fetchRequest = "request=Fetch"
+    val fetchTemporaryMemoryBytesMetricName = s"$TemporaryMemoryBytes,$fetchRequest"
+    val fetchMessageConversionsTimeMsMetricName = s"$MessageConversionsTimeMs,$fetchRequest"
     val initialFetchMessageConversionsPerSec = TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)
+    val initialFetchMessageConversionsTimeMs = TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName)
+    val initialFetchTemporaryMemoryBytes = TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName)
     val topicWithDownConversionEnabled = "foo"
     val topicWithDownConversionDisabled = "bar"
     val replicaIds = brokers.map(_.config.brokerId)
@@ -216,15 +222,28 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
       Errors.forCode(fetchResponseData.get(tp).errorCode)
     }
 
+    def verifyMetrics(): Unit = {
+      TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > initialFetchMessageConversionsPerSec,
+        s"The `FetchMessageConversionsPerSec` metric count is not incremented after 5 seconds. " +
+          s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000)
+
+      TestUtils.waitUntilTrue(() => TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName) > initialFetchMessageConversionsTimeMs,
+        s"The `MessageConversionsTimeMs` in fetch request metric count is not incremented after 5 seconds. " +
+          s"init: $initialFetchMessageConversionsTimeMs final: ${TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName)}", 5000)
+
+      TestUtils.waitUntilTrue(() => TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName) > initialFetchTemporaryMemoryBytes,
+        s"The `TemporaryMemoryBytes` in fetch request metric count is not incremented after 5 seconds. " +
+          s"init: $initialFetchTemporaryMemoryBytes final: ${TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName)}", 5000)
+    }
+
     assertEquals(Errors.NONE, error(partitionWithDownConversionEnabled))
     if (isFollowerFetch) {
       assertEquals(Errors.NONE, error(partitionWithDownConversionDisabled))
     } else {
       assertEquals(Errors.UNSUPPORTED_VERSION, error(partitionWithDownConversionDisabled))
     }
-    TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > initialFetchMessageConversionsPerSec,
-      s"The `FetchMessageConversionsPerSec` metric count is not incremented after 5 seconds. " +
-      s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000)
+
+    verifyMetrics()
   }
 
   private def sendFetch(
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ab72839fe22..dae338a3b08 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -28,7 +28,7 @@ import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, Executors, TimeUnit}
 import java.util.{Arrays, Collections, Optional, Properties}
-import com.yammer.metrics.core.{Gauge, Meter}
+import com.yammer.metrics.core.{Gauge, Histogram, Meter}
 
 import javax.net.ssl.X509TrustManager
 import kafka.api._
@@ -2101,8 +2101,12 @@ object TestUtils extends Logging {
 
   def metersCount(metricName: String): Long = {
     KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-      .filter { case (k, _) => k.getMBeanName.endsWith(metricName)}
-      .values.map(_.asInstanceOf[Meter].count()).sum
+      .filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
+      .values.map {
+        case histogram: Histogram => histogram.count()
+        case meter: Meter => meter.count()
+        case _ => 0
+      }.sum
   }
 
   def clearYammerMetrics(): Unit = {