You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/01 00:00:37 UTC

[kafka] branch 2.5 updated: MINOR: more logs for empty assignment (#8397)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 2190ae4  MINOR: more logs for empty assignment (#8397)
2190ae4 is described below

commit 2190ae4cf5f92c808f6962221a7026784bc5a239
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Mar 31 16:59:25 2020 -0700

    MINOR: more logs for empty assignment (#8397)
    
    We find that brokers may send empty assignment for some members unexpectedly, and would need more logs investigating this issue.
    
    Reviewers: John Roesler <vv...@apache.org>
---
 .../kafka/clients/consumer/internals/AbstractCoordinator.java     | 1 +
 .../src/main/scala/kafka/coordinator/group/GroupCoordinator.scala | 8 ++++++++
 2 files changed, 9 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 50db862..a9d1fd6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -725,6 +725,7 @@ public abstract class AbstractCoordinator implements Closeable {
                         syncResponse.data.protocolName(), generation().protocolName);
                     future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
                 } else {
+                    log.debug("Received successful SyncGroup response: {}", syncResponse);
                     sensors.syncSensor.record(response.requestLatencyMs());
                     future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
                 }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 441e708..f5d1ca9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -424,6 +424,10 @@ class GroupCoordinator(val brokerId: Int,
               val missing = group.allMembers -- groupAssignment.keySet
               val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
 
+              if (missing.nonEmpty) {
+                warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}")
+              }
+
               groupManager.storeGroup(group, assignment, (error: Errors) => {
                 group.inLock {
                   // another member may have joined the group while we were awaiting this callback,
@@ -934,6 +938,10 @@ class GroupCoordinator(val brokerId: Int,
     else
       (None, None)
     for (member <- group.allMemberMetadata) {
+      if (member.assignment.isEmpty && error == Errors.NONE) {
+        warn(s"Sending empty assignment to member ${member.memberId} of ${group.groupId} for generation ${group.generationId} with no errors")
+      }
+
       if (group.maybeInvokeSyncCallback(member, SyncGroupResult(protocolType, protocolName, member.assignment, error))) {
         // reset the session timeout for members after propagating the member's assignment.
         // This is because if any member's session expired while we were still awaiting either