You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ch...@apache.org on 2023/02/23 17:50:58 UTC
[kafka] branch trunk updated: KAFKA-14295 FetchMessageConversionsPerSec meter not recorded (#13279)
This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7626a430792 KAFKA-14295 FetchMessageConversionsPerSec meter not recorded (#13279)
7626a430792 is described below
commit 7626a43079298e88895b6f9e2fe3f8206da0155c
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Fri Feb 24 01:50:49 2023 +0800
KAFKA-14295 FetchMessageConversionsPerSec meter not recorded (#13279)
Reviewers: Luke Chen <sh...@gmail.com>
---
.../src/main/java/org/apache/kafka/common/network/NetworkSend.java | 4 ++++
core/src/main/scala/kafka/server/KafkaApis.scala | 4 +++-
.../unit/kafka/server/FetchRequestDownConversionConfigTest.scala | 4 ++++
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 6 ++++++
4 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
index 2a51a56932f..f2977b9d9e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
@@ -31,6 +31,10 @@ public class NetworkSend implements Send {
return destinationId;
}
+ public Send send() {
+ return send;
+ }
+
@Override
public boolean completed() {
return send.completed();
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7b65bbe50d5..754b6e323fd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -49,7 +49,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ListenerName, Send}
+import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
@@ -893,6 +893,8 @@ class KafkaApis(val requestChannel: RequestChannel,
send.recordConversionStats.asScala.toMap.foreach {
case (tp, stats) => updateRecordConversionStats(request, tp, stats)
}
+ case send: NetworkSend =>
+ updateConversionStats(send.send())
case _ =>
}
}
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index aa8a652741f..3f8ba8e099a 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -163,6 +163,7 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
}
def testV1Fetch(isFollowerFetch: Boolean): Unit = {
+ val initialFetchMessageConversionsPerSec = TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)
val topicWithDownConversionEnabled = "foo"
val topicWithDownConversionDisabled = "bar"
val replicaIds = brokers.map(_.config.brokerId)
@@ -221,6 +222,9 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
} else {
assertEquals(Errors.UNSUPPORTED_VERSION, error(partitionWithDownConversionDisabled))
}
+ TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > initialFetchMessageConversionsPerSec,
+ s"The `FetchMessageConversionsPerSec` metric count is not incremented after 5 seconds. " +
+ s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000)
}
private def sendFetch(
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2a71f86a453..ab72839fe22 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -2099,6 +2099,12 @@ object TestUtils extends Logging {
.count
}
+ def metersCount(metricName: String): Long = {
+ KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+ .filter { case (k, _) => k.getMBeanName.endsWith(metricName)}
+ .values.map(_.asInstanceOf[Meter].count()).sum
+ }
+
def clearYammerMetrics(): Unit = {
for (metricName <- KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala)
KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)