You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/07/14 20:34:31 UTC

kafka git commit: MINOR: Change log level for group member failure from debug to info

Repository: kafka
Updated Branches:
  refs/heads/trunk 1685e7112 -> 873eeae9f


MINOR: Change log level for group member failure from debug to info

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3523 from guozhangwang/KMinor-group-coordinator-member-failure-info-log4j


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/873eeae9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/873eeae9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/873eeae9

Branch: refs/heads/trunk
Commit: 873eeae9f7ae661d27df5351c0001b785dc40956
Parents: 1685e71
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jul 14 13:32:32 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jul 14 13:32:32 2017 -0700

----------------------------------------------------------------------
 .../kafka/coordinator/group/GroupCoordinator.scala      | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/873eeae9/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index d9d120c..42bc3c3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -320,7 +320,8 @@ class GroupCoordinator(val brokerId: Int,
             } else {
               val member = group.get(memberId)
               removeHeartbeatForLeavingMember(group, member)
-              onMemberFailure(group, member)
+              debug(s"Member ${member.memberId} in group ${group.groupId} has left, removing it from the group")
+              removeMemberAndUpdateGroup(group, member)
               responseCallback(Errors.NONE)
             }
           }
@@ -692,8 +693,7 @@ class GroupCoordinator(val brokerId: Int,
     joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
   }
 
-  private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
-    debug(s"Member ${member.memberId} in group ${group.groupId} has failed")
+  private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata) {
     group.remove(member.memberId)
     group.currentState match {
       case Dead | Empty =>
@@ -774,8 +774,10 @@ class GroupCoordinator(val brokerId: Int,
 
   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
     group synchronized {
-      if (!shouldKeepMemberAlive(member, heartbeatDeadline))
-        onMemberFailure(group, member)
+      if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
+        info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
+        removeMemberAndUpdateGroup(group, member)
+      }
     }
   }