You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/10/01 02:13:40 UTC

[kafka] branch trunk updated: KAFKA-7459: Use thread-safe Pool for RequestMetrics.requestRateInternal (#5717)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b35e971  KAFKA-7459: Use thread-safe Pool for RequestMetrics.requestRateInternal (#5717)
b35e971 is described below

commit b35e97125f291aa7dca22b8d897f9ab4b196fd37
Author: Zhanxiang (Patrick) Huang <hz...@hotmail.com>
AuthorDate: Sun Sep 30 19:13:28 2018 -0700

    KAFKA-7459: Use thread-safe Pool for RequestMetrics.requestRateInternal (#5717)
    
    As part of KAFKA-6514, the API version was added as the `version` tag of the
    `RequestsPerSec` metric. A thread-unsafe `HashMap` was used in the implementation
    even though it can be accessed by multiple threads. Fix it by replacing it with
    the thread-safe `Pool`.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/network/RequestChannel.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index e5aa5d9..00b0968 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,7 +24,7 @@ import java.util.concurrent._
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.{Logging, NotNothing}
+import kafka.utils.{Logging, NotNothing, Pool}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -387,7 +387,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   import RequestMetrics._
 
   val tags = Map("request" -> name)
-  val requestRateInternal = new mutable.HashMap[Short, Meter]
+  val requestRateInternal = new Pool[Short, Meter]()
   // time a request spent in a request queue
   val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags)
   // time a request takes to be processed at the local broker
@@ -421,7 +421,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
 
   def requestRate(version: Short): Meter = {
-      requestRateInternal.getOrElseUpdate(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+    requestRateInternal.getAndMaybePut(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
   }
 
   class ErrorMeter(name: String, error: Errors) {
@@ -456,7 +456,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   }
 
   def removeMetrics(): Unit = {
-    for (version <- requestRateInternal.keySet) removeMetric(RequestsPerSec, tags + ("version" -> version.toString))
+    for (version <- requestRateInternal.keys) removeMetric(RequestsPerSec, tags + ("version" -> version.toString))
     removeMetric(RequestQueueTimeMs, tags)
     removeMetric(LocalTimeMs, tags)
     removeMetric(RemoteTimeMs, tags)