Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2018/10/01 03:18:00 UTC

[jira] [Resolved] (KAFKA-7459) Concurrency bug in updating RequestsPerSec metric

     [ https://issues.apache.org/jira/browse/KAFKA-7459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma resolved KAFKA-7459.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 2.1.0
                   2.0.1

> Concurrency bug in updating RequestsPerSec metric 
> --------------------------------------------------
>
>                 Key: KAFKA-7459
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7459
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Critical
>             Fix For: 2.0.1, 2.1.0
>
>
> KAFKA-6514 added the API version as a tag on the RequestsPerSec metric, but the implementation stores the version -> metric mapping in a non-thread-safe mutable.HashMap without any synchronization (https://github.com/apache/kafka/pull/4506/files#diff-d0332a0ff31df50afce3809d90505b25R357). Concurrent updates can corrupt the map's internal structure and cause unexpected behavior (https://github.com/scala/bug/issues/10436). We should use ConcurrentHashMap instead.
>  
> In our case, a clean shutdown of a 2.0 broker never completes because this concurrency bug leads to an infinite loop in the HashMap resize.
>  Thread-1 is performing the clean shutdown but is stuck waiting for one of the network threads to shut down:
> {noformat}
> "Thread-1" #25 prio=5 os_prio=0 tid=0x00007f05c4010000 nid=0x79f4 waiting on condition [0x00007f0597cfb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000001ffad1500> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>         at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:282)
>         at kafka.network.Processor.shutdown(SocketServer.scala:873)
>         at kafka.network.Acceptor$$anonfun$shutdown$3.apply(SocketServer.scala:368)
>         at kafka.network.Acceptor$$anonfun$shutdown$3.apply(SocketServer.scala:368)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.network.Acceptor.shutdown(SocketServer.scala:368)
>         - locked <0x00000001fdcf1000> (a kafka.network.Acceptor)
>         at kafka.network.SocketServer$$anonfun$stopProcessingRequests$2.apply(SocketServer.scala:178)
>         at kafka.network.SocketServer$$anonfun$stopProcessingRequests$2.apply(SocketServer.scala:178)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>         at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:178)
>         - locked <0x00000001fba0e610> (a kafka.network.SocketServer)
>         at kafka.server.KafkaServer$$anonfun$shutdown$3.apply$mcV$sp(KafkaServer.scala:595)
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86)
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:595){noformat}
> The network thread stays in HashTable.resize indefinitely and never returns from updateRequestMetrics:
> {noformat}
> "kafka-network-thread-13673-ListenerName(SSL)-SSL-2" #201 prio=5 os_prio=0 tid=0x00007f441dae4000 nid=0x4cdc runnable [0x00007f2404189000]
>    java.lang.Thread.State: RUNNABLE
>         at scala.collection.mutable.HashTable$class.resize(HashTable.scala:268)
>         at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$addEntry0(HashTable.scala:157)
>         at scala.collection.mutable.HashTable$class.addEntry(HashTable.scala:148)
>         at scala.collection.mutable.HashMap.addEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap.addEntry(HashMap.scala:93)
>         at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
>         at kafka.network.RequestMetrics.requestRate(RequestChannel.scala:438)
>         at kafka.network.RequestChannel$Request$$anonfun$updateRequestMetrics$1.apply(RequestChannel.scala:161)
>         at kafka.network.RequestChannel$Request$$anonfun$updateRequestMetrics$1.apply(RequestChannel.scala:159)
>         at scala.collection.immutable.List.foreach(List.scala:392)
>         at kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:159)
>         at kafka.network.Processor.kafka$network$Processor$$updateRequestMetrics(SocketServer.scala:740)
>         at kafka.network.Processor$$anonfun$processCompletedSends$1.apply(SocketServer.scala:720)
>         at kafka.network.Processor$$anonfun$processCompletedSends$1.apply(SocketServer.scala:715)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.network.Processor.processCompletedSends(SocketServer.scala:715)
>         at kafka.network.Processor.run(SocketServer.scala:585)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
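
For illustration only, here is a minimal Java sketch of the kind of change the description proposes: keeping the version -> metric mapping in a ConcurrentHashMap so that several network threads can create and update entries at the same time. This is not the actual Kafka patch. The class VersionedRequestCounters, its methods, and the use of LongAdder in place of a real meter are all assumptions made for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the per-version RequestsPerSec bookkeeping.
// Names are illustrative and do not come from the Kafka code base.
public class VersionedRequestCounters {
    // version -> counter; ConcurrentHashMap tolerates concurrent inserts and resizes.
    private final ConcurrentMap<Short, LongAdder> counters = new ConcurrentHashMap<>();

    // Intended to be called from many threads. computeIfAbsent creates the
    // counter for a version atomically, at most once per key.
    public void mark(short version) {
        counters.computeIfAbsent(version, v -> new LongAdder()).increment();
    }

    public long count(short version) {
        LongAdder adder = counters.get(version);
        return adder == null ? 0L : adder.sum();
    }

    public int versions() {
        return counters.size();
    }

    // Starts `threads` workers at the same moment; each marks every version
    // in [0, versions) `rounds` times, then the result is returned.
    public static VersionedRequestCounters hammer(int threads, int versions, int rounds) {
        VersionedRequestCounters c = new VersionedRequestCounters();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int r = 0; r < rounds; r++) {
                    for (short v = 0; v < versions; v++) {
                        c.mark(v);
                    }
                }
            });
            workers[t].start();
        }
        start.countDown();
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return c;
    }

    public static void main(String[] args) {
        VersionedRequestCounters c = hammer(8, 16, 1000);
        // 8 threads x 1000 rounds per version: prints versions=16 count(v0)=8000
        System.out.println("versions=" + c.versions() + " count(v0)=" + c.count((short) 0));
    }
}
```

The same workload against an unsynchronized hash map has no defined outcome: it may lose entries, miscount, or, as in the thread dump above, leave a thread spinning inside a resize.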



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)