Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/13 08:45:26 UTC

kafka git commit: MINOR: Various small improvements to kafka.metrics.MetricsTest

Repository: kafka
Updated Branches:
  refs/heads/trunk c5aeaa7d8 -> 1af096039


MINOR: Various small improvements to kafka.metrics.MetricsTest

`testBrokerTopicMetricsUnregisteredAfterDeletingTopic` seemed completely broken,
as it was creating one topic but producing to and consuming from another.
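
A minimal, self-contained sketch of the shape of the fix: the helper now takes the
topic it should exercise instead of silently using the class-level `topic` field.
`TopicParamSketch`, `exercise` and the println below are illustrative stand-ins,
not the real produce/consume helpers from the test harness; the signatures mirror
the diff further down.

    object TopicParamSketch {
      private val defaultTopic = "topic1" // class-level field, as in MetricsTest

      // Before: always exercised `defaultTopic`, even when the caller had just
      // created (and later deletes) a different topic.
      private def createAndShutdownStepOld(group: String, consumerId: String, producerId: String): Unit =
        exercise(defaultTopic, group, consumerId, producerId)

      // After: the caller passes the topic explicitly, so the metrics under test
      // are created for the topic that is actually created and deleted.
      private def createAndShutdownStep(topic: String, group: String, consumerId: String, producerId: String): Unit =
        exercise(topic, group, consumerId, producerId)

      // Stand-in for sendMessages/getMessages plus consumer/producer shutdown.
      private def exercise(topic: String, group: String, consumerId: String, producerId: String): Unit =
        println(s"produce+consume on $topic (group=$group, consumer=$consumerId, producer=$producerId)")

      def main(args: Array[String]): Unit = {
        val deletedTopic = "test-broker-topic-metric"
        createAndShutdownStepOld("group0", "consumer0", "producer0")             // hits topic1
        createAndShutdownStep(deletedTopic, "group0", "consumer0", "producer0")  // hits the topic under test
      }
    }

With the topic threaded through, `testBrokerTopicMetricsUnregisteredAfterDeletingTopic`
drives traffic through the topic it actually created and later deletes.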

Authored with mpburg

Author: Mickael Maison <mi...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3034 from mimaison/Fix-testBrokerTopicMetricsBytesInOut


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1af09603
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1af09603
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1af09603

Branch: refs/heads/trunk
Commit: 1af096039d7a2d8fd865b0eaee28c70f10e07008
Parents: c5aeaa7
Author: Mickael Maison <mi...@gmail.com>
Authored: Sat May 13 09:43:58 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat May 13 09:45:17 2017 +0100

----------------------------------------------------------------------
 .../scala/unit/kafka/metrics/MetricsTest.scala  | 27 ++++++++++----------
 1 file changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1af09603/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index d54e0b3..c586a54 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -41,7 +41,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   val numParts = 2
   val topic = "topic1"
 
-  val overridingProps = new Properties()
+  val overridingProps = new Properties
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
 
   def generateConfigs() =
@@ -55,16 +55,16 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     // create topic topic1 with 1 partition on broker 0
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
     // force creation not client's specific metrics.
-    createAndShutdownStep("group0", "consumer0", "producer0")
+    createAndShutdownStep(topic, "group0", "consumer0", "producer0")
 
     //this assertion is only used for creating the metrics for DelayedFetchMetrics, it should never fail, but should not be removed
     assertNotNull(DelayedFetchMetrics)
 
-    val countOfStaticMetrics = Metrics.defaultRegistry().allMetrics().keySet().size
+    val countOfStaticMetrics = Metrics.defaultRegistry.allMetrics.keySet.size
 
     for (i <- 0 to 5) {
-      createAndShutdownStep("group" + i % 3, "consumer" + i % 2, "producer" + i % 2)
-      assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+      createAndShutdownStep(topic, "group" + i % 3, "consumer" + i % 2, "producer" + i % 2)
+      assertEquals(countOfStaticMetrics, Metrics.defaultRegistry.allMetrics.keySet.size)
     }
   }
 
@@ -81,7 +81,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
     val topic = "test-broker-topic-metric"
     AdminUtils.createTopic(zkUtils, topic, 2, 1)
-    createAndShutdownStep("group0", "consumer0", "producer0")
+    createAndShutdownStep(topic, "group0", "consumer0", "producer0")
+    assertTrue("Topic metrics don't exist", checkTopicMetricsExists(topic))
     assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic))
     AdminUtils.deleteTopic(zkUtils, topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -91,17 +92,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   def testClusterIdMetric(): Unit = {
     // Check if clusterId metric exists.
-    val metrics = Metrics.defaultRegistry().allMetrics
+    val metrics = Metrics.defaultRegistry.allMetrics
     assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
   }
 
   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-  def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
+  def createAndShutdownStep(topic: String, group: String, consumerId: String, producerId: String): Unit = {
     sendMessages(servers, topic, nMessages)
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder, new StringDecoder)
     getMessages(topicMessageStreams1, nMessages)
 
     zkConsumerConnector1.shutdown()
@@ -114,7 +115,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic"
     val bytesOut = s"${BrokerTopicStats.BytesOutPerSec},topic=$topic"
 
-    val topicConfig = new Properties()
+    val topicConfig = new Properties
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, "2")
     createTopic(zkUtils, topic, 1, numNodes, servers, topicConfig)
     // Produce a few messages to create the metrics
@@ -151,10 +152,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   }
 
   private def checkTopicMetricsExists(topic: String): Boolean = {
-    val topicMetricRegex = new Regex(".*("+topic+")$")
-    val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet()
+    val topicMetricRegex = new Regex(".*BrokerTopicMetrics.*("+topic+")$")
+    val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).entrySet
     for (metricGroup <- metricGroups.asScala) {
-      if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches)
+      if (topicMetricRegex.pattern.matcher(metricGroup.getKey).matches)
         return true
     }
     false
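
A note on the tightened regex in `checkTopicMetricsExists`: the new pattern only matches
metric groups that belong to BrokerTopicMetrics rather than any group ending in the topic
name. A standalone sketch of the difference (the group-key strings below are made up
purely to illustrate the anchoring; they are not the real Yammer group names):

    import scala.util.matching.Regex

    object RegexSketch {
      def main(args: Array[String]): Unit = {
        val topic = "test-broker-topic-metric"
        val oldPattern = new Regex(".*(" + topic + ")$")
        val newPattern = new Regex(".*BrokerTopicMetrics.*(" + topic + ")$")

        val brokerTopicGroup = "kafka.server.BrokerTopicMetrics.BytesInPerSec." + topic // hypothetical key
        val unrelatedGroup = "some.other.Group." + topic                                // hypothetical key

        println(oldPattern.pattern.matcher(brokerTopicGroup).matches) // true
        println(oldPattern.pattern.matcher(unrelatedGroup).matches)   // true, too broad
        println(newPattern.pattern.matcher(brokerTopicGroup).matches) // true
        println(newPattern.pattern.matcher(unrelatedGroup).matches)   // false, scoped to BrokerTopicMetrics
      }
    }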