You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/09/08 21:36:07 UTC

[kafka] branch trunk updated: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side) (#12598)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 706adc39f6 KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side) (#12598)
706adc39f6 is described below

commit 706adc39f6aff378f98000b0466181d7fcb1d997
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Sep 8 23:35:59 2022 +0200

    KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side) (#12598)
    
    The consumer group instance ID is used to support a notion of "static" consumer groups. The idea is to be able to identify the same group instance across restarts so that a rebalance is not needed. However, if the user sets `group.instance.id` in the consumer configuration but uses "simple" assignment with `assign()`, the instance ID is nevertheless sent in the OffsetCommit request to the coordinator. This may result in a surprising UNKNOWN_MEMBER_ID error.
    
    This PR attempts to fix this issue for existing consumers by relaxing the server-side validation in this case: when the generation ID is -1 and the group is empty, the member ID and the group instance ID are simply ignored. A generation ID of -1 signals that the request comes from either the admin client or a consumer which does not use the group management facility. This relaxation does not apply to transactional offset commits.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  5 +++++
 .../coordinator/group/GroupCoordinatorTest.scala   | 24 +++++++++++++++++++++-
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 8d8f86f774..de4dbf3b0a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -982,6 +982,11 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
     if (group.is(Dead)) {
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (generationId < 0 && group.is(Empty)) {
+      // When the generation id is -1, the request comes from either the admin client
+      // or a consumer which does not use the group management facility. In this case,
+      // the request can commit offsets if the group is empty.
+      None
     } else if (generationId >= 0 || memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID || groupInstanceId.isDefined) {
       validateCurrentMember(
         group,
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index dd3524c1e3..596cd0efba 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1263,7 +1263,6 @@ class GroupCoordinatorTest {
     group.transitionTo(PreparingRebalance)
     group.transitionTo(Empty)
 
-
     // Illegal state exception shall trigger since follower id resides in pending member bucket.
     val expectedException = assertThrows(classOf[IllegalStateException],
       () => staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1))
@@ -2941,6 +2940,29 @@ class GroupCoordinatorTest {
     assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
   }
 
+  @Test
+  def testManualCommitOffsetShouldNotValidateMemberIdAndInstanceId(): Unit = {
+    val tp = new TopicPartition("topic", 0)
+
+    var commitOffsetResult = commitOffsets(
+      groupId,
+      JoinGroupRequest.UNKNOWN_MEMBER_ID,
+      -1,
+      Map(tp -> offsetAndMetadata(0)),
+      Some("instance-id")
+    )
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    commitOffsetResult = commitOffsets(
+      groupId,
+      "unknown",
+      -1,
+      Map(tp -> offsetAndMetadata(0)),
+      None
+    )
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+  }
+
   @Test
   def testTxnCommitOffsetWithFencedInstanceId(): Unit = {
     val tp = new TopicPartition("topic", 0)