You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/01 00:00:37 UTC
[kafka] branch 2.5 updated: MINOR: more logs for empty assignment
(#8397)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 2190ae4 MINOR: more logs for empty assignment (#8397)
2190ae4 is described below
commit 2190ae4cf5f92c808f6962221a7026784bc5a239
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Mar 31 16:59:25 2020 -0700
MINOR: more logs for empty assignment (#8397)
We find that brokers may send empty assignment for some members unexpectedly, and would need more logs investigating this issue.
Reviewers: John Roesler <vv...@apache.org>
---
.../kafka/clients/consumer/internals/AbstractCoordinator.java | 1 +
.../src/main/scala/kafka/coordinator/group/GroupCoordinator.scala | 8 ++++++++
2 files changed, 9 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 50db862..a9d1fd6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -725,6 +725,7 @@ public abstract class AbstractCoordinator implements Closeable {
syncResponse.data.protocolName(), generation().protocolName);
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
+ log.debug("Received successful SyncGroup response: {}", syncResponse);
sensors.syncSensor.record(response.requestLatencyMs());
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 441e708..f5d1ca9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -424,6 +424,10 @@ class GroupCoordinator(val brokerId: Int,
val missing = group.allMembers -- groupAssignment.keySet
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
+ if (missing.nonEmpty) {
+ warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}")
+ }
+
groupManager.storeGroup(group, assignment, (error: Errors) => {
group.inLock {
// another member may have joined the group while we were awaiting this callback,
@@ -934,6 +938,10 @@ class GroupCoordinator(val brokerId: Int,
else
(None, None)
for (member <- group.allMemberMetadata) {
+ if (member.assignment.isEmpty && error == Errors.NONE) {
+ warn(s"Sending empty assignment to member ${member.memberId} of ${group.groupId} for generation ${group.generationId} with no errors")
+ }
+
if (group.maybeInvokeSyncCallback(member, SyncGroupResult(protocolType, protocolName, member.assignment, error))) {
// reset the session timeout for members after propagating the member's assignment.
// This is because if any member's session expired while we were still awaiting either