You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/13 08:45:26 UTC
kafka git commit: MINOR: Various small improvements to
kafka.metrics.MetricsTest
Repository: kafka
Updated Branches:
refs/heads/trunk c5aeaa7d8 -> 1af096039
MINOR: Various small improvements to kafka.metrics.MetricsTest
`testBrokerTopicMetricsUnregisteredAfterDeletingTopic` seemed completely broken,
as it was creating a topic but producing/consuming to another one.
Authored with mpburg
Author: Mickael Maison <mi...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3034 from mimaison/Fix-testBrokerTopicMetricsBytesInOut
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1af09603
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1af09603
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1af09603
Branch: refs/heads/trunk
Commit: 1af096039d7a2d8fd865b0eaee28c70f10e07008
Parents: c5aeaa7
Author: Mickael Maison <mi...@gmail.com>
Authored: Sat May 13 09:43:58 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat May 13 09:45:17 2017 +0100
----------------------------------------------------------------------
.../scala/unit/kafka/metrics/MetricsTest.scala | 27 ++++++++++----------
1 file changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1af09603/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index d54e0b3..c586a54 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -41,7 +41,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val numParts = 2
val topic = "topic1"
- val overridingProps = new Properties()
+ val overridingProps = new Properties
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
def generateConfigs() =
@@ -55,16 +55,16 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
// create topic topic1 with 1 partition on broker 0
createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
// force creation not client's specific metrics.
- createAndShutdownStep("group0", "consumer0", "producer0")
+ createAndShutdownStep(topic, "group0", "consumer0", "producer0")
//this assertion is only used for creating the metrics for DelayedFetchMetrics, it should never fail, but should not be removed
assertNotNull(DelayedFetchMetrics)
- val countOfStaticMetrics = Metrics.defaultRegistry().allMetrics().keySet().size
+ val countOfStaticMetrics = Metrics.defaultRegistry.allMetrics.keySet.size
for (i <- 0 to 5) {
- createAndShutdownStep("group" + i % 3, "consumer" + i % 2, "producer" + i % 2)
- assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+ createAndShutdownStep(topic, "group" + i % 3, "consumer" + i % 2, "producer" + i % 2)
+ assertEquals(countOfStaticMetrics, Metrics.defaultRegistry.allMetrics.keySet.size)
}
}
@@ -81,7 +81,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
val topic = "test-broker-topic-metric"
AdminUtils.createTopic(zkUtils, topic, 2, 1)
- createAndShutdownStep("group0", "consumer0", "producer0")
+ createAndShutdownStep(topic, "group0", "consumer0", "producer0")
+ assertTrue("Topic metrics don't exist", checkTopicMetricsExists(topic))
assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic))
AdminUtils.deleteTopic(zkUtils, topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -91,17 +92,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
@Test
def testClusterIdMetric(): Unit = {
// Check if clusterId metric exists.
- val metrics = Metrics.defaultRegistry().allMetrics
+ val metrics = Metrics.defaultRegistry.allMetrics
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
}
@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
- def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
+ def createAndShutdownStep(topic: String, group: String, consumerId: String, producerId: String): Unit = {
sendMessages(servers, topic, nMessages)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
- val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+ val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder, new StringDecoder)
getMessages(topicMessageStreams1, nMessages)
zkConsumerConnector1.shutdown()
@@ -114,7 +115,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic"
val bytesOut = s"${BrokerTopicStats.BytesOutPerSec},topic=$topic"
- val topicConfig = new Properties()
+ val topicConfig = new Properties
topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, "2")
createTopic(zkUtils, topic, 1, numNodes, servers, topicConfig)
// Produce a few messages to create the metrics
@@ -151,10 +152,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
}
private def checkTopicMetricsExists(topic: String): Boolean = {
- val topicMetricRegex = new Regex(".*("+topic+")$")
- val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet()
+ val topicMetricRegex = new Regex(".*BrokerTopicMetrics.*("+topic+")$")
+ val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).entrySet
for (metricGroup <- metricGroups.asScala) {
- if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches)
+ if (topicMetricRegex.pattern.matcher(metricGroup.getKey).matches)
return true
}
false