You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/09/08 21:50:52 UTC

[kafka] branch 3.3 updated: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side) (#12598)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new cfa1f098d6 KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side) (#12598)
cfa1f098d6 is described below

commit cfa1f098d6f0a47816d56e29d5dca01fb5cbf8a3
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Sep 8 23:35:59 2022 +0200

    KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side) (#12598)
    
    The consumer group instance ID is used to support a notion of "static" consumer groups. The idea is to be able to identify the same group instance across restarts so that a rebalance is not needed. However, if the user sets `group.instance.id` in the consumer configuration, but uses "simple" assignment with `assign()`, then the instance ID nevertheless is sent in the OffsetCommit request to the coordinator. This may result in a surprising UNKNOWN_MEMBER_ID error.
    
    This PR attempts to fix this issue for existing consumers by relaxing the validation in this case. One way is to simply ignore the member id and the static id when the generation id is -1. -1 signals that the request comes from either the admin client or a consumer which does not use the group management. This does not apply to transactional offsets commit.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  5 +++++
 .../coordinator/group/GroupCoordinatorTest.scala   | 24 +++++++++++++++++++++-
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 6bf337d679..68dd8d4a06 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -982,6 +982,11 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
     if (group.is(Dead)) {
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (generationId < 0 && group.is(Empty)) {
+      // When the generation id is -1, the request comes from either the admin client
+      // or a consumer which does not use the group management facility. In this case,
+      // the request can commit offsets if the group is empty.
+      None
     } else if (generationId >= 0 || memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID || groupInstanceId.isDefined) {
       validateCurrentMember(
         group,
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 164c9ab1fe..a13793823c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1263,7 +1263,6 @@ class GroupCoordinatorTest {
     group.transitionTo(PreparingRebalance)
     group.transitionTo(Empty)
 
-
     // Illegal state exception shall trigger since follower id resides in pending member bucket.
     val expectedException = assertThrows(classOf[IllegalStateException],
       () => staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1))
@@ -2941,6 +2940,29 @@ class GroupCoordinatorTest {
     assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
   }
 
+  @Test
+  def testManualCommitOffsetShouldNotValidateMemberIdAndInstanceId(): Unit = {
+    val tp = new TopicPartition("topic", 0)
+
+    var commitOffsetResult = commitOffsets(
+      groupId,
+      JoinGroupRequest.UNKNOWN_MEMBER_ID,
+      -1,
+      Map(tp -> offsetAndMetadata(0)),
+      Some("instance-id")
+    )
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    commitOffsetResult = commitOffsets(
+      groupId,
+      "unknown",
+      -1,
+      Map(tp -> offsetAndMetadata(0)),
+      None
+    )
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+  }
+
   @Test
   def testTxnCommitOffsetWithFencedInstanceId(): Unit = {
     val tp = new TopicPartition("topic", 0)