You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/05/23 08:42:43 UTC

[kafka] branch trunk updated: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers (#13704)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c98c1ed41cf KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers (#13704)
c98c1ed41cf is described below

commit c98c1ed41cfb2fdede4e6ec874d4c581f27b4508
Author: Jeff Kim <ki...@gmail.com>
AuthorDate: Tue May 23 04:42:13 2023 -0400

    KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers (#13704)
    
    This path enables the new group metadata manager to generate GroupMetadataKey/Value records.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 checkstyle/suppressions.xml                        |   2 +
 .../kafka/coordinator/group/RecordHelpers.java     |  81 +++++++-
 .../kafka/coordinator/group/RecordHelpersTest.java | 217 +++++++++++++++++++++
 .../kafka/server/common/MetadataVersion.java       |  14 ++
 .../kafka/server/common/MetadataVersionTest.java   |  16 ++
 5 files changed, 329 insertions(+), 1 deletion(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2084490d187..c2fa6d4a47d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -324,6 +324,8 @@
               files="(ConsumerGroupMember).java"/>
     <suppress checks="ParameterNumber"
               files="(ConsumerGroupMember).java"/>
+    <suppress checks="ClassDataAbstractionCouplingCheck"
+              files="(RecordHelpersTest).java"/>
 
     <!-- storage -->
     <suppress checks="CyclomaticComplexity"
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
index 1e0a9eb9f14..bf6cd62a90f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
@@ -31,7 +31,11 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -339,7 +343,7 @@ public class RecordHelpers {
      * Creates a ConsumerGroupCurrentMemberAssignment tombstone.
      *
      * @param groupId   The consumer group id.
-     * @param memberId    The consumer group member id.
+     * @param memberId  The consumer group member id.
      * @return The record.
      */
     public static Record newCurrentAssignmentTombstoneRecord(
@@ -357,6 +361,81 @@ public class RecordHelpers {
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        MetadataVersion metadataVersion
+    ) {
+        List<GroupMetadataValue.MemberMetadata> members = new ArrayList<>(group.allMembers().size());
+        group.allMembers().forEach(member -> {
+            byte[] subscription = group.protocolName().map(member::metadata).orElse(null);
+            if (subscription == null) {
+                throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol.");
+            }
+
+            byte[] assignment = member.assignment();
+            if (assignment == null) {
+                throw new IllegalStateException("Attempted to write member " + member.memberId() +
+                    " of group + " + group.groupId() + " with no assignment.");
+            }
+
+            members.add(
+                new GroupMetadataValue.MemberMetadata()
+                    .setMemberId(member.memberId())
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeout(member.rebalanceTimeoutMs())
+                    .setSessionTimeout(member.sessionTimeoutMs())
+                    .setGroupInstanceId(member.groupInstanceId().orElse(null))
+                    .setSubscription(subscription)
+                    .setAssignment(assignment)
+            );
+        });
+
+        return new Record(
+            new ApiMessageAndVersion(
+                new GroupMetadataKey()
+                    .setGroup(group.groupId()),
+                (short) 2
+            ),
+            new ApiMessageAndVersion(
+                new GroupMetadataValue()
+                    .setProtocol(group.protocolName().orElse(null))
+                    .setProtocolType(group.protocolType().orElse(""))
+                    .setGeneration(group.generationId())
+                    .setLeader(group.leaderOrNull())
+                    .setCurrentStateTimestamp(group.currentStateTimestampOrDefault())
+                    .setMembers(members),
+                metadataVersion.groupMetadataValueVersion()
+            )
+        );
+    }
+
+    /**
+     * Creates a GroupMetadata tombstone.
+     *
+     * @param groupId  The group id.
+     * @return The record.
+     */
+    public static Record newGroupMetadataTombstoneRecord(
+        String groupId
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new GroupMetadataKey()
+                    .setGroup(groupId),
+                (short) 2
+            ),
+            null // Tombstone
+        );
+    }
+
     private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions(
         Map<Uuid, Set<Integer>> topicPartitions
     ) {
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
index 6504b57b2e4..40b6ddaedcc 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.consumer.ClientAssignor;
 import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
 import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
@@ -33,16 +36,30 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.generic.Protocol;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment;
@@ -60,6 +77,7 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignme
 import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
 import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class RecordHelpersTest {
 
@@ -383,4 +401,203 @@ public class RecordHelpersTest {
             "member-id"
         ));
     }
+
+    private static Stream<Arguments> metadataToExpectedGroupMetadataValue() {
+        return Stream.of(
+            Arguments.arguments(MetadataVersion.IBP_0_10_0_IV0, (short) 0),
+            Arguments.arguments(MetadataVersion.IBP_1_1_IV0, (short) 1),
+            Arguments.arguments(MetadataVersion.IBP_2_2_IV0, (short) 2),
+            Arguments.arguments(MetadataVersion.IBP_3_5_IV0, (short) 3)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("metadataToExpectedGroupMetadataValue")
+    public void testNewGroupMetadataRecord(
+        MetadataVersion metadataVersion,
+        short expectedGroupMetadataValueVersion
+    ) {
+        Time time = new MockTime();
+
+        List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-1")
+                .setClientId("client-1")
+                .setClientHost("host-1")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-1")
+                .setSubscription(new byte[]{0, 1})
+                .setAssignment(new byte[]{1, 2})
+        );
+
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-2")
+                .setClientId("client-2")
+                .setClientHost("host-2")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-2")
+                .setSubscription(new byte[]{1, 2})
+                .setAssignment(new byte[]{2, 3})
+        );
+
+        Record expectedRecord = new Record(
+            new ApiMessageAndVersion(
+                new GroupMetadataKey()
+                    .setGroup("group-id"),
+                (short) 2),
+            new ApiMessageAndVersion(
+                new GroupMetadataValue()
+                    .setProtocol("range")
+                    .setProtocolType("consumer")
+                    .setLeader("member-1")
+                    .setGeneration(1)
+                    .setCurrentStateTimestamp(time.milliseconds())
+                    .setMembers(expectedMembers),
+                expectedGroupMetadataValueVersion));
+
+        GenericGroup group = new GenericGroup(
+            new LogContext(),
+            "group-id",
+            GenericGroupState.PREPARING_REBALANCE,
+            time
+        );
+
+        expectedMembers.forEach(member -> {
+            group.add(new GenericGroupMember(
+                member.memberId(),
+                Optional.of(member.groupInstanceId()),
+                member.clientId(),
+                member.clientHost(),
+                member.rebalanceTimeout(),
+                member.sessionTimeout(),
+                "consumer",
+                Collections.singletonList(new Protocol(
+                    "range",
+                    member.subscription()
+                )),
+                member.assignment()
+            ));
+        });
+
+        group.initNextGeneration();
+        Record groupMetadataRecord = RecordHelpers.newGroupMetadataRecord(
+            group,
+            metadataVersion
+        );
+
+        assertEquals(expectedRecord, groupMetadataRecord);
+    }
+
+    @Test
+    public void testNewGroupMetadataTombstoneRecord() {
+        Record expectedRecord = new Record(
+            new ApiMessageAndVersion(
+                new GroupMetadataKey()
+                    .setGroup("group-id"),
+                (short) 2),
+            null);
+
+        Record groupMetadataRecord = RecordHelpers.newGroupMetadataTombstoneRecord("group-id");
+        assertEquals(expectedRecord, groupMetadataRecord);
+    }
+
+    @Test
+    public void testNewGroupMetadataRecordThrowsWhenNullSubscription() {
+        Time time = new MockTime();
+
+        List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-1")
+                .setClientId("client-1")
+                .setClientHost("host-1")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-1")
+                .setSubscription(new byte[]{0, 1})
+                .setAssignment(new byte[]{1, 2})
+        );
+
+        GenericGroup group = new GenericGroup(
+            new LogContext(),
+            "group-id",
+            GenericGroupState.PREPARING_REBALANCE,
+            time
+        );
+
+        expectedMembers.forEach(member -> {
+            group.add(new GenericGroupMember(
+                member.memberId(),
+                Optional.of(member.groupInstanceId()),
+                member.clientId(),
+                member.clientHost(),
+                member.rebalanceTimeout(),
+                member.sessionTimeout(),
+                "consumer",
+                Collections.singletonList(new Protocol(
+                    "range",
+                    null
+                )),
+                member.assignment()
+            ));
+        });
+
+        assertThrows(IllegalStateException.class, () ->
+            RecordHelpers.newGroupMetadataRecord(
+                group,
+                MetadataVersion.IBP_3_5_IV2
+            ));
+    }
+
+    @Test
+    public void testNewGroupMetadataRecordThrowsWhenEmptyAssignment() {
+        Time time = new MockTime();
+
+        List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-1")
+                .setClientId("client-1")
+                .setClientHost("host-1")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-1")
+                .setSubscription(new byte[]{0, 1})
+                .setAssignment(null)
+        );
+
+        GenericGroup group = new GenericGroup(
+            new LogContext(),
+            "group-id",
+            GenericGroupState.PREPARING_REBALANCE,
+            time
+        );
+
+        expectedMembers.forEach(member ->
+            group.add(new GenericGroupMember(
+                member.memberId(),
+                Optional.of(member.groupInstanceId()),
+                member.clientId(),
+                member.clientHost(),
+                member.rebalanceTimeout(),
+                member.sessionTimeout(),
+                "consumer",
+                Collections.singletonList(new Protocol(
+                    "range",
+                    member.subscription()
+                )),
+                member.assignment()
+            ))
+        );
+
+        assertThrows(IllegalStateException.class, () ->
+            RecordHelpers.newGroupMetadataRecord(
+                group,
+                MetadataVersion.IBP_3_5_IV2
+            ));
+    }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index c095ec751f0..0a2c84ebe9d 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -365,6 +365,20 @@ public enum MetadataVersion {
         }
     }
 
+    public short groupMetadataValueVersion() {
+        if (this.isLessThan(IBP_0_10_1_IV0)) {
+            return 0;
+        } else if (this.isLessThan(IBP_2_1_IV0)) {
+            return 1;
+        } else if (this.isLessThan(IBP_2_3_IV0)) {
+            return 2;
+        } else {
+            // Serialize with the highest supported non-flexible version
+            // until a tagged field is introduced or the version is bumped.
+            return 3;
+        }
+    }
+
     private static final Map<String, MetadataVersion> IBP_VERSIONS;
     static {
         {
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 75bbad2aff5..cb8424e3f77 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -312,4 +312,20 @@ class MetadataVersionTest {
         }
         assertEquals(expectedVersion, metadataVersion.registerBrokerRecordVersion());
     }
+
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testGroupMetadataValueVersion(MetadataVersion metadataVersion) {
+        final short expectedVersion;
+        if (metadataVersion.isAtLeast(MetadataVersion.IBP_2_3_IV0)) {
+            expectedVersion = 3;
+        } else if (metadataVersion.isAtLeast(IBP_2_1_IV0)) {
+            expectedVersion = 2;
+        } else if (metadataVersion.isAtLeast(IBP_0_10_1_IV0)) {
+            expectedVersion = 1;
+        } else {
+            expectedVersion = 0;
+        }
+        assertEquals(expectedVersion, metadataVersion.groupMetadataValueVersion());
+    }
 }