You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/09/08 22:12:16 UTC

[kafka] branch 3.3 updated: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 7131724819 KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599)
7131724819 is described below

commit 7131724819d35ee08ff84a4cb9b8ca88bacb1311
Author: David Jacot <dj...@confluent.io>
AuthorDate: Fri Sep 9 00:05:40 2022 +0200

    KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599)
    
    The consumer group instance ID is used to support a notion of "static" consumer groups. The idea is to be able to identify the same group instance across restarts so that a rebalance is not needed. However, if the user sets `group.instance.id` in the consumer configuration but uses "simple" assignment with `assign()`, the instance ID is nevertheless sent in the OffsetCommit request to the coordinator. This may result in a surprising UNKNOWN_MEMBER_ID error.
    
    This PR fixes the issue on the client side by not setting the group instance ID in the OffsetCommit request if the member ID is empty (i.e. there is no generation, as is the case when partitions are assigned manually).
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    |  5 ++++-
 .../internals/ConsumerCoordinatorTest.java         | 26 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9838e7dc8f..5228c60e0f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1272,8 +1272,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
 
         final Generation generation;
+        final String groupInstanceId;
         if (subscriptions.hasAutoAssignedPartitions()) {
             generation = generationIfStable();
+            groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
             // if the generation is null, we are not part of an active group (and we expect to be).
             // the only thing we can do is fail the commit and let the user rejoin the group in poll().
             if (generation == null) {
@@ -1293,6 +1295,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             }
         } else {
             generation = Generation.NO_GENERATION;
+            groupInstanceId = null;
         }
 
         OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
@@ -1300,7 +1303,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         .setGroupId(this.rebalanceConfig.groupId)
                         .setGenerationId(generation.generationId)
                         .setMemberId(generation.memberId)
-                        .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
+                        .setGroupInstanceId(groupInstanceId)
                         .setTopics(new ArrayList<>(requestTopicDataMap.values()))
         );
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index d948990d69..5e080b7721 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2821,6 +2821,32 @@ public abstract class ConsumerCoordinatorTest {
         assertEquals(newGen, coordinator.generation());
     }
 
+    @Test
+    public void testCommitOffsetShouldNotSetInstanceIdIfMemberIdIsUnknown() {
+        rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+        ConsumerCoordinator coordinator = buildCoordinator(
+            rebalanceConfig,
+            new Metrics(),
+            assignors,
+            false,
+            subscriptions
+        );
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(5000));
+
+        client.prepareResponse(body -> {
+            OffsetCommitRequestData data = ((OffsetCommitRequest) body).data();
+            return data.groupInstanceId() == null && data.memberId().isEmpty();
+        }, offsetCommitResponse(Collections.emptyMap()));
+
+        RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
+            new OffsetAndMetadata(100L, "metadata")));
+
+        assertTrue(consumerClient.poll(future, time.timer(5000)));
+        assertFalse(future.failed());
+    }
+
     @Test
     public void testCommitOffsetRebalanceInProgress() {
         // we cannot retry if a rebalance occurs before the commit completed