You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/02/10 16:32:51 UTC

[kafka] branch 3.1 updated: KAFKA-13636: Fix for the group coordinator issue where the offsets are deleted for unstable groups (#11742)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new b294119  KAFKA-13636: Fix for the group coordinator issue where the offsets are deleted for unstable groups (#11742)
b294119 is described below

commit b294119baf951efd1e49c5e92d41ddd1485348df
Author: prince-mahajan <84...@users.noreply.github.com>
AuthorDate: Thu Feb 10 08:21:17 2022 -0800

    KAFKA-13636: Fix for the group coordinator issue where the offsets are deleted for unstable groups (#11742)
    
    This patch ensures that the committed offsets are not expired while the group is rebalancing. The issue is that we can't rely on the subscribed topics if the group is not stable.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../kafka/coordinator/group/GroupMetadata.scala    |  4 +-
 .../coordinator/group/GroupMetadataTest.scala      | 48 ++++++++++++++++++++--
 2 files changed, 47 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 5cb8a73..a7a6e91 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -774,8 +774,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
             .getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp)
         )
 
-      case Some(ConsumerProtocol.PROTOCOL_TYPE) if subscribedTopics.isDefined =>
-        // consumers exist in the group =>
+      case Some(ConsumerProtocol.PROTOCOL_TYPE) if subscribedTopics.isDefined && is(Stable) =>
+        // consumers exist in the group and group is stable =>
         // - if the group is aware of the subscribed topics and retention period had passed since the
         //   the last commit timestamp, expire the offset. offset with pending offset commit are not
         //   expired
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 275b7f6..eb0748d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Time, MockTime}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test}
 
@@ -260,6 +260,48 @@ class GroupMetadataTest {
   }
 
   @Test
+  def testOffsetRemovalDuringTransitionFromEmptyToNonEmpty(): Unit = {
+    val topic = "foo"
+    val partition = new TopicPartition(topic, 0)
+    val time = new MockTime()
+    group = new GroupMetadata("groupId", Empty, time)
+
+    // Rebalance once in order to commit offsets
+    val member = new MemberMetadata(memberId, None, clientId, clientHost, rebalanceTimeoutMs,
+      sessionTimeoutMs, protocolType, List(("range", ConsumerProtocol.serializeSubscription(new Subscription(List("foo").asJava)).array())))
+    group.transitionTo(PreparingRebalance)
+    group.add(member)
+    group.initNextGeneration()
+    assertEquals(Some(Set("foo")), group.getSubscribedTopics)
+
+    val offset = offsetAndMetadata(offset = 37, timestamp = time.milliseconds())
+    val commitRecordOffset = 3
+
+    group.prepareOffsetCommit(Map(partition -> offset))
+    assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+    group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset))
+
+    val offsetRetentionMs = 50000L
+    time.sleep(offsetRetentionMs + 1)
+
+    // Rebalance again so that the group becomes empty
+    group.transitionTo(PreparingRebalance)
+    group.remove(memberId)
+    group.initNextGeneration()
+
+    // The group is empty, but we should not expire the offset because the state was just changed
+    assertEquals(Empty, group.currentState)
+    assertEquals(Map.empty, group.removeExpiredOffsets(time.milliseconds(), offsetRetentionMs))
+
+    // Start a new rebalance to add the member back. The offset should not be expired
+    // while the rebalance is in progress.
+    group.transitionTo(PreparingRebalance)
+    group.add(member)
+    assertEquals(Map.empty, group.removeExpiredOffsets(time.milliseconds(), offsetRetentionMs))
+  }
+
+  @Test
   def testSubscribedTopics(): Unit = {
     // not able to compute it for a newly created group
     assertEquals(None, group.getSubscribedTopics)
@@ -718,8 +760,8 @@ class GroupMetadataTest {
     assertTrue(group.is(targetState))
   }
 
-  private def offsetAndMetadata(offset: Long): OffsetAndMetadata = {
-    OffsetAndMetadata(offset, "", Time.SYSTEM.milliseconds())
+  private def offsetAndMetadata(offset: Long, timestamp: Long = Time.SYSTEM.milliseconds()): OffsetAndMetadata = {
+    OffsetAndMetadata(offset, "", timestamp)
   }
 
 }