You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/03/09 13:57:12 UTC
[kafka] branch 2.8 updated: MINOR: Include number of members in group coordinator messages (#10273)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new a243b28 MINOR: Include number of members in group coordinator messages (#10273)
a243b28 is described below
commit a243b28cc54156c908a8bd170ace596acb1d720b
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Tue Mar 9 05:49:20 2021 -0800
MINOR: Include number of members in group coordinator messages (#10273)
Reviewers: Jason Gustafson <ja...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>
---
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 2c5bf70..441b46d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -407,7 +407,8 @@ class GroupCoordinator(val brokerId: Int,
// if this is the leader, then we can attempt to persist state and transition to stable
if (group.isLeader(memberId)) {
- info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
+ info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}. " +
+ s"The group has ${group.size} members, ${group.allStaticMembers.size} of which are static.")
// fill any missing members with an empty assignment
val missing = group.allMembers.diff(groupAssignment.keySet)
@@ -1177,7 +1178,7 @@ class GroupCoordinator(val brokerId: Int,
group.inLock {
val notYetRejoinedDynamicMembers = group.notYetRejoinedMembers.filterNot(_._2.isStaticMember)
if (notYetRejoinedDynamicMembers.nonEmpty) {
- info(s"Group ${group.groupId} remove dynamic members " +
+ info(s"Group ${group.groupId} removed dynamic members " +
s"who haven't joined: ${notYetRejoinedDynamicMembers.keySet}")
notYetRejoinedDynamicMembers.values foreach { failedMember =>
@@ -1212,7 +1213,7 @@ class GroupCoordinator(val brokerId: Int,
})
} else {
info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
- s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
+ s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) with ${group.size} members")
// trigger the awaiting join group response callback for all the members after rebalancing
for (member <- group.allMemberMetadata) {