You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/01 17:39:58 UTC
[kafka] branch 2.5 updated: KAFKA-9659: Add more log4j when updating static member mappings (#8269)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 54eafcd KAFKA-9659: Add more log4j when updating static member mappings (#8269)
54eafcd is described below
commit 54eafcd7bd91c33666d1d4f10ac652e372b10e93
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Mar 12 16:41:50 2020 -0700
KAFKA-9659: Add more log4j when updating static member mappings (#8269)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Boyang Chen <bo...@confluent.io>, Rohan <de...@gmail.com>
---
.../scala/kafka/coordinator/group/GroupCoordinator.scala | 14 +++++++++-----
.../main/scala/kafka/coordinator/group/GroupMetadata.scala | 4 +++-
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index f5d1ca9..bf8f991 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -203,8 +203,8 @@ class GroupCoordinator(val brokerId: Int,
if (group.hasStaticMember(groupInstanceId)) {
val oldMemberId = group.getStaticMemberId(groupInstanceId)
- info(s"Static member $groupInstanceId with unknown member id rejoins, assigning new member id $newMemberId, while " +
- s"old member $oldMemberId will be removed.")
+ info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " +
+ s"old member id $oldMemberId will be removed.")
val currentLeader = group.leaderOrNull
val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
@@ -246,7 +246,7 @@ class GroupCoordinator(val brokerId: Int,
addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
} else {
- debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
+ debug(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
clientId, clientHost, protocolType, protocols, group, responseCallback)
@@ -280,6 +280,8 @@ class GroupCoordinator(val brokerId: Int,
throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be assigned " +
s"into pending member bucket with member id $memberId")
} else {
+ debug(s"Dynamic Member with specific member id $memberId joins group ${group.groupId} in " +
+ s"${group.currentState} state. Adding to the group now.")
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
clientId, clientHost, protocolType, protocols, group, responseCallback)
}
@@ -1017,10 +1019,12 @@ class GroupCoordinator(val brokerId: Int,
// for new members. If the new member is still there, we expect it to retry.
completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
- if (member.isStaticMember)
+ if (member.isStaticMember) {
+ info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
group.addStaticMember(groupInstanceId, memberId)
- else
+ } else {
group.removePendingMember(memberId)
+ }
maybePrepareRebalance(group, s"Adding new member $memberId with group instanceid $groupInstanceId")
}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 9349f9e..dde76c3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -123,7 +123,7 @@ private[group] case object Empty extends GroupState {
}
-private object GroupMetadata {
+private object GroupMetadata extends Logging {
def loadGroup(groupId: String,
initialState: GroupState,
@@ -143,6 +143,8 @@ private object GroupMetadata {
members.foreach(member => {
group.add(member, null)
if (member.isStaticMember) {
+ info(s"Static member ${member.groupInstanceId} of group $groupId loaded " +
+ s"with member id ${member.memberId} at generation ${group.generationId}.")
group.addStaticMember(member.groupInstanceId, member.memberId)
}
})