You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/02/26 07:29:01 UTC

[kafka] branch 3.4 updated: Kafka-14743: update request metrics after callback (#13297)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new fa88333039f Kafka-14743: update request metrics after callback (#13297)
fa88333039f is described below

commit fa88333039fee0d26778c78c7b010e1b751d4c96
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Sun Feb 26 15:16:51 2023 +0800

    Kafka-14743: update request metrics after callback (#13297)
    
    Currently, the kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Fetch metric is never updated, because the request metrics are recorded BEFORE the messageConversions metric value is updated. That means that even when the messageConversions metric value is updated, the request metrics never reflect the update. This patch fixes the problem by updating the request metrics after the send-completion callback has completed, so that the messageConversions metric value is recorded correctly.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Divij Vaidya <di...@amazon.com>
---
 .../main/scala/kafka/network/SocketServer.scala    |  7 ++++---
 .../FetchRequestDownConversionConfigTest.scala     | 24 +++++++++++++++++++++-
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 12 ++++++++++-
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e91c240415c..0c08d7b056a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1130,10 +1130,11 @@ private[kafka] class Processor(
         val response = inflightResponses.remove(send.destinationId).getOrElse {
           throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
         }
-        updateRequestMetrics(response)
-
-        // Invoke send completion callback
+        
+        // Invoke send completion callback, and then update request metrics, since some
+        // request metrics might get updated during the callback
         response.onComplete.foreach(onComplete => onComplete(send))
+        updateRequestMetrics(response)
 
         // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
         // it will be unmuted immediately. If the channel has been throttled, it will unmuted only if the throttling
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index 56a3485da40..0a86384a3d3 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -18,8 +18,8 @@ package kafka.server
 
 import java.util
 import java.util.{Optional, Properties}
-
 import kafka.log.LogConfig
+import kafka.network.RequestMetrics.{MessageConversionsTimeMs, TemporaryMemoryBytes}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.message.FetchResponseData
@@ -164,6 +164,12 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
   }
 
   def testV1Fetch(isFollowerFetch: Boolean): Unit = {
+    val fetchRequest = "request=Fetch"
+    val fetchTemporaryMemoryBytesMetricName = s"$TemporaryMemoryBytes,$fetchRequest"
+    val fetchMessageConversionsTimeMsMetricName = s"$MessageConversionsTimeMs,$fetchRequest"
+    val initialFetchMessageConversionsPerSec = TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)
+    val initialFetchMessageConversionsTimeMs = TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName)
+    val initialFetchTemporaryMemoryBytes = TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName)
     val topicWithDownConversionEnabled = "foo"
     val topicWithDownConversionDisabled = "bar"
     val replicaIds = brokers.map(_.config.brokerId)
@@ -216,12 +222,28 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
       Errors.forCode(fetchResponseData.get(tp).errorCode)
     }
 
+    def verifyMetrics(): Unit = {
+      TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > initialFetchMessageConversionsPerSec,
+        s"The `FetchMessageConversionsPerSec` metric count is not incremented after 5 seconds. " +
+          s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000)
+
+      TestUtils.waitUntilTrue(() => TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName) > initialFetchMessageConversionsTimeMs,
+        s"The `MessageConversionsTimeMs` in fetch request metric count is not incremented after 5 seconds. " +
+          s"init: $initialFetchMessageConversionsTimeMs final: ${TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName)}", 5000)
+
+      TestUtils.waitUntilTrue(() => TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName) > initialFetchTemporaryMemoryBytes,
+        s"The `TemporaryMemoryBytes` in fetch request metric count is not incremented after 5 seconds. " +
+          s"init: $initialFetchTemporaryMemoryBytes final: ${TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName)}", 5000)
+    }
+
     assertEquals(Errors.NONE, error(partitionWithDownConversionEnabled))
     if (isFollowerFetch) {
       assertEquals(Errors.NONE, error(partitionWithDownConversionDisabled))
     } else {
       assertEquals(Errors.UNSUPPORTED_VERSION, error(partitionWithDownConversionDisabled))
     }
+
+    verifyMetrics()
   }
 
   private def sendFetch(
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e21214b6f7e..525ff734f51 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -28,7 +28,7 @@ import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, Executors, TimeUnit}
 import java.util.{Arrays, Collections, Optional, Properties}
-import com.yammer.metrics.core.{Gauge, Meter}
+import com.yammer.metrics.core.{Gauge, Histogram, Meter}
 
 import javax.net.ssl.X509TrustManager
 import kafka.api._
@@ -2089,6 +2089,16 @@ object TestUtils extends Logging {
       .count
   }
 
+  def metersCount(metricName: String): Long = {
+    KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
+      .values.map {
+        case histogram: Histogram => histogram.count()
+        case meter: Meter => meter.count()
+        case _ => 0
+      }.sum
+  }
+
   def clearYammerMetrics(): Unit = {
     for (metricName <- KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala)
       KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)