Posted to jira@kafka.apache.org by "jeffkbkim (via GitHub)" <gi...@apache.org> on 2023/05/11 00:17:09 UTC

[GitHub] [kafka] jeffkbkim opened a new pull request, #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

jeffkbkim opened a new pull request, #13704:
URL: https://github.com/apache/kafka/pull/13704

   This PR enables the new group metadata manager to generate GroupMetadataKey/Value records.
   
   Built on top of https://github.com/apache/kafka/pull/13663. Files to review:
    * RecordHelpers.java
    * RecordHelpersTest.java
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191604696


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##########
@@ -399,8 +403,12 @@ public void testNewCurrentAssignmentTombstoneRecord() {
         ));
     }
 
-    @Test
-    public void testNewGroupMetadataRecord() {
+    @ParameterizedTest
+    @MethodSource("metadataToExpectedGroupMetadataValue")

Review Comment:
   the reason I used method source is to include the corresponding expected version. 
   
   if we use EnumSource we can add if/else statements but i thought the existing approach would be cleaner. what do you think?





[GitHub] [kafka] jeffkbkim commented on pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on PR #13704:
URL: https://github.com/apache/kafka/pull/13704#issuecomment-1544658481

   i'm seeing 
   ```
   Class Data Abstraction Coupling is 31 (max allowed is 25) classes [ApiMessageAndVersion, ClientAssignor, ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions, ConsumerGroupMember.Builder, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValue.Assignor, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValue.TopicMetadata, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValue.TopicPartition, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataValue, GenericGroup, GenericGroupMember, GroupMetadataKey, GroupMetadataValue, GroupMetadataValue.MemberMetadata, LinkedHashMap, LogContext, MockTime, Protocol, Record, TopicMetadata, VersionedMetadata]. [ClassDataAbstractionCoupling]
   ```
   
   The Checkstyle documentation at https://checkstyle.sourceforge.io/apidocs/com/puppycrawl/tools/checkstyle/checks/metrics/ClassDataAbstractionCouplingCheck.html#:~:text=Generally%20speaking%2C%20any%20data%20type,the%20structure%20of%20the%20class describes this check as follows:
   > Generally speaking, any data type with other data types as members or local variable that is an instantiation (object) of another class has data abstraction coupling (DAC). The higher the DAC, the more complex the structure of the class.
   
   Can we suppress this error?




[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1192506160


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map<String, byte[]> memberAssignments,

Review Comment:
   it is actually the other way around, the generic group is mutable so we could mutate it whenever we want here, i think.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1192291029


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map<String, byte[]> memberAssignments,

Review Comment:
   i think you're referring to https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L623
   
   the in-memory state is updated after we successfully append to the log, so we won't have this information when we generate the records to append, no?
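
   A minimal sketch of the ordering described above, assuming a group with no snapshot/revert support. `SketchGroup`, `appendThenApply`, and the `append` callback are hypothetical names used only for illustration; they are not part of the coordinator code. The point is that the proposed assignments have to be handed to the record-building step separately, because the group itself is only updated once the append has succeeded.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Predicate;

   // Hypothetical stand-in for a group whose in-memory state cannot be reverted.
   final class SketchGroup {
       private final Map<String, byte[]> assignments = new HashMap<>();

       // Hands the proposed assignments to the (hypothetical) append callback and
       // commits them to memory only if the callback reports success.
       boolean appendThenApply(
           Map<String, byte[]> proposed,
           Predicate<Map<String, byte[]>> append
       ) {
           if (!append.test(proposed)) {
               return false; // append failed: in-memory state is left untouched
           }
           assignments.putAll(proposed);
           return true;
       }

       // Number of members that currently have an assignment stored in memory.
       int assignedMembers() {
           return assignments.size();
       }
   }
   ```

   If the callback reports a failed append, `assignedMembers()` is unchanged, which is why the record cannot be generated from the group state alone in this ordering.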





[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1192324622


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map<String, byte[]> memberAssignments,

Review Comment:
   yeah, that's right. the generic group is a bit weird in this regard because it does not use timeline data structures internally, so we could just update it as well (like we do for subscriptions, for instance). the part which is weird here is that we take everything from the group but the assignment.





[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191395592


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map<String, byte[]> memberAssignments,

Review Comment:
   hmm... don't we store it in MemberMetadata in the current implementation? We set it in `setAndPropagateAssignment`. I think that we need it because we need the ability to provide it to the member at any time.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191390607


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map<String, byte[]> memberAssignments,

Review Comment:
   the assignment comes directly from the sync group request, i don't think we need to store the assignment inside the group





[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191225920


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map<String, byte[]> memberAssignments,

Review Comment:
   Any reason why we can't get the assignments from the group?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map<String, byte[]> memberAssignments,
+        MetadataVersion metadataVersion
+    ) {
+        short version;
+        if (metadataVersion.isLessThan(IBP_0_10_1_IV0)) {
+            version = (short) 0;
+        } else if (metadataVersion.isLessThan(IBP_2_1_IV0)) {
+            version = (short) 1;
+        } else if (metadataVersion.isLessThan(IBP_2_3_IV0)) {
+            version = (short) 2;
+        } else {
+            version = (short) 3;

Review Comment:
   nit: Should we keep that comment?
   
   ```
         // Serialize with the highest supported non-flexible version
         // until a tagged field is introduced or the version is bumped.
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##########
@@ -383,4 +398,84 @@ public void testNewCurrentAssignmentTombstoneRecord() {
             "member-id"
         ));
     }
+
+    @Test
+    public void testNewGroupMetadataRecord() {
+        Time time = new MockTime();
+
+        List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-1")
+                .setClientId("client-1")
+                .setClientHost("host-1")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-1")
+                .setSubscription(new byte[]{0, 1})
+                .setAssignment(new byte[]{1, 2})
+        );
+
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-2")
+                .setClientId("client-2")
+                .setClientHost("host-2")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-2")
+                .setSubscription(new byte[]{1, 2})
+                .setAssignment(new byte[]{2, 3})
+        );
+
+        Record expectedRecord = new Record(
+            new ApiMessageAndVersion(
+                new GroupMetadataKey()
+                    .setGroup("group-id"),
+                (short) 2),
+            new ApiMessageAndVersion(
+                new GroupMetadataValue()
+                    .setProtocol("range")
+                    .setProtocolType("consumer")
+                    .setLeader("member-1")
+                    .setGeneration(1)
+                    .setCurrentStateTimestamp(time.milliseconds())
+                    .setMembers(expectedMembers),
+            (short) 3));
+
+        GenericGroup group = new GenericGroup(
+            new LogContext(),
+            "group-id",
+            GenericGroupState.PREPARING_REBALANCE,
+            time
+        );
+
+        Map<String, byte[]> memberAssignments = new HashMap<>();
+        expectedMembers.forEach(member -> {
+            memberAssignments.put(member.memberId(), member.assignment());
+            group.add(new GenericGroupMember(
+                member.memberId(),
+                Optional.of(member.groupInstanceId()),
+                member.clientId(),
+                member.clientHost(),
+                member.rebalanceTimeout(),
+                member.sessionTimeout(),
+                "consumer",
+                Collections.singletonList(new Protocol(
+                    "range",
+                    member.subscription()
+                )),
+                member.assignment()
+            ));
+        });
+
+        group.initNextGeneration();
+        Record groupMetadataRecord = RecordHelpers.newGroupMetadataRecord(
+            group,
+            memberAssignments,
+            MetadataVersion.IBP_3_5_IV2
+        );
+
+        assertEquals(expectedRecord, groupMetadataRecord);
+    }
 }

Review Comment:
   Should we add the following tests?
   * A unit test for the tombstone method;
   * A unit test which triggers the IllegalStateException cases;
   * Unit tests which verify the various record versions. This could be a parameterised one.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map<String, byte[]> memberAssignments,
+        MetadataVersion metadataVersion
+    ) {
+        short version;
+        if (metadataVersion.isLessThan(IBP_0_10_1_IV0)) {
+            version = (short) 0;
+        } else if (metadataVersion.isLessThan(IBP_2_1_IV0)) {
+            version = (short) 1;
+        } else if (metadataVersion.isLessThan(IBP_2_3_IV0)) {
+            version = (short) 2;
+        } else {
+            version = (short) 3;
+        }

Review Comment:
   While we are here, let's push this logic into `MetadataVersion`. We did this for others.
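
   A minimal sketch of what moving this mapping into the version enum could look like. `SketchMetadataVersion` is a hypothetical, cut-down stand-in for `MetadataVersion` that lists only the constants involved; the thresholds are copied from the if/else chain in the diff above, and the method name is an assumption.

   ```java
   // Hypothetical, cut-down stand-in for MetadataVersion (not the real class).
   enum SketchMetadataVersion {
       // Declared in release order so that compareTo() reflects version ordering.
       IBP_0_10_0_IV0,
       IBP_0_10_1_IV0,
       IBP_2_1_IV0,
       IBP_2_3_IV0;

       boolean isLessThan(SketchMetadataVersion other) {
           return compareTo(other) < 0;
       }

       // Same thresholds as the if/else chain in the diff above.
       short groupMetadataValueVersion() {
           if (isLessThan(IBP_0_10_1_IV0)) {
               return 0;
           } else if (isLessThan(IBP_2_1_IV0)) {
               return 1;
           } else if (isLessThan(IBP_2_3_IV0)) {
               return 2;
           } else {
               return 3;
           }
       }
   }
   ```

   With something along these lines, the record helper would only need to call the method on the version it is given instead of carrying the chain itself.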





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191438825


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##########
@@ -383,4 +398,84 @@ public void testNewCurrentAssignmentTombstoneRecord() {
             "member-id"
         ));
     }
+
+    @Test
+    public void testNewGroupMetadataRecord() {
+        Time time = new MockTime();
+
+        List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-1")
+                .setClientId("client-1")
+                .setClientHost("host-1")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-1")
+                .setSubscription(new byte[]{0, 1})
+                .setAssignment(new byte[]{1, 2})
+        );
+
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-2")
+                .setClientId("client-2")
+                .setClientHost("host-2")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-2")
+                .setSubscription(new byte[]{1, 2})
+                .setAssignment(new byte[]{2, 3})
+        );
+
+        Record expectedRecord = new Record(
+            new ApiMessageAndVersion(
+                new GroupMetadataKey()
+                    .setGroup("group-id"),
+                (short) 2),
+            new ApiMessageAndVersion(
+                new GroupMetadataValue()
+                    .setProtocol("range")
+                    .setProtocolType("consumer")
+                    .setLeader("member-1")
+                    .setGeneration(1)
+                    .setCurrentStateTimestamp(time.milliseconds())
+                    .setMembers(expectedMembers),
+            (short) 3));
+
+        GenericGroup group = new GenericGroup(
+            new LogContext(),
+            "group-id",
+            GenericGroupState.PREPARING_REBALANCE,
+            time
+        );
+
+        Map<String, byte[]> memberAssignments = new HashMap<>();
+        expectedMembers.forEach(member -> {
+            memberAssignments.put(member.memberId(), member.assignment());
+            group.add(new GenericGroupMember(
+                member.memberId(),
+                Optional.of(member.groupInstanceId()),
+                member.clientId(),
+                member.clientHost(),
+                member.rebalanceTimeout(),
+                member.sessionTimeout(),
+                "consumer",
+                Collections.singletonList(new Protocol(
+                    "range",
+                    member.subscription()
+                )),
+                member.assignment()
+            ));
+        });
+
+        group.initNextGeneration();
+        Record groupMetadataRecord = RecordHelpers.newGroupMetadataRecord(
+            group,
+            memberAssignments,
+            MetadataVersion.IBP_3_5_IV2
+        );
+
+        assertEquals(expectedRecord, groupMetadataRecord);
+    }
 }

Review Comment:
   i updated `testNewGroupMetadataRecord` for 3). can you take a look?





[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191452753


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##########
@@ -399,8 +403,12 @@ public void testNewCurrentAssignmentTombstoneRecord() {
         ));
     }
 
-    @Test
-    public void testNewGroupMetadataRecord() {
+    @ParameterizedTest
+    @MethodSource("metadataToExpectedGroupMetadataValue")

Review Comment:
   You could use: 
   ```
       @ParameterizedTest
       @EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV1", "IBP_3_3_IV2"})
   ```
   





[GitHub] [kafka] dajac merged pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13704:
URL: https://github.com/apache/kafka/pull/13704




[GitHub] [kafka] dajac commented on pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13704:
URL: https://github.com/apache/kafka/pull/13704#issuecomment-1545714546

   > i'm seeing
   > 
   > ```
   > Class Data Abstraction Coupling is 31 (max allowed is 25) classes [ApiMessageAndVersion, ClientAssignor, ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions, ConsumerGroupMember.Builder, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValue.Assignor, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValue.TopicMetadata, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValue.TopicPartition, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataValue, GenericGroup, GenericGroupMember, GroupMetadataKey, GroupMetadataValue, GroupMetadataValue.MemberMetadata, LinkedHashMap, LogContext, MockTime, Protocol, Record, TopicMetadata, VersionedMetadata]. [ClassDataAbstractionCoupling]
   > ```
   > 
   > The Checkstyle documentation at https://checkstyle.sourceforge.io/apidocs/com/puppycrawl/tools/checkstyle/checks/metrics/ClassDataAbstractionCouplingCheck.html#:~:text=Generally%20speaking%2C%20any%20data%20type,the%20structure%20of%20the%20class describes this check as follows:
   > 
   > > Generally speaking, any data type with other data types as members or local variable that is an instantiation (object) of another class has data abstraction coupling (DAC). The higher the DAC, the more complex the structure of the class.
   > 
   > Can we suppress this error?
   
   This is the first time I have seen this. It looks like we can suppress it.




[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191453623


##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -365,6 +365,20 @@ public short listOffsetRequestVersion() {
         }
     }
 
+    public short groupMetadataValueVersion() {

Review Comment:
   nit: Should we add a unit test in MetadataVersionTest as well?





[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1200514722


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##########
@@ -383,4 +401,203 @@ public void testNewCurrentAssignmentTombstoneRecord() {
             "member-id"
         ));
     }
+
+    private static Stream<Arguments> metadataToExpectedGroupMetadataValue() {
+        return Stream.of(
+            Arguments.arguments(MetadataVersion.IBP_0_10_0_IV0, (short) 0),
+            Arguments.arguments(MetadataVersion.IBP_1_1_IV0, (short) 1),
+            Arguments.arguments(MetadataVersion.IBP_2_2_IV0, (short) 2),
+            Arguments.arguments(MetadataVersion.IBP_3_5_IV0, (short) 3)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("metadataToExpectedGroupMetadataValue")
+    public void testNewGroupMetadataRecord(
+        MetadataVersion metadataVersion,
+        short expectedGroupMetadataValueVersion
+    ) {
+        Time time = new MockTime();
+
+        List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-1")
+                .setClientId("client-1")
+                .setClientHost("host-1")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-1")
+                .setSubscription(new byte[]{0, 1})
+                .setAssignment(new byte[]{1, 2})
+        );
+
+        expectedMembers.add(
+            new GroupMetadataValue.MemberMetadata()
+                .setMemberId("member-2")
+                .setClientId("client-2")
+                .setClientHost("host-2")
+                .setRebalanceTimeout(1000)
+                .setSessionTimeout(1500)
+                .setGroupInstanceId("group-instance-2")
+                .setSubscription(new byte[]{1, 2})
+                .setAssignment(new byte[]{2, 3})
+        );
+
+        Record expectedRecord = new Record(
+            new ApiMessageAndVersion(
+                new GroupMetadataKey()
+                    .setGroup("group-id"),
+                (short) 2),
+            new ApiMessageAndVersion(
+                new GroupMetadataValue()
+                    .setProtocol("range")
+                    .setProtocolType("consumer")
+                    .setLeader("member-1")
+                    .setGeneration(1)
+                    .setCurrentStateTimestamp(time.milliseconds())
+                    .setMembers(expectedMembers),
+            expectedGroupMetadataValueVersion));

Review Comment:
   nit: Indentation seems to be off here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -357,6 +361,81 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        MetadataVersion metadataVersion
+    ) {
+        List<GroupMetadataValue.MemberMetadata> members = new ArrayList<>();

Review Comment:
   nit: Should we set the size based on the number of members in the group?
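
   A small sketch of the pre-sizing idea, assuming the member count is known up front. `PresizedListSketch` and `copyMemberIds` are hypothetical names, and plain strings stand in for the member metadata objects.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class PresizedListSketch {
       // Copies member ids into a list whose initial capacity matches the input
       // size, so the backing array does not need to grow while it is filled.
       static List<String> copyMemberIds(List<String> memberIds) {
           List<String> members = new ArrayList<>(memberIds.size());
           for (String memberId : memberIds) {
               members.add(memberId);
           }
           return members;
       }
   }
   ```

   The initial capacity is only a hint to `ArrayList`; the resulting contents are the same as with the no-argument constructor.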





[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191653253


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##########
@@ -399,8 +403,12 @@ public void testNewCurrentAssignmentTombstoneRecord() {
         ));
     }
 
-    @Test
-    public void testNewGroupMetadataRecord() {
+    @ParameterizedTest
+    @MethodSource("metadataToExpectedGroupMetadataValue")

Review Comment:
   Ah. I see. I missed the version part. We can keep it as it is but could we move the method to just before this test? It is easier to read them together.





[GitHub] [kafka] jeffkbkim commented on pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on PR #13704:
URL: https://github.com/apache/kafka/pull/13704#issuecomment-1547931013

   @dajac this is ready for review




[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1192476699


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map<String, byte[]> memberAssignments,

Review Comment:
   i think you're saying that ideally we want to create a record solely from the group i.e. update the group with the assignment and pass it into this method. However, we can't because we don't use timeline data structures that could revert if append fails. is my understanding correct?


