Posted to jira@kafka.apache.org by "Ying Zheng (JIRA)" <ji...@apache.org> on 2018/07/09 23:13:00 UTC
[jira] [Assigned] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ying Zheng reassigned KAFKA-7142:
---------------------------------
Assignee: Ying Zheng
> Rebalancing large consumer group can block the coordinator broker for several seconds
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.10.2.0
> Reporter: Ying Zheng
> Assignee: Ying Zheng
> Priority: Minor
>
> In our production cluster, we noticed that when a large consumer group (a few thousand members) is rebalancing, the produce latency of the coordinator broker can jump to several seconds.
>
> When this happens, jstack shows that all the request handler threads of the broker are waiting for the group lock:
> {noformat}
> "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x00007f9a32b16000 nid=0x1b985 waiting on condition [0x00007f98f1adb000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000024aa73b20> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
>
> The exception is one thread, which holds the lock and is running either GroupMetadata.supportsProtocols():
> {noformat}
> "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x00007f9a32b14000 nid=0x1b984 runnable [0x00007f98f1bdc000]
> java.lang.Thread.State: RUNNABLE
> at scala.collection.immutable.List.map(List.scala:284)
> at kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
> at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at scala.collection.immutable.List.map(List.scala:288)
> at kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
> at kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
> at kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
> or GroupCoordinator.tryCompleteJoin():
> {noformat}
> "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x00007fe9f6ad1000 nid=0x1ceff runnable [0x00007fe8701ca000]
> java.lang.Thread.State: RUNNABLE
> at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
> at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:139)
> at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> at kafka.coordinator.group.GroupMetadata.notYetRejoinedMembers(GroupMetadata.scala:229)
> at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply$mcZ$sp(GroupCoordinator.scala:767)
> at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767)
> at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:189)
> at kafka.coordinator.group.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:766)
> at kafka.coordinator.group.DelayedJoin.tryComplete(DelayedJoin.scala:38)
> at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
> at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:396)
> at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:298)
> at kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:233)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:189)
> at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
>
> Both GroupMetadata.supportsProtocols and GroupCoordinator.tryCompleteJoin are O(N) operations, where N is the number of group members. Because they run on the JoinGroup path of every member, rebalancing the group becomes an O(N^2) operation. No matter how many brokers are in the cluster or how many cores a broker has, these consumer group operations can only be processed by a single thread at a time, since they run under the group lock.
> My trace log messages show that each GroupMetadata.supportsProtocols() call on a 3000-member group takes 30 ms on average.
> Both operations can be done in O(1) time by maintaining data structures that track the supported protocols and the number of "not yet joined" members as members are added, removed, or updated.
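A minimal sketch of the bookkeeping proposed above, written in Java for illustration. It is not Kafka's code: the class GroupCounters and all of its method names are hypothetical, and it assumes the caller already holds the group lock. It keeps a per-protocol member count and a set of rejoined members, so both checks avoid scanning all N members. If each of the 3000 members sends one JoinGroup request and each request costs a 30 ms scan, that is roughly 3000 x 30 ms = 90 s of cumulative time under the lock, which is the cost this kind of structure is meant to remove.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; this is not Kafka's GroupMetadata class.
// Callers are assumed to hold the group lock, as the coordinator does.
final class GroupCounters {
    // member id -> protocols that member supports
    private final Map<String, Set<String>> members = new HashMap<>();
    // protocol name -> number of current members supporting it
    private final Map<String, Integer> protocolCounts = new HashMap<>();
    // ids of members that have already rejoined in the current rebalance
    private final Set<String> rejoined = new HashSet<>();

    // Add a member, or replace its protocol list if it is already known.
    void addOrUpdate(String memberId, Set<String> protocols) {
        Set<String> old = members.put(memberId, new HashSet<>(protocols));
        if (old != null) {
            for (String p : old) decrement(p);
        }
        for (String p : protocols) protocolCounts.merge(p, 1, Integer::sum);
    }

    // Remove a member and undo its contribution to the counters.
    void remove(String memberId) {
        Set<String> old = members.remove(memberId);
        if (old == null) return;
        for (String p : old) decrement(p);
        rejoined.remove(memberId);
    }

    private void decrement(String protocol) {
        // Drop the entry when the count reaches zero so the map stays small.
        protocolCounts.computeIfPresent(protocol, (k, v) -> v == 1 ? null : v - 1);
    }

    // Called when a new rebalance begins: nobody has rejoined yet.
    void startRebalance() {
        rejoined.clear();
    }

    void markRejoined(String memberId) {
        if (members.containsKey(memberId)) rejoined.add(memberId);
    }

    // Constant time: no filtering over the member map.
    int notYetRejoinedCount() {
        return members.size() - rejoined.size();
    }

    // True if the group is empty, or if at least one candidate protocol is
    // supported by every current member. The cost depends on the size of
    // the candidate set, not on the number of members.
    boolean supportsProtocols(Set<String> candidate) {
        if (members.isEmpty()) return true;
        for (String p : candidate) {
            if (protocolCounts.getOrDefault(p, 0) == members.size()) return true;
        }
        return false;
    }
}
```

With this layout, a join-completion check reduces to notYetRejoinedCount() == 0, and the protocol check touches only the handful of protocols named in the incoming request. The trade-off is that every add, update, and removal must keep the counters consistent.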