You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/04/25 08:41:42 UTC

[kafka] branch 3.2 updated: KAFKA-14869: Bump coordinator value records to flexible versions (KIP-915, Part-2) (#13526) (#13602)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 1fc9796357d KAFKA-14869: Bump coordinator value records to flexible versions (KIP-915, Part-2) (#13526) (#13602)
1fc9796357d is described below

commit 1fc9796357db5b281577e1eb08bde939a0daac35
Author: Jeff Kim <ki...@gmail.com>
AuthorDate: Tue Apr 25 04:41:35 2023 -0400

    KAFKA-14869: Bump coordinator value records to flexible versions (KIP-915, Part-2) (#13526) (#13602)
    
    This patch implements the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator so that the new versions are flexible versions. The new versions are used only when reading from the partitions, not when writing to them. This allows downgrades from future versions that will include tagged fields.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../common/message/GroupMetadataValue.json         |   9 +-
 .../common/message/OffsetCommitValue.json          |   7 +-
 .../common/message/TransactionLogValue.json        |  25 +--
 .../coordinator/group/GroupMetadataManager.scala   |   4 +
 .../coordinator/transaction/TransactionLog.scala   |   4 +-
 .../group/GroupMetadataManagerTest.scala           | 216 ++++++++++++++++++++-
 .../transaction/TransactionLogTest.scala           | 121 +++++++++++-
 7 files changed, 366 insertions(+), 20 deletions(-)

diff --git a/core/src/main/resources/common/message/GroupMetadataValue.json b/core/src/main/resources/common/message/GroupMetadataValue.json
index 826a7c8d326..8405fbb9918 100644
--- a/core/src/main/resources/common/message/GroupMetadataValue.json
+++ b/core/src/main/resources/common/message/GroupMetadataValue.json
@@ -16,8 +16,11 @@
 {
   "type": "data",
   "name": "GroupMetadataValue",
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  // Version 4 is the first flexible version.
+  // KIP-915: bumping the version again will break backward compatibility of this record.
+  // We suggest adding/removing only tagged fields to maintain backward compatibility.
+  "validVersions": "0-4",
+  "flexibleVersions": "4+",
   "fields": [
     { "name": "protocolType", "versions": "0+", "type": "string"},
     { "name": "generation", "versions": "0+", "type": "int32" },
@@ -29,7 +32,7 @@
   "commonStructs": [
     {
       "name": "MemberMetadata",
-      "versions": "0-3",
+      "versions": "0+",
       "fields": [
         { "name": "memberId", "versions": "0+", "type": "string" },
         { "name": "groupInstanceId", "versions": "3+", "type": "string", "default": "null", "nullableVersions": "3+", "ignorable": true},
diff --git a/core/src/main/resources/common/message/OffsetCommitValue.json b/core/src/main/resources/common/message/OffsetCommitValue.json
index db8a6281d43..2973c5ee12a 100644
--- a/core/src/main/resources/common/message/OffsetCommitValue.json
+++ b/core/src/main/resources/common/message/OffsetCommitValue.json
@@ -16,8 +16,11 @@
 {
   "type": "data",
   "name": "OffsetCommitValue",
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  // Version 4 is the first flexible version.
+  // KIP-915: bumping the version again will break backward compatibility of this record.
+  // We suggest adding/removing only tagged fields to maintain backward compatibility.
+  "validVersions": "0-4",
+  "flexibleVersions": "4+",
   "fields": [
     { "name": "offset", "type": "int64", "versions": "0+" },
     { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true},
diff --git a/core/src/main/resources/common/message/TransactionLogValue.json b/core/src/main/resources/common/message/TransactionLogValue.json
index 7915c3d7cb7..c6efc772d58 100644
--- a/core/src/main/resources/common/message/TransactionLogValue.json
+++ b/core/src/main/resources/common/message/TransactionLogValue.json
@@ -16,24 +16,27 @@
 {
   "type": "data",
   "name": "TransactionLogValue",
-  "validVersions": "0",
-  "flexibleVersions": "none",
+  // Version 1 is the first flexible version.
+  // KIP-915: bumping the version again will break backward compatibility of this record.
+  // We suggest adding/removing only tagged fields to maintain backward compatibility.
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
-    { "name": "ProducerId", "type": "int64", "versions": "0",
+    { "name": "ProducerId", "type": "int64", "versions": "0+",
       "about": "Producer id in use by the transactional id"},
-    { "name": "ProducerEpoch", "type": "int16", "versions": "0",
+    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
       "about": "Epoch associated with the producer id"},
-    { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0",
+    { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
       "about": "Transaction timeout in milliseconds"},
-    { "name": "TransactionStatus", "type": "int8", "versions": "0",
+    { "name": "TransactionStatus", "type": "int8", "versions": "0+",
       "about": "TransactionState the transaction is in"},
-    { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0", "nullableVersions": "0",
+    { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0+", "nullableVersions": "0+",
       "about": "Set of partitions involved in the transaction", "fields": [
-      { "name": "Topic", "type": "string", "versions": "0"},
-      { "name": "PartitionIds", "type": "[]int32", "versions": "0"}]},
-    { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0",
+      { "name": "Topic", "type": "string", "versions": "0+"},
+      { "name": "PartitionIds", "type": "[]int32", "versions": "0+"}]},
+    { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0+",
       "about": "Time the transaction was last updated"},
-    { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0",
+    { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+",
       "about": "Time the transaction was started"}
   ]
 }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 99d0bd74696..b167365ca7e 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1083,6 +1083,8 @@ object GroupMetadataManager {
     val version =
       if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort
       else if (apiVersion < KAFKA_2_1_IV1) 2.toShort
+      // Serialize with the highest supported non-flexible version
+      // until a tagged field is introduced or the version is bumped.
       else 3.toShort
     MessageUtil.toVersionPrefixedBytes(version, new OffsetCommitValue()
       .setOffset(offsetAndMetadata.offset)
@@ -1111,6 +1113,8 @@ object GroupMetadataManager {
       if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
       else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
       else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+      // Serialize with the highest supported non-flexible version
+      // until a tagged field is introduced or the version is bumped.
       else 3.toShort
 
     MessageUtil.toVersionPrefixedBytes(version, new GroupMetadataValue()
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index 30bd517c009..f07e7293fc2 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -81,7 +81,9 @@ object TransactionLog {
             .setPartitionIds(partitions.map(tp => Integer.valueOf(tp.partition)).toList.asJava)
         }.toList.asJava
 
-    MessageUtil.toVersionPrefixedBytes(TransactionLogValue.HIGHEST_SUPPORTED_VERSION,
+    // Serialize with the highest supported non-flexible version
+    // until a tagged field is introduced or the version is bumped.
+    MessageUtil.toVersionPrefixedBytes(0,
       new TransactionLogValue()
         .setProducerId(txnMetadata.producerId)
         .setProducerEpoch(txnMetadata.producerEpoch)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index afd8670db8d..79750c6aa42 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -27,6 +27,7 @@ import javax.management.ObjectName
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
+import kafka.internals.generated.{GroupMetadataValue, OffsetCommitValue}
 import kafka.log.{AppendOrigin, LogAppendInfo, UnifiedLog}
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager, RequestLocal}
@@ -37,7 +38,9 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics => kMetrics}
-import org.apache.kafka.common.protocol.{Errors, MessageUtil}
+import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection
+import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type}
+import org.apache.kafka.common.protocol.{ByteBufferAccessor, Errors, MessageUtil}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -2339,6 +2342,217 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value.getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value.getShort)
+    }
+  }
+
+  @Test
+  def testDeserializeHighestSupportedGroupMetadataValueVersion(): Unit = {
+    val member = new GroupMetadataValue.MemberMetadata()
+      .setMemberId("member")
+      .setClientId("client")
+      .setClientHost("host")
+
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val leader = "leader"
+    val groupMetadataValue = new GroupMetadataValue()
+      .setProtocolType(protocolType)
+      .setGeneration(generation)
+      .setProtocol(protocol)
+      .setLeader(leader)
+      .setMembers(java.util.Collections.singletonList(member))
+
+    val deserialized = GroupMetadataManager.readGroupMessageValue("groupId",
+      MessageUtil.toVersionPrefixedByteBuffer(4, groupMetadataValue), time)
+
+    assertEquals(generation, deserialized.generationId)
+    assertEquals(protocolType, deserialized.protocolType.get)
+    assertEquals(protocol, deserialized.protocolName.get)
+    assertEquals(leader, deserialized.leaderOrNull)
+
+    val actualMember = deserialized.allMemberMetadata.head
+    assertEquals(member.memberId, actualMember.memberId)
+    assertEquals(member.clientId, actualMember.clientId)
+    assertEquals(member.clientHost, actualMember.clientHost)
+  }
+
+  @Test
+  def testDeserializeHighestSupportedOffsetCommitValueVersion(): Unit = {
+    val offsetCommitValue = new OffsetCommitValue()
+      .setOffset(1000L)
+      .setMetadata("metadata")
+      .setCommitTimestamp(1500L)
+      .setLeaderEpoch(1)
+
+    val serialized = MessageUtil.toVersionPrefixedByteBuffer(4, offsetCommitValue)
+    val deserialized = GroupMetadataManager.readOffsetMessageValue(serialized)
+
+    assertEquals(1000L, deserialized.offset)
+    assertEquals("metadata", deserialized.metadata)
+    assertEquals(1500L, deserialized.commitTimestamp)
+    assertEquals(1, deserialized.leaderEpoch.get)
+  }
+
+  @Test
+  def testDeserializeFutureOffsetCommitValue(): Unit = {
+    // Copy of OffsetCommitValue.SCHEMA_4 with a few
+    // additional tagged fields.
+    val futureOffsetCommitSchema = new Schema(
+      new Field("offset", Type.INT64, ""),
+      new Field("leader_epoch", Type.INT32, ""),
+      new Field("metadata", Type.COMPACT_STRING, ""),
+      new Field("commit_timestamp", Type.INT64, ""),
+      TaggedFieldsSection.of(
+        Int.box(0), new Field("offset_foo", Type.STRING, ""),
+        Int.box(1), new Field("offset_bar", Type.INT32, "")
+      )
+    )
+
+    // Create OffsetCommitValue with tagged fields
+    val offsetCommit = new Struct(futureOffsetCommitSchema)
+    offsetCommit.set("offset", 1000L)
+    offsetCommit.set("leader_epoch", 100)
+    offsetCommit.set("metadata", "metadata")
+    offsetCommit.set("commit_timestamp", 2000L)
+    val offsetCommitTaggedFields = new java.util.TreeMap[Integer, Any]()
+    offsetCommitTaggedFields.put(0, "foo")
+    offsetCommitTaggedFields.put(1, 4000)
+    offsetCommit.set("_tagged_fields", offsetCommitTaggedFields)
+
+    // Prepare the buffer.
+    val buffer = ByteBuffer.allocate(offsetCommit.sizeOf() + 2)
+    buffer.put(0.toByte)
+    buffer.put(4.toByte) // Add 4 as version.
+    offsetCommit.writeTo(buffer)
+    buffer.flip()
+
+    // Read the buffer with the real schema and verify that the extra tagged
+    // fields were read and kept as unknown tagged fields.
+    buffer.getShort() // Skip version.
+    val value = new OffsetCommitValue(new ByteBufferAccessor(buffer), 4.toShort)
+    assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))
+
+    // Read the buffer with readOffsetMessageValue.
+    buffer.rewind()
+    val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
+    assertEquals(1000L, offsetAndMetadata.offset)
+    assertEquals(100, offsetAndMetadata.leaderEpoch.get)
+    assertEquals("metadata", offsetAndMetadata.metadata)
+    assertEquals(2000L, offsetAndMetadata.commitTimestamp)
+  }
+
+  @Test
+  def testDeserializeFutureGroupMetadataValue(): Unit = {
+    // Copy of GroupMetadataValue.MemberMetadata.SCHEMA_4 with a few
+    // additional tagged fields.
+    val futureMemberSchema = new Schema(
+      new Field("member_id", Type.COMPACT_STRING, ""),
+      new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
+      new Field("client_id", Type.COMPACT_STRING, ""),
+      new Field("client_host", Type.COMPACT_STRING, ""),
+      new Field("rebalance_timeout", Type.INT32, ""),
+      new Field("session_timeout", Type.INT32, ""),
+      new Field("subscription", Type.COMPACT_BYTES, ""),
+      new Field("assignment", Type.COMPACT_BYTES, ""),
+      TaggedFieldsSection.of(
+        Int.box(0), new Field("member_foo", Type.STRING, ""),
+        Int.box(1), new Field("member_foo", Type.INT32, "")
+      )
+    )
+
+    // Copy of GroupMetadataValue.SCHEMA_4 with a few
+    // additional tagged fields.
+    val futureGroupSchema = new Schema(
+      new Field("protocol_type", Type.COMPACT_STRING, ""),
+      new Field("generation", Type.INT32, ""),
+      new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
+      new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
+      new Field("current_state_timestamp", Type.INT64, ""),
+      new Field("members", new CompactArrayOf(futureMemberSchema), ""),
+      TaggedFieldsSection.of(
+        Int.box(0), new Field("group_foo", Type.STRING, ""),
+        Int.box(1), new Field("group_bar", Type.INT32, "")
+      )
+    )
+
+    // Create a member with tagged fields.
+    val member = new Struct(futureMemberSchema)
+    member.set("member_id", "member_id")
+    member.set("group_instance_id", "group_instance_id")
+    member.set("client_id", "client_id")
+    member.set("client_host", "client_host")
+    member.set("rebalance_timeout", 1)
+    member.set("session_timeout", 2)
+    member.set("subscription", ByteBuffer.allocate(0))
+    member.set("assignment", ByteBuffer.allocate(0))
+
+    val memberTaggedFields = new java.util.TreeMap[Integer, Any]()
+    memberTaggedFields.put(0, "foo")
+    memberTaggedFields.put(1, 4000)
+    member.set("_tagged_fields", memberTaggedFields)
+
+    // Create a group with tagged fields.
+    val group = new Struct(futureGroupSchema)
+    group.set("protocol_type", "consumer")
+    group.set("generation", 10)
+    group.set("protocol", "range")
+    group.set("leader", "leader")
+    group.set("current_state_timestamp", 1000L)
+    group.set("members", Array(member))
+
+    val groupTaggedFields = new java.util.TreeMap[Integer, Any]()
+    groupTaggedFields.put(0, "foo")
+    groupTaggedFields.put(1, 4000)
+    group.set("_tagged_fields", groupTaggedFields)
+
+    // Prepare the buffer.
+    val buffer = ByteBuffer.allocate(group.sizeOf() + 2)
+    buffer.put(0.toByte)
+    buffer.put(4.toByte) // Add 4 as version.
+    group.writeTo(buffer)
+    buffer.flip()
+
+    // Read the buffer with the real schema and verify that the extra tagged
+    // fields were read and kept as unknown tagged fields.
+    buffer.getShort() // Skip version.
+    val value = new GroupMetadataValue(new ByteBufferAccessor(buffer), 4.toShort)
+    assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))
+    assertEquals(Seq(0, 1), value.members().get(0).unknownTaggedFields().asScala.map(_.tag))
+
+    // Read the buffer with readGroupMessageValue.
+    buffer.rewind()
+    val groupMetadata = GroupMetadataManager.readGroupMessageValue("group", buffer, time)
+    assertEquals("consumer", groupMetadata.protocolType.get)
+    assertEquals("leader", groupMetadata.leaderOrNull)
+    assertTrue(groupMetadata.allMembers.contains("member_id"))
+  }
+
   @Test
   def testLoadOffsetsWithEmptyControlBatch(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
index eb1284278d2..332dd669b4e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
@@ -20,9 +20,12 @@ package kafka.coordinator.transaction
 import kafka.internals.generated.TransactionLogKey
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.MessageUtil
+import kafka.internals.generated.TransactionLogValue
+import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
+import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection
+import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
 
 import java.nio.ByteBuffer
@@ -139,6 +142,120 @@ class TransactionLogTest {
   }
 
   @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeHighestSupportedTransactionLogValue(): Unit = {
+    val txnPartitions = new TransactionLogValue.PartitionsSchema()
+      .setTopic("topic")
+      .setPartitionIds(java.util.Collections.singletonList(0))
+
+    val txnLogValue = new TransactionLogValue()
+      .setProducerId(100)
+      .setProducerEpoch(50.toShort)
+      .setTransactionStatus(CompleteCommit.id)
+      .setTransactionStartTimestampMs(750L)
+      .setTransactionLastUpdateTimestampMs(1000L)
+      .setTransactionTimeoutMs(500)
+      .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions))
+
+    val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue)
+    val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get
+
+    assertEquals(100, deserialized.producerId)
+    assertEquals(50, deserialized.producerEpoch)
+    assertEquals(CompleteCommit, deserialized.state)
+    assertEquals(750L, deserialized.txnStartTimestamp)
+    assertEquals(1000L, deserialized.txnLastUpdateTimestamp)
+    assertEquals(500, deserialized.txnTimeoutMs)
+
+    val actualTxnPartitions = deserialized.topicPartitions
+    assertEquals(1, actualTxnPartitions.size)
+    assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0)))
+  }
+
+  @Test
+  def testDeserializeFutureTransactionLogValue(): Unit = {
+    // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futurePartitionsSchema = new Schema(
+      new Field("topic", Type.COMPACT_STRING, ""),
+      new Field("partition_ids", new CompactArrayOf(Type.INT32), ""),
+      TaggedFieldsSection.of(
+        Int.box(0), new Field("partition_foo", Type.STRING, ""),
+        Int.box(1), new Field("partition_foo", Type.INT32, "")
+      )
+    )
+
+    // Create TransactionLogValue.PartitionsSchema with tagged fields
+    val txnPartitions = new Struct(futurePartitionsSchema)
+    txnPartitions.set("topic", "topic")
+    txnPartitions.set("partition_ids", Array(Integer.valueOf(1)))
+    val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]()
+    txnPartitionsTaggedFields.put(0, "foo")
+    txnPartitionsTaggedFields.put(1, 4000)
+    txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields)
+
+    // Copy of TransactionLogValue.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futureTransactionLogValueSchema = new Schema(
+      new Field("producer_id", Type.INT64, ""),
+      new Field("producer_epoch", Type.INT16, ""),
+      new Field("transaction_timeout_ms", Type.INT32, ""),
+      new Field("transaction_status", Type.INT8, ""),
+      new Field("transaction_partitions", CompactArrayOf.nullable(futurePartitionsSchema), ""),
+      new Field("transaction_last_update_timestamp_ms", Type.INT64, ""),
+      new Field("transaction_start_timestamp_ms", Type.INT64, ""),
+      TaggedFieldsSection.of(
+        Int.box(0), new Field("txn_foo", Type.STRING, ""),
+        Int.box(1), new Field("txn_bar", Type.INT32, "")
+      )
+    )
+
+    // Create TransactionLogValue with tagged fields
+    val transactionLogValue = new Struct(futureTransactionLogValueSchema)
+    transactionLogValue.set("producer_id", 1000L)
+    transactionLogValue.set("producer_epoch", 100.toShort)
+    transactionLogValue.set("transaction_timeout_ms", 1000)
+    transactionLogValue.set("transaction_status", CompleteCommit.id)
+    transactionLogValue.set("transaction_partitions", Array(txnPartitions))
+    transactionLogValue.set("transaction_last_update_timestamp_ms", 2000L)
+    transactionLogValue.set("transaction_start_timestamp_ms", 3000L)
+    val txnLogValueTaggedFields = new java.util.TreeMap[Integer, Any]()
+    txnLogValueTaggedFields.put(0, "foo")
+    txnLogValueTaggedFields.put(1, 4000)
+    transactionLogValue.set("_tagged_fields", txnLogValueTaggedFields)
+
+    // Prepare the buffer.
+    val buffer = ByteBuffer.allocate(transactionLogValue.sizeOf() + 2)
+    buffer.put(0.toByte)
+    buffer.put(1.toByte) // Add 1 as version.
+    transactionLogValue.writeTo(buffer)
+    buffer.flip()
+
+    // Read the buffer with the real schema and verify that the extra tagged
+    // fields were read and kept as unknown tagged fields.
+    buffer.getShort() // Skip version.
+    val value = new TransactionLogValue(new ByteBufferAccessor(buffer), 1.toShort)
+    assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))
+    assertEquals(Seq(0, 1), value.transactionPartitions().get(0).unknownTaggedFields().asScala.map(_.tag))
+
+    // Read the buffer with readTxnRecordValue.
+    buffer.rewind()
+    val txnMetadata = TransactionLog.readTxnRecordValue("transaction-id", buffer).get
+    assertEquals(1000L, txnMetadata.producerId)
+    assertEquals(100, txnMetadata.producerEpoch)
+    assertEquals(1000L, txnMetadata.txnTimeoutMs)
+    assertEquals(CompleteCommit, txnMetadata.state)
+    assertEquals(Set(new TopicPartition("topic", 1)), txnMetadata.topicPartitions)
+    assertEquals(2000L, txnMetadata.txnLastUpdateTimestamp)
+    assertEquals(3000L, txnMetadata.txnStartTimestamp)
+  }
+
   def testReadTxnRecordKeyCanReadUnknownMessage(): Unit = {
     val record = new TransactionLogKey()
     val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, record)