You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/06/01 14:45:53 UTC
kafka git commit: KAFKA-4956: Verify client-side throttle time
metrics in quota test
Repository: kafka
Updated Branches:
refs/heads/trunk e0150a25e -> 640082776
KAFKA-4956: Verify client-side throttle time metrics in quota test
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #3190 from rajinisivaram/KAFKA-4956-unittest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64008277
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64008277
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64008277
Branch: refs/heads/trunk
Commit: 640082776b429067613922c576a57ec716b1dbe9
Parents: e0150a2
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu Jun 1 15:45:30 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu Jun 1 15:45:30 2017 +0100
----------------------------------------------------------------------
.../integration/kafka/api/BaseQuotaTest.scala | 38 ++++++++++++++++----
1 file changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/64008277/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 918bb55..32f19e2 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -14,19 +14,17 @@
package kafka.api
-import java.util.{Collections, Properties}
+import java.util.{Collections, HashMap, Properties}
-import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer, QuotaId}
+import kafka.server.{ClientQuotaManagerConfig, DynamicConfig, KafkaConfig, KafkaServer, QuotaId, QuotaType}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.common.metrics.{KafkaMetric, Quota}
import org.junit.Assert._
import org.junit.{Before, Test}
-import kafka.server.QuotaType
-import org.apache.kafka.common.metrics.KafkaMetric
abstract class BaseQuotaTest extends IntegrationTestHarness {
@@ -83,12 +81,16 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
def testThrottledProducerConsumer() {
val numRecords = 1000
- val produced = produceUntilThrottled(producers.head, numRecords)
+ val producer = producers.head
+ val produced = produceUntilThrottled(producer, numRecords)
assertTrue("Should have been throttled", producerThrottleMetric.value > 0)
+ verifyProducerThrottleTimeMetric(producer)
// Consumer should read in a bursty manner and get throttled immediately
- consumeUntilThrottled(consumers.head, produced)
+ val consumer = consumers.head
+ consumeUntilThrottled(consumer, produced)
assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
+ verifyConsumerThrottleTimeMetric(consumer)
}
@Test
@@ -152,6 +154,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
}
assertTrue("Should have been throttled", throttled)
+ verifyConsumerThrottleTimeMetric(consumer, Some(ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds * 1000.0))
assertNotNull("Exempt requests not recorded", exemptRequestMetric)
assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0)
@@ -205,6 +208,27 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
}
}
+ private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) {
+ val tags = new HashMap[String, String]
+ tags.put("client-id", producerClientId)
+ val avgMetric = producer.metrics.get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags))
+ val maxMetric = producer.metrics.get(new MetricName("produce-throttle-time-max", "producer-metrics", "", tags))
+
+ TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0,
+ s"Producer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}")
+ }
+
+ private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], maxThrottleTime: Option[Double] = None) {
+ val tags = new HashMap[String, String]
+ tags.put("client-id", consumerClientId)
+ val avgMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", tags))
+ val maxMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", tags))
+
+ TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0,
+ s"Consumer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}")
+ maxThrottleTime.foreach(max => assertTrue(s"Maximum consumer throttle too high: ${maxMetric.value}", maxMetric.value <= max))
+ }
+
private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): MetricName = {
leaderNode.metrics.metricName("throttle-time",
quotaType.toString,