You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2024/03/19 20:48:47 UTC

(kafka) branch trunk updated: KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received (#15533)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c66d66dc67b KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received (#15533)
c66d66dc67b is described below

commit c66d66dc67b3aacb60f438bb4c5c1c132e8be4f2
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Mar 19 20:48:41 2024 +0000

    KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received (#15533)
    
    This patch fixes a bug in the logic which decides when a full ConsumerGroupHeartbeat response must be returned to the client. Prior to it, the logic relied only on the `ownedTopicPartitions` field to check whether the request was a full request. This is not enough because `ownedTopicPartitions` is also set in other situations. This patch changes the logic to check `ownedTopicPartitions`, `subscribedTopicNames` and `rebalanceTimeoutMs`, as they are the only three non-optional fields.
    
    Reviewers: Lianet Magrans <li...@gmail.com>, Jeff Kim <je...@confluent.io>, Justine Olshan <jo...@confluent.io>
---
 .../coordinator/group/GroupMetadataManager.java    |  11 +-
 .../group/GroupMetadataManagerTest.java            | 114 ++++++++++++++++++---
 2 files changed, 105 insertions(+), 20 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 48f0618c55d..0a789fa9630 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1227,10 +1227,13 @@ public class GroupMetadataManager {
             .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
 
         // The assignment is only provided in the following cases:
-        // 1. The member reported its owned partitions;
-        // 2. The member just joined or rejoined to group (epoch equals to zero);
-        // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
+        // 1. The member sent a full request. It does so when joining or rejoining the group with zero
+        //    as the member epoch; or on any errors (e.g. timeout). We use all the non-optional fields
+        //    (rebalanceTimeoutMs, subscribedTopicNames and ownedTopicPartitions) to detect a full request
+        //    as those must be set in a full request.
+        // 2. The member's assignment has been updated.
+        boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null);
+        if (isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }
 
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index e9304407cdd..43703059915 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -1650,6 +1650,102 @@ public class GroupMetadataManagerTest {
                 .setTopicPartitions(Collections.emptyList())));
     }
 
+    @Test
+    public void testConsumerGroupHeartbeatFullResponse() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with an empty consumer group.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .addRacks()
+                .build())
+            .build();
+
+        // Prepare new assignment for the group.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)
+                    )));
+                }
+            }
+        ));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result;
+
+        // A full response should be sent back on joining.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(Collections.singletonList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1))))),
+            result.response()
+        );
+
+        // Otherwise, a partial response should be sent back.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().memberEpoch()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000),
+            result.response()
+        );
+
+        // A full response should be sent back when the member sends
+        // a full request again.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().memberEpoch())
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(Collections.singletonList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1))))),
+            result.response()
+        );
+    }
+
     @Test
     public void testReconciliationProcess() {
         String groupId = "fooup";
@@ -1904,16 +2000,7 @@ public class GroupMetadataManagerTest {
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000)
-                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setTopicPartitions(Arrays.asList(
-                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(0, 1)),
-                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(barTopicId)
-                            .setPartitions(Collections.singletonList(0))
-                    ))),
+                .setHeartbeatIntervalMs(5000),
             result.response()
         );
 
@@ -3057,12 +3144,7 @@ public class GroupMetadataManagerTest {
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
                 .setMemberEpoch(2)
-                .setHeartbeatIntervalMs(5000)
-                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setTopicPartitions(Collections.singletonList(
-                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(0, 1))))),
+                .setHeartbeatIntervalMs(5000),
             result.response()
         );