You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/05/23 08:42:43 UTC
[kafka] branch trunk updated: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers (#13704)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c98c1ed41cf KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers (#13704)
c98c1ed41cf is described below
commit c98c1ed41cfb2fdede4e6ec874d4c581f27b4508
Author: Jeff Kim <ki...@gmail.com>
AuthorDate: Tue May 23 04:42:13 2023 -0400
KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers (#13704)
This patch enables the new group metadata manager to generate GroupMetadataKey/Value records.
Reviewers: David Jacot <dj...@confluent.io>
---
checkstyle/suppressions.xml | 2 +
.../kafka/coordinator/group/RecordHelpers.java | 81 +++++++-
.../kafka/coordinator/group/RecordHelpersTest.java | 217 +++++++++++++++++++++
.../kafka/server/common/MetadataVersion.java | 14 ++
.../kafka/server/common/MetadataVersionTest.java | 16 ++
5 files changed, 329 insertions(+), 1 deletion(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2084490d187..c2fa6d4a47d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -324,6 +324,8 @@
files="(ConsumerGroupMember).java"/>
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember).java"/>
+ <suppress checks="ClassDataAbstractionCouplingCheck"
+ files="(RecordHelpersTest).java"/>
<!-- storage -->
<suppress checks="CyclomaticComplexity"
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
index 1e0a9eb9f14..bf6cd62a90f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
@@ -31,7 +31,11 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList;
import java.util.List;
@@ -339,7 +343,7 @@ public class RecordHelpers {
* Creates a ConsumerGroupCurrentMemberAssignment tombstone.
*
* @param groupId The consumer group id.
- * @param memberId The consumer group member id.
+ * @param memberId The consumer group member id.
* @return The record.
*/
public static Record newCurrentAssignmentTombstoneRecord(
@@ -357,6 +361,81 @@ public class RecordHelpers {
);
}
+    /**
+     * Creates a GroupMetadata record for a generic group.
+     *
+     * Every member of the group is written with the subscription it registered for the
+     * group's protocol and with its assignment.
+     *
+     * @param group              The generic group.
+     * @param metadataVersion    The metadata version; it determines the GroupMetadataValue version.
+     * @return The record.
+     * @throws IllegalStateException if a member has no subscription for the group's protocol
+     *                               (including when the group has no protocol) or no assignment.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        MetadataVersion metadataVersion
+    ) {
+        List<GroupMetadataValue.MemberMetadata> members = new ArrayList<>(group.allMembers().size());
+        group.allMembers().forEach(member -> {
+            // Both an absent protocol name and a null per-protocol subscription collapse to null here.
+            byte[] subscription = group.protocolName().map(member::metadata).orElse(null);
+            if (subscription == null) {
+                throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol.");
+            }
+
+            byte[] assignment = member.assignment();
+            if (assignment == null) {
+                throw new IllegalStateException("Attempted to write member " + member.memberId() +
+                    " of group " + group.groupId() + " with no assignment.");
+            }
+
+            members.add(
+                new GroupMetadataValue.MemberMetadata()
+                    .setMemberId(member.memberId())
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeout(member.rebalanceTimeoutMs())
+                    .setSessionTimeout(member.sessionTimeoutMs())
+                    .setGroupInstanceId(member.groupInstanceId().orElse(null))
+                    .setSubscription(subscription)
+                    .setAssignment(assignment)
+            );
+        });
+
+        return new Record(
+            new ApiMessageAndVersion(
+                new GroupMetadataKey()
+                    .setGroup(group.groupId()),
+                (short) 2
+            ),
+            new ApiMessageAndVersion(
+                new GroupMetadataValue()
+                    .setProtocol(group.protocolName().orElse(null))
+                    .setProtocolType(group.protocolType().orElse(""))
+                    .setGeneration(group.generationId())
+                    .setLeader(group.leaderOrNull())
+                    .setCurrentStateTimestamp(group.currentStateTimestampOrDefault())
+                    .setMembers(members),
+                // The value version depends on the metadata version in use.
+                metadataVersion.groupMetadataValueVersion()
+            )
+        );
+    }
+
+    /**
+     * Creates a GroupMetadata tombstone, i.e. a record holding the group key and no value.
+     *
+     * @param groupId The group id.
+     * @return The record.
+     */
+    public static Record newGroupMetadataTombstoneRecord(
+        String groupId
+    ) {
+        GroupMetadataKey tombstoneKey = new GroupMetadataKey().setGroup(groupId);
+        ApiMessageAndVersion versionedKey = new ApiMessageAndVersion(tombstoneKey, (short) 2);
+
+        // The null value is what makes this record a tombstone.
+        return new Record(versionedKey, null);
+    }
+ }
+
private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions(
Map<Uuid, Set<Integer>> topicPartitions
) {
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
index 6504b57b2e4..40b6ddaedcc 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
@@ -17,6 +17,9 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.consumer.ClientAssignor;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
@@ -33,16 +36,30 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.generic.Protocol;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.stream.Stream;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment;
@@ -60,6 +77,7 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignme
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class RecordHelpersTest {
@@ -383,4 +401,203 @@ public class RecordHelpersTest {
"member-id"
));
}
+
+    /**
+     * Supplies (metadata version, expected GroupMetadataValue version) pairs
+     * to testNewGroupMetadataRecord.
+     */
+    private static Stream<Arguments> metadataToExpectedGroupMetadataValue() {
+        return Stream.of(
+            Arguments.of(MetadataVersion.IBP_0_10_0_IV0, (short) 0),
+            Arguments.of(MetadataVersion.IBP_1_1_IV0, (short) 1),
+            Arguments.of(MetadataVersion.IBP_2_2_IV0, (short) 2),
+            Arguments.of(MetadataVersion.IBP_3_5_IV0, (short) 3)
+        );
+    }
+
+    /**
+     * Checks that a two-member generic group is turned into the expected GroupMetadata
+     * key/value record, with the value version following the given metadata version.
+     */
+    @ParameterizedTest
+    @MethodSource("metadataToExpectedGroupMetadataValue")
+    public void testNewGroupMetadataRecord(
+        MetadataVersion metadataVersion,
+        short expectedGroupMetadataValueVersion
+    ) {
+        Time time = new MockTime();
+
+        GroupMetadataValue.MemberMetadata firstMember = new GroupMetadataValue.MemberMetadata()
+            .setMemberId("member-1")
+            .setClientId("client-1")
+            .setClientHost("host-1")
+            .setRebalanceTimeout(1000)
+            .setSessionTimeout(1500)
+            .setGroupInstanceId("group-instance-1")
+            .setSubscription(new byte[]{0, 1})
+            .setAssignment(new byte[]{1, 2});
+
+        GroupMetadataValue.MemberMetadata secondMember = new GroupMetadataValue.MemberMetadata()
+            .setMemberId("member-2")
+            .setClientId("client-2")
+            .setClientHost("host-2")
+            .setRebalanceTimeout(1000)
+            .setSessionTimeout(1500)
+            .setGroupInstanceId("group-instance-2")
+            .setSubscription(new byte[]{1, 2})
+            .setAssignment(new byte[]{2, 3});
+
+        List<GroupMetadataValue.MemberMetadata> expectedMembers = Arrays.asList(firstMember, secondMember);
+
+        ApiMessageAndVersion expectedKey = new ApiMessageAndVersion(
+            new GroupMetadataKey().setGroup("group-id"),
+            (short) 2
+        );
+        ApiMessageAndVersion expectedValue = new ApiMessageAndVersion(
+            new GroupMetadataValue()
+                .setProtocol("range")
+                .setProtocolType("consumer")
+                .setLeader("member-1")
+                .setGeneration(1)
+                .setCurrentStateTimestamp(time.milliseconds())
+                .setMembers(expectedMembers),
+            expectedGroupMetadataValueVersion
+        );
+        Record expectedRecord = new Record(expectedKey, expectedValue);
+
+        GenericGroup group = new GenericGroup(
+            new LogContext(),
+            "group-id",
+            GenericGroupState.PREPARING_REBALANCE,
+            time
+        );
+
+        // Mirror every expected member into the group, advertising a single "range" protocol.
+        for (GroupMetadataValue.MemberMetadata expected : expectedMembers) {
+            Protocol rangeProtocol = new Protocol("range", expected.subscription());
+            group.add(new GenericGroupMember(
+                expected.memberId(),
+                Optional.of(expected.groupInstanceId()),
+                expected.clientId(),
+                expected.clientHost(),
+                expected.rebalanceTimeout(),
+                expected.sessionTimeout(),
+                "consumer",
+                Collections.singletonList(rangeProtocol),
+                expected.assignment()
+            ));
+        }
+
+        group.initNextGeneration();
+
+        assertEquals(
+            expectedRecord,
+            RecordHelpers.newGroupMetadataRecord(group, metadataVersion)
+        );
+    }
+
+    /**
+     * Checks that the tombstone helper yields the group key paired with a null value.
+     */
+    @Test
+    public void testNewGroupMetadataTombstoneRecord() {
+        ApiMessageAndVersion expectedKey = new ApiMessageAndVersion(
+            new GroupMetadataKey().setGroup("group-id"),
+            (short) 2
+        );
+
+        assertEquals(
+            new Record(expectedKey, null),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id")
+        );
+    }
+
+    /**
+     * Checks that newGroupMetadataRecord throws an IllegalStateException for a group whose
+     * only member advertises the "range" protocol with a null subscription.
+     */
+    @Test
+    public void testNewGroupMetadataRecordThrowsWhenNullSubscription() {
+        GenericGroup group = new GenericGroup(
+            new LogContext(),
+            "group-id",
+            GenericGroupState.PREPARING_REBALANCE,
+            new MockTime()
+        );
+
+        // The member carries an assignment, but its protocol metadata is null.
+        group.add(new GenericGroupMember(
+            "member-1",
+            Optional.of("group-instance-1"),
+            "client-1",
+            "host-1",
+            1000,
+            1500,
+            "consumer",
+            Collections.singletonList(new Protocol("range", null)),
+            new byte[]{1, 2}
+        ));
+
+        assertThrows(
+            IllegalStateException.class,
+            () -> RecordHelpers.newGroupMetadataRecord(group, MetadataVersion.IBP_3_5_IV2)
+        );
+    }
+
+    /**
+     * Checks that newGroupMetadataRecord throws an IllegalStateException when a member
+     * has a subscription for the group's protocol but a null assignment.
+     */
+    @Test
+    public void testNewGroupMetadataRecordThrowsWhenEmptyAssignment() {
+        Time time = new MockTime();
+
+        List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-1")
+                .setClientId("client-1")
+                .setClientHost("host-1")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-1")
+                .setSubscription(new byte[]{0, 1})
+                .setAssignment(null)
+        );
+
+        GenericGroup group = new GenericGroup(
+            new LogContext(),
+            "group-id",
+            GenericGroupState.PREPARING_REBALANCE,
+            time
+        );
+
+        expectedMembers.forEach(member ->
+            group.add(new GenericGroupMember(
+                member.memberId(),
+                Optional.of(member.groupInstanceId()),
+                member.clientId(),
+                member.clientHost(),
+                member.rebalanceTimeout(),
+                member.sessionTimeout(),
+                "consumer",
+                Collections.singletonList(new Protocol(
+                    "range",
+                    member.subscription()
+                )),
+                member.assignment()
+            ))
+        );
+
+        // Move to the next generation as testNewGroupMetadataRecord does before building the
+        // record, so the group is in the same state as the happy path and the only thing
+        // missing is the member's assignment.
+        group.initNextGeneration();
+
+        assertThrows(IllegalStateException.class, () ->
+            RecordHelpers.newGroupMetadataRecord(
+                group,
+                MetadataVersion.IBP_3_5_IV2
+            ));
+    }
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index c095ec751f0..0a2c84ebe9d 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -365,6 +365,20 @@ public enum MetadataVersion {
}
}
+    /**
+     * Returns the GroupMetadataValue version to serialize with for this metadata version.
+     *
+     * @return 3 from IBP_2_3_IV0 onwards, 2 from IBP_2_1_IV0, 1 from IBP_0_10_1_IV0, otherwise 0.
+     */
+    public short groupMetadataValueVersion() {
+        if (this.isAtLeast(IBP_2_3_IV0)) {
+            // Serialize with the highest supported non-flexible version
+            // until a tagged field is introduced or the version is bumped.
+            return 3;
+        }
+        if (this.isAtLeast(IBP_2_1_IV0)) {
+            return 2;
+        }
+        if (this.isAtLeast(IBP_0_10_1_IV0)) {
+            return 1;
+        }
+        return 0;
+    }
+
private static final Map<String, MetadataVersion> IBP_VERSIONS;
static {
{
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 75bbad2aff5..cb8424e3f77 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -312,4 +312,20 @@ class MetadataVersionTest {
}
assertEquals(expectedVersion, metadataVersion.registerBrokerRecordVersion());
}
+
+ @ParameterizedTest
+ @EnumSource(value = MetadataVersion.class)
+ public void testGroupMetadataValueVersion(MetadataVersion metadataVersion) {
+ final short expectedVersion;
+ if (metadataVersion.isAtLeast(MetadataVersion.IBP_2_3_IV0)) {
+ expectedVersion = 3;
+ } else if (metadataVersion.isAtLeast(IBP_2_1_IV0)) {
+ expectedVersion = 2;
+ } else if (metadataVersion.isAtLeast(IBP_0_10_1_IV0)) {
+ expectedVersion = 1;
+ } else {
+ expectedVersion = 0;
+ }
+ assertEquals(expectedVersion, metadataVersion.groupMetadataValueVersion());
+ }
}