Posted to jira@kafka.apache.org by "jeffkbkim (via GitHub)" <gi...@apache.org> on 2023/04/07 16:44:31 UTC

[GitHub] [kafka] jeffkbkim opened a new pull request, #13526: KAFKA-14869: bump coordinator Value records to flexible versions

jeffkbkim opened a new pull request, #13526:
URL: https://github.com/apache/kafka/pull/13526

   KIP-915: Part 2 of 2
   
   Bump all Value records in the __transaction_state and __consumer_offsets topics to flexible versions. Serialize value records to the highest known non-flexible version to allow downgrades to lower versions.
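
To illustrate the description above, here is a minimal Java sketch of the version-prefixed layout that the tests in this PR inspect with `getShort`. It is not Kafka's `MessageUtil`: the class `VersionPrefixSketch`, its constants, and the payload bytes are hypothetical, and the version numbers 3 and 4 are taken from the `GroupMetadataValue` discussion in this thread.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not Kafka's MessageUtil: it only mimics a
// "2-byte version prefix + payload" layout for illustration.
final class VersionPrefixSketch {
    // Assumed for illustration: version 4 is the first flexible version,
    // so 3 is the highest non-flexible version.
    static final short HIGHEST_SUPPORTED_VERSION = 4;
    static final short HIGHEST_NON_FLEXIBLE_VERSION = 3;

    // Prepends the chosen version as a big-endian short to the payload.
    static byte[] toVersionPrefixedBytes(short version, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(2 + payload.length);
        buffer.putShort(version);
        buffer.put(payload);
        return buffer.array();
    }

    // Reads back the version prefix, as the tests do with getShort.
    static short readVersion(byte[] serialized) {
        return ByteBuffer.wrap(serialized).getShort();
    }

    public static void main(String[] args) {
        // The writer knows version 4 but deliberately writes version 3.
        byte[] value = toVersionPrefixedBytes(HIGHEST_NON_FLEXIBLE_VERSION, new byte[] {1, 2, 3});
        System.out.println(readVersion(value)); // prints 3
    }
}
```

Writing the highest non-flexible version keeps the record readable by a broker that only knows versions up to 3, which is what makes a downgrade possible.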
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1166578031


##########
build.gradle:
##########
@@ -958,12 +958,15 @@ project(':core') {
     classpath = configurations.generator
     args = [ "-p", "kafka.internals.generated",
              "-o", "src/generated/java/kafka/internals/generated",
-             "-i", "src/main/resources/common/message",
+             "-i", "src/main/resources/common/message,src/test/resources/message",

Review Comment:
   This is not correct because the test schemas will end up in the main source tree. Take a look at how we did it in the `clients` module (e.g. `processTestMessages`).



##########
core/src/main/resources/common/message/TransactionLogValue.json:
##########
@@ -13,27 +13,31 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 1 is the first flexible version.

Review Comment:
   nit: Let's put this before `validVersions` as well.



##########
core/src/test/resources/message/GroupMetadataValueWithUnknownTaggedFields.json:
##########
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Version 4 is the first flexible version.
+// ONLY USED FOR TESTING

Review Comment:
   Let's clearly state that this is a copy of GroupMetadataValue with additional tagged fields used to test the downgrade mechanism introduced in KIP-xxx. 



##########
core/src/test/resources/message/OffsetCommitValueWithUnknownTaggedFields.json:
##########
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Version 4 is the first flexible version.
+// ONLY USED FOR TESTING

Review Comment:
   ditto.



##########
core/src/test/resources/message/TransactionLogValueWithUnknownTaggedFields.json:
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Version 1 is the first flexible version.

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2467,64 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value().getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value().getShort)

Review Comment:
   nit: `value` without `()`.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2616,7 +2675,8 @@ class GroupMetadataManagerTest {
                                                memberId: String,
                                                assignmentBytes: Array[Byte] = Array.emptyByteArray,
                                                metadataVersion: MetadataVersion = MetadataVersion.latest): SimpleRecord = {
-    val memberProtocols = List((protocol, Array.emptyByteArray))
+    val subscription = new Subscription(List("topic").asJava)
+    val memberProtocols = List((protocol, ConsumerProtocol.serializeSubscription(subscription).array()))

Review Comment:
   Why do we need this change? It is not clear to me.



##########
generator/src/main/java/org/apache/kafka/message/MessageGenerator.java:
##########
@@ -217,7 +217,7 @@ private static List<MessageClassGenerator> createMessageClassGenerators(String p
 
     public static void processDirectories(String packageName,
                                           String outputDir,
-                                          String inputDir,
+                                          String inputDirs,

Review Comment:
   As explained earlier, we don't need this. Let's revert those changes.



##########
group-coordinator/src/main/resources/common/message/GroupMetadataValue.json:
##########
@@ -13,11 +13,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 4 is the first flexible version.
 {
   "type": "data",
   "name": "GroupMetadataValue",
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  // KIP-915: bumping the version will no longer make this record backward compatible.
+  // We suggest to add/remove only tagged fields to maintain backward compatibility.
+  "validVersions": "0-4",

Review Comment:
   I just noticed something weird here. At L35, the version is `0-3`. I think that this should be `0+`. It does not seem to have any effect though.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +138,28 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+

Review Comment:
   Should we also add a test with TransactionLogValue v1?



##########
core/src/main/resources/common/message/TransactionLogValue.json:
##########
@@ -13,27 +13,31 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 1 is the first flexible version.
+// ONLY USED FOR TESTING

Review Comment:
   Is this correct? It seems not...



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2467,64 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value().getShort)

Review Comment:
   nit: `value` without `()`.



##########
group-coordinator/src/main/resources/common/message/OffsetCommitValue.json:
##########
@@ -13,11 +13,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 4 is the first flexible version.

Review Comment:
   nit: Let's move this line to right before `validVersions` as well.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2467,64 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value().getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value().getShort)
+    }
+  }
+

Review Comment:
   Should we also add two tests with OffsetCommitValue v4 and GroupMetadataValue v4?



##########
group-coordinator/src/main/resources/common/message/GroupMetadataValue.json:
##########
@@ -13,11 +13,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 4 is the first flexible version.

Review Comment:
   nit: Let's move this line to right before `validVersions` as well.





[GitHub] [kafka] dajac merged pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13526:
URL: https://github.com/apache/kafka/pull/13526




[GitHub] [kafka] jeffkbkim commented on pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on PR #13526:
URL: https://github.com/apache/kafka/pull/13526#issuecomment-1509245732

   @dajac 
   
   > I recall that we discussed adding a mechanism to prevent adding non-tagged fields in the future. Have you looked into this?
   
   @hachikuji pointed out that we may want to introduce non-tagged fields in the future, which I reflected in the KIP. We will not enforce this; we will only inform the developer that doing so breaks backward compatibility.




[GitHub] [kafka] dajac commented on a diff in pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1163741549


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2467,34 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test

Review Comment:
   Should we add tests which verify that the coordinators can deserialize the newest versions? I also wonder if we could craft a newer version with tagged fields somehow. I am not sure if this is even possible. 
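
One way to picture what such a test would exercise: in flexible versions, a tagged-fields section is written as a count followed by (tag, size, bytes) entries, all using unsigned varints, so a reader can skip tags it does not recognize. The sketch below is a self-contained illustration under that assumption. `TaggedFieldsSketch` and its methods are hypothetical and are not Kafka's generated message classes or its `Schema`/`Struct` API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration of skipping unknown tagged fields.
final class TaggedFieldsSketch {
    // Writes value as an unsigned varint: 7 bits per byte, high bit = "more bytes follow".
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readUnsignedVarint(ByteBuffer in) {
        int value = 0;
        int shift = 0;
        int b;
        while (((b = in.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) throw new IllegalArgumentException("varint too long");
        }
        return value | (b << shift);
    }

    // Encodes the section: field count, then (tag, size, bytes) in ascending tag order.
    static byte[] writeTaggedFields(TreeMap<Integer, byte[]> fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(fields.size(), out);
        for (Map.Entry<Integer, byte[]> entry : fields.entrySet()) {
            writeUnsignedVarint(entry.getKey(), out);
            writeUnsignedVarint(entry.getValue().length, out);
            out.write(entry.getValue(), 0, entry.getValue().length);
        }
        return out.toByteArray();
    }

    // Keeps fields whose tag is in knownTags and skips every other field by its size.
    static Map<Integer, byte[]> readKnownTaggedFields(ByteBuffer in, Set<Integer> knownTags) {
        Map<Integer, byte[]> known = new HashMap<>();
        int count = readUnsignedVarint(in);
        for (int i = 0; i < count; i++) {
            int tag = readUnsignedVarint(in);
            int size = readUnsignedVarint(in);
            if (knownTags.contains(tag)) {
                byte[] data = new byte[size];
                in.get(data);
                known.put(tag, data);
            } else {
                in.position(in.position() + size); // unknown tag: skip its bytes
            }
        }
        return known;
    }

    public static void main(String[] args) {
        // A "future" writer adds two tagged fields this reader has never heard of.
        TreeMap<Integer, byte[]> future = new TreeMap<>();
        future.put(0, "foo".getBytes(StandardCharsets.UTF_8));
        future.put(1, new byte[] {0, 0, 15, (byte) 160}); // 4000 as a big-endian int
        ByteBuffer in = ByteBuffer.wrap(writeTaggedFields(future));
        Map<Integer, byte[]> known = readKnownTaggedFields(in, Collections.emptySet());
        System.out.println(known.size() + " known fields, " + in.remaining() + " bytes left");
    }
}
```

Because every entry carries its own size, the reader does not need the field's schema to step over it, which is the property a "future version" deserialization test relies on.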



##########
core/src/main/resources/common/message/TransactionLogValue.json:
##########
@@ -13,27 +13,29 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 1 is the first flexible version.
+// KIP-915: bumping the version will no longer make this record backward compatible.

Review Comment:
   nit: It would be better to move those to right before `validVersions`.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1115,7 +1117,9 @@ object GroupMetadataManager {
       if (metadataVersion.isLessThan(IBP_0_10_1_IV0)) 0.toShort
       else if (metadataVersion.isLessThan(IBP_2_1_IV0)) 1.toShort
       else if (metadataVersion.isLessThan(IBP_2_3_IV0)) 2.toShort
-      else 3.toShort
+      // Serialize with the highest supported non-flexible version
+      // until a tagged field is introduced or the version is bumped.
+      else GroupMetadataValue.HIGHEST_SUPPORTED_NON_FLEXIBLE_VERSION

Review Comment:
   ditto.



##########
group-coordinator/src/main/resources/common/message/GroupMetadataValue.json:
##########
@@ -13,11 +13,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 4 is the first flexible version.
+// KIP-915: bumping the version will no longer make this record backward compatible.

Review Comment:
   nit: Let's move those as well.



##########
group-coordinator/src/main/resources/common/message/OffsetCommitValue.json:
##########
@@ -13,11 +13,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 4 is the first flexible version.
+// KIP-915: bumping the version will no longer make this record backward compatible.

Review Comment:
   ditto.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -82,7 +82,9 @@ object TransactionLog {
             .setPartitionIds(partitions.map(tp => Integer.valueOf(tp.partition)).toList.asJava)
         }.toList.asJava
 
-    MessageUtil.toVersionPrefixedBytes(TransactionLogValue.HIGHEST_SUPPORTED_VERSION,
+    // Serialize with the highest supported non-flexible version
+    // until a tagged field is introduced or the version is bumped.
+    MessageUtil.toVersionPrefixedBytes(TransactionLogValue.HIGHEST_SUPPORTED_NON_FLEXIBLE_VERSION,

Review Comment:
   ditto.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1087,7 +1087,9 @@ object GroupMetadataManager {
     val version =
       if (metadataVersion.isLessThan(IBP_2_1_IV0) || offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort
       else if (metadataVersion.isLessThan(IBP_2_1_IV1)) 2.toShort
-      else 3.toShort
+      // Serialize with the highest supported non-flexible version
+      // until a tagged field is introduced or the version is bumped.
+      else OffsetCommitValue.HIGHEST_SUPPORTED_NON_FLEXIBLE_VERSION

Review Comment:
   nit: I would prefer to keep using the explicit version here. We could have an issue with `HIGHEST_SUPPORTED_NON_FLEXIBLE_VERSION` and that would mess things up.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1167065703


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2467,64 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value().getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value().getShort)
+    }
+  }
+

Review Comment:
   what do we want to test for v4?





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1167066378


##########
core/src/main/resources/common/message/TransactionLogValue.json:
##########
@@ -13,27 +13,31 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 1 is the first flexible version.
+// ONLY USED FOR TESTING

Review Comment:
   thanks for the catch





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1167056232


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2616,7 +2675,8 @@ class GroupMetadataManagerTest {
                                                memberId: String,
                                                assignmentBytes: Array[Byte] = Array.emptyByteArray,
                                                metadataVersion: MetadataVersion = MetadataVersion.latest): SimpleRecord = {
-    val memberProtocols = List((protocol, Array.emptyByteArray))
+    val subscription = new Subscription(List("topic").asJava)
+    val memberProtocols = List((protocol, ConsumerProtocol.serializeSubscription(subscription).array()))

Review Comment:
   it's not necessary but i kept seeing
   ```
   Buffer underflow while parsing consumer protocol's header
   ```
   for tests that were calling `buildStableGroupRecordWithMember()`
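
A plausible reading of that message, assuming the consumer protocol parser starts by reading a 2-byte version header: an empty byte array has no bytes for the header, so the read underflows. The sketch below reproduces only that mechanism with plain `java.nio`. `HeaderUnderflowSketch` and `readHeaderVersion` are hypothetical names, not Kafka's `ConsumerProtocol` API.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

// Hypothetical stand-in for a parser that begins with a 2-byte version header.
final class HeaderUnderflowSketch {
    static short readHeaderVersion(ByteBuffer buffer) {
        try {
            return buffer.getShort();
        } catch (BufferUnderflowException e) {
            // Fewer than 2 bytes remain, so the header cannot be read.
            throw new IllegalStateException("Buffer underflow while parsing header", e);
        }
    }

    public static void main(String[] args) {
        // A buffer that starts with a version header parses fine.
        System.out.println(readHeaderVersion(ByteBuffer.wrap(new byte[] {0, 1})));
        // An empty array, like Array.emptyByteArray in the old test helper, does not.
        try {
            readHeaderVersion(ByteBuffer.wrap(new byte[0]));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Serializing a real `Subscription`, as the change above does, gives the parser a well-formed header to read.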





[GitHub] [kafka] dajac commented on a diff in pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1168982888


##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +141,119 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeHighestSupportedTransactionLogValue(): Unit = {
+    val txnPartitions = new TransactionLogValue.PartitionsSchema()
+      .setTopic("topic")
+      .setPartitionIds(java.util.Collections.singletonList(0))
+
+    val txnLogValue = new TransactionLogValue()
+      .setProducerId(100)
+      .setProducerEpoch(50.toShort)
+      .setTransactionStatus(CompleteCommit.id)
+      .setTransactionStartTimestampMs(750L)
+      .setTransactionLastUpdateTimestampMs(1000L)
+      .setTransactionTimeoutMs(500)
+      .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions))
+
+    val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue)
+    val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get
+
+    assertEquals(100, deserialized.producerId)
+    assertEquals(50, deserialized.producerEpoch)
+    assertEquals(CompleteCommit, deserialized.state)
+    assertEquals(750L, deserialized.txnStartTimestamp)
+    assertEquals(1000L, deserialized.txnLastUpdateTimestamp)
+    assertEquals(500, deserialized.txnTimeoutMs)
+
+    val actualTxnPartitions = deserialized.topicPartitions
+    assertEquals(1, actualTxnPartitions.size)
+    assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0)))
+  }
+
+  @Test
+  def testDeserializeFutureTransactionLogValue(): Unit = {
+    // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futurePartitionsSchema = new Schema(
+      new Field("topic", Type.COMPACT_STRING, ""),
+      new Field("partition_ids", new CompactArrayOf(Type.INT32), ""),
+      TaggedFieldsSection.of(
+        0, new Field("partition_foo", Type.STRING, ""),
+        1, new Field("partition_foo", Type.INT32, "")
+      )
+    )
+
+    // create TransactionLogValue.PartitionsSchema with tagged fields

Review Comment:
   nit: `Create`.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +141,119 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeHighestSupportedTransactionLogValue(): Unit = {
+    val txnPartitions = new TransactionLogValue.PartitionsSchema()
+      .setTopic("topic")
+      .setPartitionIds(java.util.Collections.singletonList(0))
+
+    val txnLogValue = new TransactionLogValue()
+      .setProducerId(100)
+      .setProducerEpoch(50.toShort)
+      .setTransactionStatus(CompleteCommit.id)
+      .setTransactionStartTimestampMs(750L)
+      .setTransactionLastUpdateTimestampMs(1000L)
+      .setTransactionTimeoutMs(500)
+      .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions))
+
+    val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue)
+    val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get
+
+    assertEquals(100, deserialized.producerId)
+    assertEquals(50, deserialized.producerEpoch)
+    assertEquals(CompleteCommit, deserialized.state)
+    assertEquals(750L, deserialized.txnStartTimestamp)
+    assertEquals(1000L, deserialized.txnLastUpdateTimestamp)
+    assertEquals(500, deserialized.txnTimeoutMs)
+
+    val actualTxnPartitions = deserialized.topicPartitions
+    assertEquals(1, actualTxnPartitions.size)
+    assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0)))
+  }
+
+  @Test
+  def testDeserializeFutureTransactionLogValue(): Unit = {
+    // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futurePartitionsSchema = new Schema(
+      new Field("topic", Type.COMPACT_STRING, ""),
+      new Field("partition_ids", new CompactArrayOf(Type.INT32), ""),
+      TaggedFieldsSection.of(
+        0, new Field("partition_foo", Type.STRING, ""),
+        1, new Field("partition_foo", Type.INT32, "")
+      )
+    )
+
+    // create TransactionLogValue.PartitionsSchema with tagged fields
+    val txnPartitions = new Struct(futurePartitionsSchema)
+    txnPartitions.set("topic", "topic")
+    txnPartitions.set("partition_ids", Array(Integer.valueOf(1)))
+    val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]()
+    txnPartitionsTaggedFields.put(0, "foo")
+    txnPartitionsTaggedFields.put(1, 4000)
+    txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields)
+
+    // Copy of TransactionLogValue.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futureTransactionLogValueSchema = new Schema(
+      new Field("producer_id", Type.INT64, ""),
+      new Field("producer_epoch", Type.INT16, ""),
+      new Field("transaction_timeout_ms", Type.INT32, ""),
+      new Field("transaction_status", Type.INT8, ""),
+      new Field("transaction_partitions", CompactArrayOf.nullable(futurePartitionsSchema), ""),
+      new Field("transaction_last_update_timestamp_ms", Type.INT64, ""),
+      new Field("transaction_start_timestamp_ms", Type.INT64, ""),
+      TaggedFieldsSection.of(
+        0, new Field("txn_foo", Type.STRING, ""),
+        1, new Field("txn_bar", Type.INT32, "")
+      )
+    )
+
+    // create TransactionLogValue with tagged fields

Review Comment:
   nit: `Create`.





[GitHub] [kafka] dajac commented on a diff in pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1168341032


##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +140,88 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeTransactionLogValueWithUnknownTaggedFields(): Unit = {

Review Comment:
   nit: Should we say `FutureTransactionLogValue`?



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +140,88 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeTransactionLogValueWithUnknownTaggedFields(): Unit = {
+    // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futurePartitionsSchema = new Schema(
+      new Field("topic", Type.COMPACT_STRING, ""),
+      new Field("partition_ids", new CompactArrayOf(Type.INT32), ""),
+      TaggedFieldsSection.of(
+        0, new Field("partition_foo", Type.STRING, ""),
+        1, new Field("partition_foo", Type.INT32, "")
+      )
+    )
+
+    // create TransactionLogValue.PartitionsSchema with tagged fields
+    val txnPartitions = new Struct(futurePartitionsSchema)
+    txnPartitions.set("topic", "topic")
+    txnPartitions.set("partition_ids", Array(Integer.valueOf(1)))
+    val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]()
+    txnPartitionsTaggedFields.put(0, "foo")
+    txnPartitionsTaggedFields.put(1, 4000)
+    txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields)
+
+    // Copy of TransactionLogValue.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futureTransactionLogValueSchema = new Schema(
+      new Field("producer_id", Type.INT64, ""),
+      new Field("producer_epoch", Type.INT16, ""),
+      new Field("transaction_timeout_ms", Type.INT32, ""),
+      new Field("transaction_status", Type.INT8, ""),
+      new Field("transaction_partitions", CompactArrayOf.nullable(futurePartitionsSchema), ""),
+      new Field("transaction_last_update_timestamp_ms", Type.INT64, ""),
+      new Field("transaction_start_timestamp_ms", Type.INT64, ""),
+      TaggedFieldsSection.of(
+        0, new Field("txn_foo", Type.STRING, ""),
+        1, new Field("txn_bar", Type.INT32, "")
+      )
+    )
+
+    // create TransactionLogValue with tagged fields
+    val transactionLogValue = new Struct(futureTransactionLogValueSchema)
+    transactionLogValue.set("producer_id", 1000L)
+    transactionLogValue.set("producer_epoch", 100.toShort)
+    transactionLogValue.set("transaction_timeout_ms", 1000)
+    transactionLogValue.set("transaction_status", CompleteCommit.id)
+    transactionLogValue.set("transaction_partitions", Array(txnPartitions))
+    transactionLogValue.set("transaction_last_update_timestamp_ms", 2000L)
+    transactionLogValue.set("transaction_start_timestamp_ms", 3000L)
+    val txnLogValueTaggedFields = new java.util.TreeMap[Integer, Any]()
+    txnLogValueTaggedFields.put(0, "foo")
+    txnLogValueTaggedFields.put(1, 4000)
+    transactionLogValue.set("_tagged_fields", txnLogValueTaggedFields)
+
+    // Prepare the buffer.
+    val buffer = ByteBuffer.allocate(transactionLogValue.sizeOf() + 2)
+    buffer.put(0.toByte)
+    buffer.put(1.toByte) // Add 1 as version.
+    transactionLogValue.writeTo(buffer)
+    buffer.flip()
+
+    // Read the buffer with the real schema and verify that tagged
+    // fields were read but ignored.
+    buffer.getShort() // Skip version.
+    val value = new TransactionLogValue(new ByteBufferAccessor(buffer), 1.toShort)
+    assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))

Review Comment:
   nit: Could we also assert the topic's unknown tagged fields?



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2616,7 +2675,8 @@ class GroupMetadataManagerTest {
                                                memberId: String,
                                                assignmentBytes: Array[Byte] = Array.emptyByteArray,
                                                metadataVersion: MetadataVersion = MetadataVersion.latest): SimpleRecord = {
-    val memberProtocols = List((protocol, Array.emptyByteArray))
+    val subscription = new Subscription(List("topic").asJava)
+    val memberProtocols = List((protocol, ConsumerProtocol.serializeSubscription(subscription).array()))

Review Comment:
   Ack. I would revert this and fix this in a separate PR with the relevant context. This is confusing otherwise.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2467,64 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value().getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value().getShort)
+    }
+  }
+

Review Comment:
   I am perhaps a bit paranoid here, but those tests are based on a copy of the schema. Using the correct one seems to be a good idea.



##########
generator/src/main/java/org/apache/kafka/message/MessageGenerator.java:
##########
@@ -231,9 +231,9 @@ public static void processDirectories(String packageName,
             for (Path inputPath : directoryStream) {
                 try {
                     MessageSpec spec = JSON_SERDE.
-                        readValue(inputPath.toFile(), MessageSpec.class);
+                            readValue(inputPath.toFile(), MessageSpec.class);

Review Comment:
   nit: Let's fully revert changes in this file.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +138,28 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+

Review Comment:
   I meant a test which ensures that a TransactionLogValue serialised with version 1 can be deserialised with the current code.
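
A minimal Java sketch of the property being asked for: the writer keeps emitting the highest non-flexible version, while the reader also accepts a flexible version with trailing tagged fields. `VersionedValueSketch`, its constants, and the single `producerId` payload are hypothetical stand-ins, not Kafka's `TransactionLog` code. The layout is an assumption modelled on the tests in this PR: a 2-byte big-endian version prefix followed by the record body.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a coordinator value record; not Kafka's actual code.
final class VersionedValueSketch {
    static final short HIGHEST_NON_FLEXIBLE_VERSION = 0; // version the writer emits
    static final short HIGHEST_READABLE_VERSION = 1;     // highest version the reader accepts

    // Writer: always prefix the payload with the highest non-flexible version,
    // so that a downgraded broker can still read the record.
    static byte[] write(long producerId) {
        ByteBuffer buf = ByteBuffer.allocate(Short.BYTES + Long.BYTES);
        buf.putShort(HIGHEST_NON_FLEXIBLE_VERSION);
        buf.putLong(producerId);
        return buf.array();
    }

    // Reader: accepts both versions. For the flexible version it also consumes
    // the trailing tagged-fields section and ignores its contents.
    static long read(ByteBuffer buf) {
        short version = buf.getShort();
        if (version < 0 || version > HIGHEST_READABLE_VERSION) {
            throw new IllegalArgumentException("Unknown version " + version);
        }
        long producerId = buf.getLong();
        if (version > HIGHEST_NON_FLEXIBLE_VERSION) {
            int count = readUnsignedVarint(buf);
            for (int i = 0; i < count; i++) {
                readUnsignedVarint(buf);                 // tag, ignored here
                int size = readUnsignedVarint(buf);      // payload size
                buf.position(buf.position() + size);     // skip the payload
            }
        }
        return producerId;
    }

    // Unsigned varint: 7 data bits per byte, high bit set when more bytes follow.
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint is too long");
            }
        }
    }
}
```

A test in this spirit would hand-build a version 1 buffer, as the tests in the diff do with `buffer.put(0.toByte)` and `buffer.put(1.toByte)`, and check that the reader returns the same values as for version 0.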



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2469,168 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value.getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value.getShort)
+    }
+  }
+
+  @Test
+  def testDeserializeOffsetCommitValueWithUnknownTaggedFields(): Unit = {
+    // Copy of OffsetCommitValue.SCHEMA_4 with a few
+    // additional tagged fields.
+    val futureOffsetCommitSchema = new Schema(
+      new Field("offset", Type.INT64, ""),
+      new Field("leader_epoch", Type.INT32, ""),
+      new Field("metadata", Type.COMPACT_STRING, ""),
+      new Field("commit_timestamp", Type.INT64, ""),
+      TaggedFieldsSection.of(
+        0, new Field("offset_foo", Type.STRING, ""),
+        1, new Field("offset_bar", Type.INT32, "")
+      )
+    )
+
+    // create OffsetCommitValue with tagged fields
+    val offsetCommit = new Struct(futureOffsetCommitSchema)
+    offsetCommit.set("offset", 1000L)
+    offsetCommit.set("leader_epoch", 100)
+    offsetCommit.set("metadata", "metadata")
+    offsetCommit.set("commit_timestamp", 2000L)
+    val offsetCommitTaggedFields = new java.util.TreeMap[Integer, Any]()
+    offsetCommitTaggedFields.put(0, "foo")
+    offsetCommitTaggedFields.put(1, 4000)
+    offsetCommit.set("_tagged_fields", offsetCommitTaggedFields)
+
+    // Prepare the buffer.
+    val buffer = ByteBuffer.allocate(offsetCommit.sizeOf() + 2)
+    buffer.put(0.toByte)
+    buffer.put(4.toByte) // Add 4 as version.
+    offsetCommit.writeTo(buffer)
+    buffer.flip()
+
+    // Read the buffer with the real schema and verify that tagged
+    // fields were read but ignored.
+    buffer.getShort() // Skip version.
+    val value = new OffsetCommitValue(new ByteBufferAccessor(buffer), 4.toShort)
+    assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))
+
+    // Read the buffer with readOffsetMessageValue.
+    buffer.rewind()
+    val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
+    assertEquals(1000L, offsetAndMetadata.offset)
+    assertEquals(100, offsetAndMetadata.leaderEpoch.get)
+    assertEquals("metadata", offsetAndMetadata.metadata)
+    assertEquals(2000L, offsetAndMetadata.commitTimestamp)
+  }
+
+  @Test
+  def testDeserializeGroupMetadataValueWithUnknownTaggedFields(): Unit = {

Review Comment:
   nit: ditto.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2469,168 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value.getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value.getShort)
+    }
+  }
+
+  @Test
+  def testDeserializeOffsetCommitValueWithUnknownTaggedFields(): Unit = {

Review Comment:
   nit: Should we say `FutureOffsetCommitValue`?





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1168815457


##########
group-coordinator/src/main/resources/common/message/GroupMetadataValue.json:
##########
@@ -16,8 +16,11 @@
 {
   "type": "data",
   "name": "GroupMetadataValue",
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  // Version 4 is the first flexible version.
+  // KIP-915: bumping the version will no longer make this record backward compatible.
+  // We suggest to add/remove only tagged fields to maintain backward compatibility.
+  "validVersions": "0-4",

Review Comment:
   From the KIP:
   > Any future tagged fields will not require a version bump and older brokers can simply ignore the tagged fields they do not understand. Note that introducing a new non-tagged field or removing an existing non-tagged field in the future will not be backward compatible.
   
   We decided not to move forward with enforcing that the version is never bumped; instead, we warn the developers that bumping the version will cause backward incompatibility.
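
To make the KIP's point concrete, here is a minimal Java sketch of why a reader can ignore tagged fields it does not understand: each tagged field carries its own tag and size, so the reader can step over any tag it does not know. `TaggedFieldsSketch` and its methods are hypothetical and are not Kafka's generated code. The sketch simply discards unknown fields, whereas the generated records seem to retain them, since the tests in this PR assert on `unknownTaggedFields()`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, self-contained illustration of a tagged-fields section.
final class TaggedFieldsSketch {

    // Unsigned varint: 7 data bits per byte, high bit set when more bytes follow.
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint is too long");
            }
        }
    }

    // Section layout: field count, then (tag, size, bytes) for each field in ascending tag order.
    static byte[] encode(TreeMap<Integer, byte[]> fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(fields.size(), out);
        for (Map.Entry<Integer, byte[]> entry : fields.entrySet()) {
            writeUnsignedVarint(entry.getKey(), out);
            writeUnsignedVarint(entry.getValue().length, out);
            out.write(entry.getValue(), 0, entry.getValue().length);
        }
        return out.toByteArray();
    }

    // Keep only the tags this reader knows; skip every other field using its size.
    static TreeMap<Integer, byte[]> decodeKnown(ByteBuffer buf, Set<Integer> knownTags) {
        TreeMap<Integer, byte[]> known = new TreeMap<>();
        int count = readUnsignedVarint(buf);
        for (int i = 0; i < count; i++) {
            int tag = readUnsignedVarint(buf);
            int size = readUnsignedVarint(buf);
            if (knownTags.contains(tag)) {
                byte[] data = new byte[size];
                buf.get(data);
                known.put(tag, data);
            } else {
                buf.position(buf.position() + size); // unknown tag: step over its payload
            }
        }
        return known;
    }
}
```

Because a new tagged field never changes how the existing fields are laid out, adding one does not require a version bump. A new non-tagged field does change the layout, which is why the comment in the JSON files warns against it.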





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1167261871


##########
group-coordinator/src/main/resources/common/message/GroupMetadataValue.json:
##########
@@ -13,11 +13,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 4 is the first flexible version.
 {
   "type": "data",
   "name": "GroupMetadataValue",
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  // KIP-915: bumping the version will no longer make this record backward compatible.
+  // We suggest to add/remove only tagged fields to maintain backward compatibility.
+  "validVersions": "0-4",

Review Comment:
   I think the reason was that, since we only deserialize with version 3, there were no issues. Updated to 0+.





[GitHub] [kafka] dajac commented on a diff in pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1163746136


##########
core/src/main/resources/common/message/TransactionLogValue.json:
##########
@@ -13,27 +13,29 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 1 is the first flexible version.
+// KIP-915: bumping the version will no longer make this record backward compatible.

Review Comment:
   We could perhaps add more context to the comment as well. For instance, we should suggest always adding tagged fields from now on, etc.





[GitHub] [kafka] dajac commented on a diff in pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1167208479


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2467,64 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value().getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value().getShort)
+    }
+  }
+

Review Comment:
   Should we test that we can deserialize the v4 schema with the current code?





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1167058951


##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +138,28 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+

Review Comment:
   What's there to test for v1? The highest non-flexible version is 0, so we won't serialize it to v1.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13526: KAFKA-14869: bump coordinator Value records to flexible versions

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1167262801


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2467,64 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value().getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value().getShort)
+    }
+  }
+

Review Comment:
   We already deserialize the v4 schema with unknown tagged fields. Is this necessary?





[GitHub] [kafka] Hangleton commented on a diff in pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1168399681


##########
group-coordinator/src/main/resources/common/message/GroupMetadataValue.json:
##########
@@ -16,8 +16,11 @@
 {
   "type": "data",
   "name": "GroupMetadataValue",
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  // Version 4 is the first flexible version.
+  // KIP-915: bumping the version will no longer make this record backward compatible.
+  // We suggest to add/remove only tagged fields to maintain backward compatibility.
+  "validVersions": "0-4",

Review Comment:
   Maybe we could add a unit test which verifies that the version is never incremented above 4?
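
One possible shape for such a guard, as a minimal Java sketch. `ValidVersionsGuardSketch` and its methods are hypothetical. It assumes the check works on the `validVersions` string from the JSON spec and that the string is either a single version (`"3"`) or a closed range (`"0-4"`); an open-ended range such as `"0+"` is rejected because it has no upper bound to check.

```java
// Hypothetical guard; the input mirrors the "validVersions" value in the JSON spec.
final class ValidVersionsGuardSketch {

    // Returns the highest version of "N" or "N-M"; rejects the open-ended form "N+".
    static int highestVersion(String validVersions) {
        String range = validVersions.trim();
        if (range.endsWith("+")) {
            throw new IllegalArgumentException("Open-ended range has no highest version: " + range);
        }
        int dash = range.indexOf('-');
        return Integer.parseInt(dash < 0 ? range : range.substring(dash + 1));
    }

    // Fails when the spec has been bumped past the allowed maximum version.
    static void checkNotBumpedAbove(String validVersions, int maxAllowed) {
        int highest = highestVersion(validVersions);
        if (highest > maxAllowed) {
            throw new IllegalStateException(
                "Highest version " + highest + " exceeds " + maxAllowed
                    + "; add tagged fields instead of bumping the version");
        }
    }
}
```

A unit test would call `checkNotBumpedAbove("0-4", 4)` with the value read from the spec, so that a later change to `"0-5"` fails the build.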



##########
group-coordinator/src/main/resources/common/message/OffsetCommitValue.json:
##########
@@ -16,8 +16,11 @@
 {
   "type": "data",
   "name": "OffsetCommitValue",
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  // Version 4 is the first flexible version.
+  // KIP-915: bumping the version will no longer make this record backward compatible.
+  // We suggest to add/remove only tagged fields to maintain backward compatibility.
+  "validVersions": "0-4",

Review Comment:
   Ditto.





[GitHub] [kafka] dajac commented on pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13526:
URL: https://github.com/apache/kafka/pull/13526#issuecomment-1513180068

   Merged to trunk and 3.5. We need to open PRs for the other branches.

