Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/01 02:14:00 UTC

[jira] [Commented] (KAFKA-7459) Concurrency bug in updating RequestsPerSec metric

    [ https://issues.apache.org/jira/browse/KAFKA-7459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633569#comment-16633569 ] 

ASF GitHub Bot commented on KAFKA-7459:
---------------------------------------

ijuma closed pull request #5717: KAFKA-7459: Use thread-safe Pool instead of non-thread-safe mutable.HashMap for requestRateInternal
URL: https://github.com/apache/kafka/pull/5717
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index e5aa5d90029..00b09688c5b 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,7 +24,7 @@ import java.util.concurrent._
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.{Logging, NotNothing}
+import kafka.utils.{Logging, NotNothing, Pool}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -387,7 +387,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   import RequestMetrics._
 
   val tags = Map("request" -> name)
-  val requestRateInternal = new mutable.HashMap[Short, Meter]
+  val requestRateInternal = new Pool[Short, Meter]()
   // time a request spent in a request queue
   val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags)
   // time a request takes to be processed at the local broker
@@ -421,7 +421,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
 
   def requestRate(version: Short): Meter = {
-      requestRateInternal.getOrElseUpdate(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+    requestRateInternal.getAndMaybePut(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
   }
 
   class ErrorMeter(name: String, error: Errors) {
@@ -456,7 +456,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   }
 
   def removeMetrics(): Unit = {
-    for (version <- requestRateInternal.keySet) removeMetric(RequestsPerSec, tags + ("version" -> version.toString))
+    for (version <- requestRateInternal.keys) removeMetric(RequestsPerSec, tags + ("version" -> version.toString))
     removeMetric(RequestQueueTimeMs, tags)
     removeMetric(LocalTimeMs, tags)
     removeMetric(RemoteTimeMs, tags)
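
For context on the shape of the fix: instead of an unsynchronized mutable.HashMap, requestRateInternal now goes through kafka.utils.Pool, whose getAndMaybePut (seen in the diff above) provides a thread-safe get-or-create per key. The snippet below is only an illustrative sketch of that pattern built directly on java.util.concurrent.ConcurrentHashMap; MeterRegistry and getOrCreate are hypothetical names for this sketch, not Kafka code.

import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Hypothetical stand-in for a per-version meter registry; not Kafka code.
class MeterRegistry[K, V] {
  private val underlying = new ConcurrentHashMap[K, V]()

  // computeIfAbsent is atomic per key: two threads asking for the same key
  // both observe a single created value instead of racing on a mutable map.
  def getOrCreate(key: K, create: K => V): V =
    underlying.computeIfAbsent(key, new JFunction[K, V] {
      override def apply(k: K): V = create(k)
    })
}

In this sketch computeIfAbsent evaluates the factory only when the key is absent, so losers of a race never build a throwaway value; whether Pool.getAndMaybePut behaves the same way is not visible from the diff above.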


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Concurrency bug in updating RequestsPerSec metric 
> --------------------------------------------------
>
>                 Key: KAFKA-7459
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7459
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Critical
>
> After KAFKA-6514, we added the API version as a tag for the RequestsPerSec metric, but the implementation uses the non-thread-safe mutable.HashMap to store the version -> metric mapping without any synchronization (https://github.com/apache/kafka/pull/4506/files#diff-d0332a0ff31df50afce3809d90505b25R357). Concurrent updates can corrupt the map's internal structure and cause unexpected behavior (https://github.com/scala/bug/issues/10436). We should use a thread-safe map such as ConcurrentHashMap instead; a minimal sketch of the race is included after the stack traces below.
>  
> In our case, cleanly shutting down a 2.0 broker takes forever because this concurrency bug leads to an infinite loop in HashMap resize.
> Thread-1 is performing the clean shutdown but is stuck waiting for one of the network threads to shut down:
> {noformat}
> "Thread-1" #25 prio=5 os_prio=0 tid=0x00007f05c4010000 nid=0x79f4 waiting on condition [0x00007f0597cfb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000001ffad1500> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>         at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:282)
>         at kafka.network.Processor.shutdown(SocketServer.scala:873)
>         at kafka.network.Acceptor$$anonfun$shutdown$3.apply(SocketServer.scala:368)
>         at kafka.network.Acceptor$$anonfun$shutdown$3.apply(SocketServer.scala:368)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.network.Acceptor.shutdown(SocketServer.scala:368)
>         - locked <0x00000001fdcf1000> (a kafka.network.Acceptor)
>         at kafka.network.SocketServer$$anonfun$stopProcessingRequests$2.apply(SocketServer.scala:178)
>         at kafka.network.SocketServer$$anonfun$stopProcessingRequests$2.apply(SocketServer.scala:178)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>         at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:178)
>         - locked <0x00000001fba0e610> (a kafka.network.SocketServer)
>         at kafka.server.KafkaServer$$anonfun$shutdown$3.apply$mcV$sp(KafkaServer.scala:595)
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86)
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:595){noformat}
> The network thread is stuck in HashTable.resize and never finishes updateRequestMetrics:
> {noformat}
> "kafka-network-thread-13673-ListenerName(SSL)-SSL-2" #201 prio=5 os_prio=0 tid=0x00007f441dae4000 nid=0x4cdc runnable [0x00007f2404189000]
>    java.lang.Thread.State: RUNNABLE
>         at scala.collection.mutable.HashTable$class.resize(HashTable.scala:268)
>         at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$addEntry0(HashTable.scala:157)
>         at scala.collection.mutable.HashTable$class.addEntry(HashTable.scala:148)
>         at scala.collection.mutable.HashMap.addEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap.addEntry(HashMap.scala:93)
>         at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
>         at kafka.network.RequestMetrics.requestRate(RequestChannel.scala:438)
>         at kafka.network.RequestChannel$Request$$anonfun$updateRequestMetrics$1.apply(RequestChannel.scala:161)
>         at kafka.network.RequestChannel$Request$$anonfun$updateRequestMetrics$1.apply(RequestChannel.scala:159)
>         at scala.collection.immutable.List.foreach(List.scala:392)
>         at kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:159)
>         at kafka.network.Processor.kafka$network$Processor$$updateRequestMetrics(SocketServer.scala:740)
>         at kafka.network.Processor$$anonfun$processCompletedSends$1.apply(SocketServer.scala:720)
>         at kafka.network.Processor$$anonfun$processCompletedSends$1.apply(SocketServer.scala:715)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.network.Processor.processCompletedSends(SocketServer.scala:715)
>         at kafka.network.Processor.run(SocketServer.scala:585)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
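
To make the failure mode above concrete: the sketch below (not part of the ticket; the thread count and key range are arbitrary) hammers a shared mutable.HashMap from several threads through getOrElseUpdate, which is roughly what RequestMetrics.requestRate did per request before the fix. The behaviour is undefined; on some runs the table is silently corrupted, on others a thread spins forever inside resize, which is the hang captured in the stack traces above.

import scala.collection.mutable

// Illustrative race only; do not expect a deterministic outcome.
object UnsafeGetOrElseUpdateDemo {
  def main(args: Array[String]): Unit = {
    val map = new mutable.HashMap[Short, String]
    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = {
          var i = 0
          while (i < 100000) {
            // Unsynchronized read-modify-write: lookups, insertions and
            // resizes from different threads interleave freely.
            map.getOrElseUpdate((i % 64).toShort, "meter-" + i)
            i += 1
          }
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())       // may never return if a thread loops in resize
    println("entries: " + map.size) // may also simply be wrong
  }
}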



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)