You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/01 17:39:58 UTC

[kafka] branch 2.5 updated: KAFKA-9659: Add more log4j when updating static member mappings (#8269)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 54eafcd  KAFKA-9659: Add more log4j when updating static member mappings (#8269)
54eafcd is described below

commit 54eafcd7bd91c33666d1d4f10ac652e372b10e93
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Mar 12 16:41:50 2020 -0700

    KAFKA-9659: Add more log4j when updating static member mappings (#8269)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Boyang Chen <bo...@confluent.io>, Rohan <de...@gmail.com>
---
 .../scala/kafka/coordinator/group/GroupCoordinator.scala   | 14 +++++++++-----
 .../main/scala/kafka/coordinator/group/GroupMetadata.scala |  4 +++-
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index f5d1ca9..bf8f991 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -203,8 +203,8 @@ class GroupCoordinator(val brokerId: Int,
 
         if (group.hasStaticMember(groupInstanceId)) {
           val oldMemberId = group.getStaticMemberId(groupInstanceId)
-          info(s"Static member $groupInstanceId with unknown member id rejoins, assigning new member id $newMemberId, while " +
-            s"old member $oldMemberId will be removed.")
+          info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " +
+            s"old member id $oldMemberId will be removed.")
 
           val currentLeader = group.leaderOrNull
           val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
@@ -246,7 +246,7 @@ class GroupCoordinator(val brokerId: Int,
           addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
           responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
         } else {
-          debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
+          debug(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
             s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
           addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
             clientId, clientHost, protocolType, protocols, group, responseCallback)
@@ -280,6 +280,8 @@ class GroupCoordinator(val brokerId: Int,
           throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be assigned " +
             s"into pending member bucket with member id $memberId")
         } else {
+          debug(s"Dynamic Member with specific member id $memberId joins group ${group.groupId} in " +
+            s"${group.currentState} state. Adding to the group now.")
           addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
             clientId, clientHost, protocolType, protocols, group, responseCallback)
         }
@@ -1017,10 +1019,12 @@ class GroupCoordinator(val brokerId: Int,
     // for new members. If the new member is still there, we expect it to retry.
     completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
 
-    if (member.isStaticMember)
+    if (member.isStaticMember) {
+      info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
       group.addStaticMember(groupInstanceId, memberId)
-    else
+    } else {
       group.removePendingMember(memberId)
+    }
     maybePrepareRebalance(group, s"Adding new member $memberId with group instanceid $groupInstanceId")
   }
 
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 9349f9e..dde76c3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -123,7 +123,7 @@ private[group] case object Empty extends GroupState {
 }
 
 
-private object GroupMetadata {
+private object GroupMetadata extends Logging {
 
   def loadGroup(groupId: String,
                 initialState: GroupState,
@@ -143,6 +143,8 @@ private object GroupMetadata {
     members.foreach(member => {
       group.add(member, null)
       if (member.isStaticMember) {
+        info(s"Static member ${member.groupInstanceId} of group $groupId loaded " +
+          s"with member id ${member.memberId} at generation ${group.generationId}.")
         group.addStaticMember(member.groupInstanceId, member.memberId)
       }
     })