Posted to jira@kafka.apache.org by "Ying Zheng (JIRA)" <ji...@apache.org> on 2018/07/09 23:13:00 UTC

[jira] [Assigned] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

     [ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ying Zheng reassigned KAFKA-7142:
---------------------------------

    Assignee: Ying Zheng

> Rebalancing large consumer group can block the coordinator broker for several seconds
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7142
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7142
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.10.2.0
>            Reporter: Ying Zheng
>            Assignee: Ying Zheng
>            Priority: Minor
>
> In our production cluster, we noticed that when a large consumer group (a few thousand members) is rebalancing, the produce latency of the coordinator broker can jump to several seconds.
>  
> When this happens, jstack shows that all of the broker's request handler threads are waiting for the group lock:
> {noformat}
> "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x00007f9a32b16000 nid=0x1b985 waiting on condition [0x00007f98f1adb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x000000024aa73b20> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
>         at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
>         at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:745){noformat}
>   
> The exception is one thread, which is either running GroupMetadata.supportsProtocols():
> {noformat}
> "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x00007f9a32b14000 nid=0x1b984 runnable [0x00007f98f1bdc000]
>    java.lang.Thread.State: RUNNABLE
>         at scala.collection.immutable.List.map(List.scala:284)
>         at kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
>         at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
>         at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
>         at scala.collection.immutable.List.map(List.scala:288)
>         at kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
>         at kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
>         at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
>         at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:745){noformat}
> or GroupCoordinator.tryCompleteJoin():
> {noformat}
> "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x00007fe9f6ad1000 nid=0x1ceff runnable [0x00007fe8701ca000]
>    java.lang.Thread.State: RUNNABLE
>         at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:139)
>         at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
>         at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>         at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
>         at kafka.coordinator.group.GroupMetadata.notYetRejoinedMembers(GroupMetadata.scala:229)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply$mcZ$sp(GroupCoordinator.scala:767)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:189)
>         at kafka.coordinator.group.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:766)
>         at kafka.coordinator.group.DelayedJoin.tryComplete(DelayedJoin.scala:38)
>         at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:396)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:298)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:233)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:189)
>         at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
>         at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
>         at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:745){noformat}
>  
> Both GroupMetadata.supportsProtocols() and GroupCoordinator.tryCompleteJoin() are O(N) operations, where N is the number of group members. Because every member sends a JoinGroup request during a rebalance and each request triggers these scans, the rebalance as a whole is O(N^2). No matter how many brokers are in the cluster or how many cores the broker has, these consumer group operations are serialized on the group lock, so only one thread can process them at a time.
> My trace log messages show that each GroupMetadata.supportsProtocols() call on a 3000-member group takes 30 ms on average.
> Both operations can be done in O(1) time by maintaining data structures that track the supported protocols and the number of "not yet joined" members as members are added, removed, or updated.
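
A minimal sketch of the counter-based bookkeeping proposed in the description. It is written in Java for illustration and is not Kafka's actual GroupMetadata code: the class name GroupCounters and its methods addOrUpdate, remove and allRejoined are hypothetical. It assumes a joining member is compatible when at least one of its protocols is supported by every current member, and it keeps a per-protocol support count plus a count of members that have not yet rejoined, updating both whenever a member is added, updated, or removed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; names do not come from the Kafka code base.
class GroupCounters {
    private static final class Member {
        final Set<String> protocols;
        final boolean rejoined;

        Member(Set<String> protocols, boolean rejoined) {
            this.protocols = protocols;
            this.rejoined = rejoined;
        }
    }

    private final Map<String, Member> members = new HashMap<>();
    // protocol name -> number of current members that support it
    private final Map<String, Integer> supportCount = new HashMap<>();
    // number of current members that have not rejoined yet
    private int notYetRejoined = 0;

    // Cost is proportional to the member's own protocol list, not to group size.
    void addOrUpdate(String memberId, Set<String> protocols, boolean rejoined) {
        remove(memberId); // drop the old contribution if the member already exists
        Member m = new Member(new HashSet<>(protocols), rejoined);
        members.put(memberId, m);
        for (String p : m.protocols) {
            supportCount.merge(p, 1, Integer::sum);
        }
        if (!rejoined) {
            notYetRejoined++;
        }
    }

    void remove(String memberId) {
        Member m = members.remove(memberId);
        if (m == null) {
            return;
        }
        for (String p : m.protocols) {
            // Returning null removes the entry once no member supports the protocol.
            supportCount.computeIfPresent(p, (k, n) -> n == 1 ? null : n - 1);
        }
        if (!m.rejoined) {
            notYetRejoined--;
        }
    }

    // A protocol is supported by the whole group when its count equals the member count.
    boolean supportsProtocols(Set<String> candidate) {
        if (members.isEmpty()) {
            return true;
        }
        for (String p : candidate) {
            if (supportCount.getOrDefault(p, 0) == members.size()) {
                return true;
            }
        }
        return false;
    }

    // Constant-time replacement for scanning all members for "not yet rejoined" ones.
    boolean allRejoined() {
        return notYetRejoined == 0;
    }
}
```

With this layout, the two checks made under the group lock no longer iterate over all members: supportsProtocols touches only the protocols in the incoming request, and allRejoined reads a single counter. The per-request cost therefore stays flat as the group grows, at the price of a little extra work on each membership change.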


