You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/17 06:53:45 UTC

[kafka] branch 1.0 updated: MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 2b236f9  MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681)
2b236f9 is described below

commit 2b236f9dc8199028d65343e959487c8e6eb1e12b
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Mar 13 06:23:00 2018 +0000

    MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681)
---
 .../test/scala/integration/kafka/api/MetricsTest.scala | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 26022be..b9a1b50 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -43,6 +43,8 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
   this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableDoc, "false")
+  this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
+  this.producerConfig.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "1000000")
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
   override protected val serverSaslProperties =
     Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
@@ -90,7 +92,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     verifyClientVersionMetrics(this.producers.head.metrics, "Producer")
 
     val server = servers.head
-    verifyBrokerMessageConversionMetrics(server, recordSize)
+    verifyBrokerMessageConversionMetrics(server, recordSize, tp)
     verifyBrokerErrorMetrics(servers.head)
     verifyBrokerZkMetrics(server, topic)
 
@@ -187,7 +189,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     verifyKafkaMetricRecorded("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics"))
   }
 
-  private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int): Unit = {
+  private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int, tp: TopicPartition): Unit = {
     val requestMetricsPrefix = "kafka.network:type=RequestMetrics"
     val requestBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce")
     val tempBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce")
@@ -195,7 +197,17 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
         tempBytes >= recordSize)
 
     verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec")
-    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0)
+
+    // Conversion time less than 1 millisecond is reported as zero, so retry with larger batches until time > 0
+    var iteration = 0
+    TestUtils.retry(5000) {
+      val conversionTimeMs = yammerMetricValue(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce").asInstanceOf[Double]
+      if (conversionTimeMs <= 0.0) {
+        iteration += 1
+        sendRecords(producers.head, 1000 * iteration, 100, tp)
+      }
+      assertTrue(s"Message conversion time not recorded $conversionTimeMs", conversionTimeMs > 0.0)
+    }
 
     verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
     verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value == 0.0)

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.