Posted to commits@kafka.apache.org by lu...@apache.org on 2024/02/13 18:08:27 UTC

(kafka) branch trunk updated: MINOR: ignore heartbeat response if leaving (#15362)

This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8c0488b887b MINOR: ignore heartbeat response if leaving (#15362)
8c0488b887b is described below

commit 8c0488b887be4a9178563f1d72514010f83b8614
Author: Lucas Brutschy <lb...@confluent.io>
AuthorDate: Tue Feb 13 19:08:13 2024 +0100

    MINOR: ignore heartbeat response if leaving (#15362)
    
    When the consumer enters state LEAVING, it sets the epoch to the leave epoch,
    such as -1. If a heartbeat response for an earlier request arrives after the
    consumer has entered state LEAVING, it resets the epoch to the member epoch
    on the server. The result is that the consumer never leaves the group.
    
    It seems that c6f4c604d8e50ad9e182eeb66f0d1650aa44f277 changed the timing
    inside the consumer so that this problem is now triggered relatively
    frequently in `DescribeConsumerGroupTest`.
    
    We fix it by ignoring any heartbeat responses when we are in state LEAVING.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
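Note: the race described in the commit message can be sketched with a small
standalone model. This is only an illustration, not the Kafka implementation:
the class LeavingMemberSketch, its fields, and the ignoreWhenLeaving flag are
hypothetical names introduced here, and the model tracks only the state and
the epoch. The only part taken from the patch is the idea of returning early
from the heartbeat response handler while in state LEAVING.

```java
// Hypothetical, simplified model of the race from the commit message.
// None of these names come from Kafka; only the LEAVING guard mirrors the patch.
final class LeavingMemberSketch {
    enum State { STABLE, LEAVING }

    // Leave epoch, -1 as in the commit message and the new test.
    static final int LEAVE_GROUP_EPOCH = -1;

    private State state = State.STABLE;
    private int memberEpoch;
    // Assumption for illustration: toggles the guard so both behaviours can be compared.
    private final boolean ignoreWhenLeaving;

    LeavingMemberSketch(int initialEpoch, boolean ignoreWhenLeaving) {
        this.memberEpoch = initialEpoch;
        this.ignoreWhenLeaving = ignoreWhenLeaving;
    }

    // Entering LEAVING sets the epoch to the leave epoch.
    void leaveGroup() {
        state = State.LEAVING;
        memberEpoch = LEAVE_GROUP_EPOCH;
    }

    // A response to an earlier heartbeat carries the epoch known to the server.
    void onHeartbeatResponse(int serverEpoch) {
        if (ignoreWhenLeaving && state == State.LEAVING) {
            return; // guard: a late response must not overwrite the leave epoch
        }
        memberEpoch = serverEpoch;
    }

    int memberEpoch() {
        return memberEpoch;
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        for (boolean guard : new boolean[] {false, true}) {
            LeavingMemberSketch member = new LeavingMemberSketch(5, guard);
            member.leaveGroup();           // epoch becomes -1
            member.onHeartbeatResponse(5); // late response from the server
            System.out.println("guard=" + guard + " epoch=" + member.memberEpoch());
        }
    }
}
```

Without the guard, the late response in this model overwrites the leave epoch
with the server epoch, so the next heartbeat would no longer signal a leave.
With the guard, the epoch stays at -1, which is what the new test
testIgnoreHeartbeatWhenLeavingGroup asserts for the real class.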
 .../consumer/internals/MembershipManagerImpl.java        |  5 +++++
 .../consumer/internals/MembershipManagerImplTest.java    | 16 ++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index a5a28da049e..7132b43a869 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -352,6 +352,11 @@ public class MembershipManagerImpl implements MembershipManager {
             );
             throw new IllegalArgumentException(errorMessage);
         }
+        if (state == MemberState.LEAVING) {
+            log.debug("Ignoring heartbeat response received from broker. Member {} with epoch {} is " +
+                    "already leaving the group.", memberId, memberEpoch);
+            return;
+        }
 
         // Update the group member id label in the client telemetry reporter if the member id has
         // changed. Initially the member id is empty, and it is updated when the member joins the
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index bb441fe7e64..50fede2e9ef 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -697,6 +697,22 @@ public class MembershipManagerImplTest {
         testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
         verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
     }
+    @Test
+    public void testIgnoreHeartbeatWhenLeavingGroup() {
+        MembershipManager membershipManager = createMemberInStableState();
+        mockLeaveGroup();
+
+        CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
+
+        membershipManager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(createAssignment(true)).data());
+
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        assertEquals(-1, membershipManager.memberEpoch());
+        assertEquals(MEMBER_ID, membershipManager.memberId());
+        assertTrue(membershipManager.currentAssignment().isEmpty());
+        assertFalse(leaveResult.isDone(), "Leave group result should not complete until the " +
+            "heartbeat request to leave is sent out.");
+    }
 
     @Test
     public void testLeaveGroupWhenStateIsReconciling() {