You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ch...@apache.org on 2023/02/23 17:50:58 UTC

[kafka] branch trunk updated: KAFKA-14295 FetchMessageConversionsPerSec meter not recorded (#13279)

This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7626a430792 KAFKA-14295 FetchMessageConversionsPerSec meter not recorded (#13279)
7626a430792 is described below

commit 7626a43079298e88895b6f9e2fe3f8206da0155c
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Fri Feb 24 01:50:49 2023 +0800

    KAFKA-14295 FetchMessageConversionsPerSec meter not recorded (#13279)
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../src/main/java/org/apache/kafka/common/network/NetworkSend.java  | 4 ++++
 core/src/main/scala/kafka/server/KafkaApis.scala                    | 4 +++-
 .../unit/kafka/server/FetchRequestDownConversionConfigTest.scala    | 4 ++++
 core/src/test/scala/unit/kafka/utils/TestUtils.scala                | 6 ++++++
 4 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
index 2a51a56932f..f2977b9d9e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
@@ -31,6 +31,10 @@ public class NetworkSend implements Send {
         return destinationId;
     }
 
+    public Send send() {
+        return send;
+    }
+
     @Override
     public boolean completed() {
         return send.completed();
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7b65bbe50d5..754b6e323fd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -49,7 +49,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ListenerName, Send}
+import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.ClientMetadata
@@ -893,6 +893,8 @@ class KafkaApis(val requestChannel: RequestChannel,
             send.recordConversionStats.asScala.toMap.foreach {
               case (tp, stats) => updateRecordConversionStats(request, tp, stats)
             }
+          case send: NetworkSend =>
+            updateConversionStats(send.send())
           case _ =>
         }
       }
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index aa8a652741f..3f8ba8e099a 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -163,6 +163,7 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
   }
 
   def testV1Fetch(isFollowerFetch: Boolean): Unit = {
+    val initialFetchMessageConversionsPerSec = TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)
     val topicWithDownConversionEnabled = "foo"
     val topicWithDownConversionDisabled = "bar"
     val replicaIds = brokers.map(_.config.brokerId)
@@ -221,6 +222,9 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
     } else {
       assertEquals(Errors.UNSUPPORTED_VERSION, error(partitionWithDownConversionDisabled))
     }
+    TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > initialFetchMessageConversionsPerSec,
+      s"The `FetchMessageConversionsPerSec` metric count is not incremented after 5 seconds. " +
+      s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000)
   }
 
   private def sendFetch(
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2a71f86a453..ab72839fe22 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -2099,6 +2099,12 @@ object TestUtils extends Logging {
       .count
   }
 
+  def metersCount(metricName: String): Long = {
+    KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter { case (k, _) => k.getMBeanName.endsWith(metricName)}
+      .values.map(_.asInstanceOf[Meter].count()).sum
+  }
+
   def clearYammerMetrics(): Unit = {
     for (metricName <- KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala)
       KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)