You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/04/16 17:16:33 UTC

[kafka] branch trunk updated: KAFKA-6514; Add API version as a tag for the RequestsPerSec metric (#4506)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 19418fc  KAFKA-6514; Add API version as a tag for the RequestsPerSec metric (#4506)
19418fc is described below

commit 19418fc86aefde2acdc62f6790b2ff0bb4d15699
Author: Allen Wang <aw...@netflix.com>
AuthorDate: Mon Apr 16 10:16:26 2018 -0700

    KAFKA-6514; Add API version as a tag for the RequestsPerSec metric (#4506)
    
    Updated `RequestChannel` to include `version` as a tag for all RequestsPerSec metrics (KIP-272). Updated tests to verify that the extra tag exists.
---
 core/src/main/scala/kafka/network/RequestChannel.scala        | 11 ++++++++---
 core/src/test/scala/unit/kafka/network/SocketServerTest.scala | 10 +++++++---
 docs/upgrade.html                                             |  6 ++++++
 3 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 8a17528..f03bdeb 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -158,7 +158,7 @@ object RequestChannel extends Logging {
       val metricNames = fetchMetricNames :+ header.apiKey.name
       metricNames.foreach { metricName =>
         val m = metrics(metricName)
-        m.requestRate.mark()
+        m.requestRate(header.apiVersion).mark()
         m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
         m.localTimeHist.update(Math.round(apiLocalTimeMs))
         m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
@@ -350,10 +350,11 @@ object RequestMetrics {
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
+
   import RequestMetrics._
 
   val tags = Map("request" -> name)
-  val requestRate = newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+  val requestRateInternal = new mutable.HashMap[Short, Meter]
   // time a request spent in a request queue
   val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags)
   // time a request takes to be processed at the local broker
@@ -386,6 +387,10 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   private val errorMeters = mutable.Map[Errors, ErrorMeter]()
   Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
 
+  def requestRate(version: Short): Meter = {
+      requestRateInternal.getOrElseUpdate(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+  }
+
   class ErrorMeter(name: String, error: Errors) {
     private val tags = Map("request" -> name, "error" -> error.name)
 
@@ -418,7 +423,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   }
 
   def removeMetrics(): Unit = {
-    removeMetric(RequestsPerSec, tags)
+    for (version <- requestRateInternal.keySet) removeMetric(RequestsPerSec, tags + ("version" -> version.toString))
     removeMetric(RequestQueueTimeMs, tags)
     removeMetric(LocalTimeMs, tags)
     removeMetric(RemoteTimeMs, tags)
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index e6dadbb..7d3b428 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -688,10 +688,14 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testRequestMetricsAfterStop(): Unit = {
     server.stopProcessingRequests()
-
-    server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark()
+    val version = ApiKeys.PRODUCE.latestVersion
+    val version2 = (version - 1).toShort
+    for (_ <- 0 to 1) server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
+    server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
+    assertEquals(2, server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
     server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
-    val nonZeroMeters = Map("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce" -> 1,
+    val nonZeroMeters = Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version" -> 2,
+        s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2" -> 1,
         "kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1)
 
     def requestMetricMeters = YammerMetrics
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 95f2c41..d369d1d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,6 +68,12 @@
 <ul>
     <li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a> increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for [...]
     <li><a href="https://issues.apache.org/jira/browse/KAFKA-5674">KAFKA-5674</a> extends the lower interval of <code>max.connections.per.ip minimum</code> to zero and therefore allows IP-based filtering of inbound connections.</li>
+    <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-272%3A+Add+API+version+tag+to+broker%27s+RequestsPerSec+metric">KIP-272</a>
+        adds the API version as a tag to the metric <code>kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}</code>.
+        The metric is now named <code>kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...},version={0|1|2|3|...}</code>. This affects
+        JMX monitoring tools that do not aggregate automatically. To get the total count for a specific request type, such a tool must be
+        updated to sum the metric across all versions.
+    </li>
     <li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li>
 </ul>
 

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.