Posted to commits@kafka.apache.org by ij...@apache.org on 2022/12/19 19:31:45 UTC

[kafka] branch trunk updated: KAFKA-14472: Move TransactionIndex and related to storage module (#12996)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e2678d57d09 KAFKA-14472: Move TransactionIndex and related to storage module (#12996)
e2678d57d09 is described below

commit e2678d57d0919aaa97effe2ee7591cd6b85b5303
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Mon Dec 19 11:31:37 2022 -0800

    KAFKA-14472: Move TransactionIndex and related to storage module (#12996)
    
    For broader context on this change, please check:
    
    * KAFKA-14470: Move log layer to storage module
    
    Reviewers: Jun Rao <ju...@gmail.com>, Satish Duggana <sa...@apache.org>
---
 core/src/main/scala/kafka/log/LocalLog.scala       |  10 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     |   3 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |   2 +-
 .../scala/kafka/log/ProducerStateManager.scala     |   3 +-
 .../main/scala/kafka/log/TransactionIndex.scala    | 264 ---------------------
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  21 +-
 .../scala/kafka/log/remote/RemoteIndexCache.scala  |   2 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   3 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   5 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |   1 +
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |   4 +-
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |   6 +-
 .../unit/kafka/log/ProducerStateManagerTest.scala  |   9 +-
 .../unit/kafka/log/TransactionIndexTest.scala      |  80 ++++---
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |   1 +
 .../kafka/server/log/internals/AbortedTxn.java     | 117 +++++++++
 .../kafka/server/log/internals/CompletedTxn.java   |  75 ++++++
 .../server/log/internals/TransactionIndex.java     | 264 +++++++++++++++++++++
 .../server/log/internals/TxnIndexSearchResult.java |  30 +++
 19 files changed, 556 insertions(+), 344 deletions(-)
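
For readers following the broader log-layer migration (KAFKA-14470), the call-site pattern throughout this diff is uniform: the Scala case classes CompletedTxn and AbortedTxn become plain Java classes (construction now needs `new` and loses named arguments), and TransactionIndex returns java.util.List, so Scala callers convert at the boundary with scala.jdk.CollectionConverters. A minimal hypothetical sketch of the new surface from Scala (the path and offsets are illustrative only, not part of this commit):

    import java.io.File
    import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, TransactionIndex}
    import scala.jdk.CollectionConverters._

    // Java classes: positional `new`, no named arguments or apply().
    val completed = new CompletedTxn(1L /* producerId */, 5L /* firstOffset */, 10L /* lastOffset */, true /* isAborted */)
    val aborted = new AbortedTxn(completed, 11L /* lastStableOffset */)

    // allAbortedTxns now returns java.util.List; convert where Scala collections are needed.
    val index = new TransactionIndex(0L, new File("/tmp/00000000000000000000.txnindex"))
    index.append(aborted)
    val all: Seq[AbortedTxn] = index.allAbortedTxns.asScala.toSeq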

diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 56172333351..68bb9d9f8b0 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeEx
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.server.log.internals.OffsetPosition
+import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetPosition}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.{Seq, immutable}
@@ -448,7 +448,7 @@ class LocalLog(@volatile private var _dir: File,
     }
 
     val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
-    def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
+    def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
     collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)
 
     FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
@@ -459,13 +459,13 @@ class LocalLog(@volatile private var _dir: File,
 
   private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
                                          startingSegment: LogSegment,
-                                         accumulator: List[AbortedTxn] => Unit): Unit = {
+                                         accumulator: Seq[AbortedTxn] => Unit): Unit = {
     val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator
     var segmentEntryOpt = Option(startingSegment)
     while (segmentEntryOpt.isDefined) {
       val segment = segmentEntryOpt.get
       val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset)
-      accumulator(searchResult.abortedTransactions)
+      accumulator(searchResult.abortedTransactions.asScala)
       if (searchResult.isComplete)
         return
       segmentEntryOpt = nextOption(higherSegments)
@@ -475,7 +475,7 @@ class LocalLog(@volatile private var _dir: File,
   private[log] def collectAbortedTransactions(logStartOffset: Long, baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
     val segmentEntry = segments.floorSegment(baseOffset)
     val allAbortedTxns = ListBuffer.empty[AbortedTxn]
-    def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
+    def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
     segmentEntry.foreach(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator))
     allAbortedTxns.toList
   }
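
The widening above from List[AbortedTxn] to Seq[AbortedTxn] is forced by the new boundary: collectAbortedTxns now hands back a java.util.List, and asScala wraps it as a mutable Buffer rather than an immutable List. Because LocalLog imports scala.collection.Seq (see the import block above), the wrapper passes through without copying. A hypothetical standalone sketch:

    import org.apache.kafka.server.log.internals.{AbortedTxn, TxnIndexSearchResult}
    import scala.jdk.CollectionConverters._

    // asScala yields a mutable.Buffer view (a scala.collection.Seq, not a List),
    // hence the accumulator signature Seq[AbortedTxn] => Unit.
    def abortedFrom(result: TxnIndexSearchResult): scala.collection.Seq[AbortedTxn] =
      result.abortedTransactions.asScala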
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4ad0ea2d853..8bafc0aae60 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
@@ -1123,7 +1124,7 @@ private[log] class CleanedTransactionMetadata {
   private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
   // Min-heap of aborted transactions sorted by the transaction's first offset
   private val abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
-    override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
+    override def compare(x: AbortedTxn, y: AbortedTxn): Int = java.lang.Long.compare(x.firstOffset, y.firstOffset)
   }.reverse)
 
   // Output cleaned index to write retained aborted transactions
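
On the comparator change above: the old infix `x.firstOffset compare y.firstOffset` went through Scala's RichLong enrichment; with firstOffset now a Java accessor returning a primitive long, the explicit java.lang.Long.compare keeps the comparison on primitives. An equivalent hypothetical sketch:

    import scala.collection.mutable
    import org.apache.kafka.server.log.internals.AbortedTxn

    val byFirstOffset: Ordering[AbortedTxn] =
      (x: AbortedTxn, y: AbortedTxn) => java.lang.Long.compare(x.firstOffset, y.firstOffset)
    // Reversing turns Scala's max-oriented PriorityQueue into a min-heap.
    val minHeap = mutable.PriorityQueue.empty[AbortedTxn](byFirstOffset.reverse)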
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 4da227556f5..0c2a013e11f 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
-import org.apache.kafka.server.log.internals.{OffsetPosition, TimestampOffset}
+import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetPosition, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
 
 import scala.jdk.CollectionConverters._
 import scala.math._
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 835b74066b6..7307bed0efa 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time, Utils}
+import org.apache.kafka.server.log.internals.CompletedTxn
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
@@ -318,7 +319,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     // without any associated data will not have any impact on the last stable offset
     // and would not need to be reflected in the transaction index.
     val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
-      CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
+      new CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
     }
 
     updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
deleted file mode 100644
index 72321db31ee..00000000000
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.log
-
-import java.io.{Closeable, File, IOException}
-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-import java.nio.file.{Files, StandardOpenOption}
-import kafka.utils.{Logging, nonthreadsafe}
-import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.log.internals.CorruptIndexException
-
-import scala.collection.mutable.ListBuffer
-
-private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTxn], isComplete: Boolean)
-
-/**
- * The transaction index maintains metadata about the aborted transactions for each segment. This includes
- * the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of
- * the abort. This index is used to find the aborted transactions in the range of a given fetch request at
- * the READ_COMMITTED isolation level.
- *
- * There is at most one transaction index for each log segment. The entries correspond to the transactions
- * whose commit markers were written in the corresponding log segment. Note, however, that individual transactions
- * may span multiple segments. Recovering the index therefore requires scanning the earlier segments in
- * order to find the start of the transactions.
- */
-@nonthreadsafe
-class TransactionIndex(val startOffset: Long, @volatile private var _file: File) extends Closeable with Logging {
-
-  // note that the file is not created until we need it
-  @volatile private var maybeChannel: Option[FileChannel] = None
-  private var lastOffset: Option[Long] = None
-
-  if (_file.exists)
-    openChannel()
-
-  def append(abortedTxn: AbortedTxn): Unit = {
-    lastOffset.foreach { offset =>
-      if (offset >= abortedTxn.lastOffset)
-        throw new IllegalArgumentException(s"The last offset of appended transactions must increase sequentially, but " +
-          s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}")
-    }
-    lastOffset = Some(abortedTxn.lastOffset)
-    Utils.writeFully(channel(), abortedTxn.buffer.duplicate())
-  }
-
-  def flush(): Unit = maybeChannel.foreach(_.force(true))
-
-  def file: File = _file
-
-  def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)
-
-  /**
-   * Delete this index.
-   *
-   * @throws IOException if deletion fails due to an I/O error
-   * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
-   *         not exist
-   */
-  def deleteIfExists(): Boolean = {
-    close()
-    Files.deleteIfExists(file.toPath)
-  }
-
-  private def channel(): FileChannel = {
-    maybeChannel match {
-      case Some(channel) => channel
-      case None => openChannel()
-    }
-  }
-
-  private def openChannel(): FileChannel = {
-    val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
-      StandardOpenOption.WRITE)
-    maybeChannel = Some(channel)
-    channel.position(channel.size)
-    channel
-  }
-
-  /**
-   * Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
-   */
-  def reset(): Unit = {
-    maybeChannel.foreach(_.truncate(0))
-    lastOffset = None
-  }
-
-  def close(): Unit = {
-    maybeChannel.foreach(_.close())
-    maybeChannel = None
-  }
-
-  def renameTo(f: File): Unit = {
-    try {
-      if (file.exists)
-        Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
-    } finally _file = f
-  }
-
-  def truncateTo(offset: Long): Unit = {
-    val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
-    var newLastOffset: Option[Long] = None
-    for ((abortedTxn, position) <- iterator(() => buffer)) {
-      if (abortedTxn.lastOffset >= offset) {
-        channel().truncate(position)
-        lastOffset = newLastOffset
-        return
-      }
-      newLastOffset = Some(abortedTxn.lastOffset)
-    }
-  }
-
-  private def iterator(allocate: () => ByteBuffer = () => ByteBuffer.allocate(AbortedTxn.TotalSize)): Iterator[(AbortedTxn, Int)] = {
-    maybeChannel match {
-      case None => Iterator.empty
-      case Some(channel) =>
-        var position = 0
-
-        new Iterator[(AbortedTxn, Int)] {
-          override def hasNext: Boolean = channel.position - position >= AbortedTxn.TotalSize
-
-          override def next(): (AbortedTxn, Int) = {
-            try {
-              val buffer = allocate()
-              Utils.readFully(channel, buffer, position)
-              buffer.flip()
-
-              val abortedTxn = new AbortedTxn(buffer)
-              if (abortedTxn.version > AbortedTxn.CurrentVersion)
-                throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version} " +
-                  s"in transaction index ${file.getAbsolutePath}, current version is ${AbortedTxn.CurrentVersion}")
-              val nextEntry = (abortedTxn, position)
-              position += AbortedTxn.TotalSize
-              nextEntry
-            } catch {
-              case e: IOException =>
-                // We received an unexpected error reading from the index file. We propagate this as an
-                // UNKNOWN error to the consumer, which will cause it to retry the fetch.
-                throw new KafkaException(s"Failed to read from the transaction index ${file.getAbsolutePath}", e)
-            }
-          }
-        }
-    }
-  }
-
-  def allAbortedTxns: List[AbortedTxn] = {
-    iterator().map(_._1).toList
-  }
-
-  /**
-   * Collect all aborted transactions which overlap with a given fetch range.
-   *
-   * @param fetchOffset Inclusive first offset of the fetch range
-   * @param upperBoundOffset Exclusive last offset in the fetch range
-   * @return An object containing the aborted transactions and whether the search needs to continue
-   *         into the next log segment.
-   */
-  def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = {
-    val abortedTransactions = ListBuffer.empty[AbortedTxn]
-    for ((abortedTxn, _) <- iterator()) {
-      if (abortedTxn.lastOffset >= fetchOffset && abortedTxn.firstOffset < upperBoundOffset)
-        abortedTransactions += abortedTxn
-
-      if (abortedTxn.lastStableOffset >= upperBoundOffset)
-        return TxnIndexSearchResult(abortedTransactions.toList, isComplete = true)
-    }
-    TxnIndexSearchResult(abortedTransactions.toList, isComplete = false)
-  }
-
-  /**
-   * Do a basic sanity check on this index to detect obvious problems.
-   *
-   * @throws CorruptIndexException if any problems are found.
-   */
-  def sanityCheck(): Unit = {
-    val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
-    for ((abortedTxn, _) <- iterator(() => buffer)) {
-      if (abortedTxn.lastOffset < startOffset)
-        throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn in index " +
-          s"${file.getAbsolutePath} is less than start offset $startOffset")
-    }
-  }
-
-}
-
-private[log] object AbortedTxn {
-  val VersionOffset = 0
-  val VersionSize = 2
-  val ProducerIdOffset = VersionOffset + VersionSize
-  val ProducerIdSize = 8
-  val FirstOffsetOffset = ProducerIdOffset + ProducerIdSize
-  val FirstOffsetSize = 8
-  val LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize
-  val LastOffsetSize = 8
-  val LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize
-  val LastStableOffsetSize = 8
-  val TotalSize = LastStableOffsetOffset + LastStableOffsetSize
-
-  val CurrentVersion: Short = 0
-}
-
-private[log] class AbortedTxn(val buffer: ByteBuffer) {
-  import AbortedTxn._
-
-  def this(producerId: Long,
-           firstOffset: Long,
-           lastOffset: Long,
-           lastStableOffset: Long) = {
-    this(ByteBuffer.allocate(AbortedTxn.TotalSize))
-    buffer.putShort(CurrentVersion)
-    buffer.putLong(producerId)
-    buffer.putLong(firstOffset)
-    buffer.putLong(lastOffset)
-    buffer.putLong(lastStableOffset)
-    buffer.flip()
-  }
-
-  def this(completedTxn: CompletedTxn, lastStableOffset: Long) =
-    this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset)
-
-  def version: Short = buffer.get(VersionOffset)
-
-  def producerId: Long = buffer.getLong(ProducerIdOffset)
-
-  def firstOffset: Long = buffer.getLong(FirstOffsetOffset)
-
-  def lastOffset: Long = buffer.getLong(LastOffsetOffset)
-
-  def lastStableOffset: Long = buffer.getLong(LastStableOffsetOffset)
-
-  def asAbortedTransaction: FetchResponseData.AbortedTransaction = new FetchResponseData.AbortedTransaction()
-    .setProducerId(producerId)
-    .setFirstOffset(firstOffset)
-
-  override def toString: String =
-    s"AbortedTxn(version=$version, producerId=$producerId, firstOffset=$firstOffset, " +
-      s"lastOffset=$lastOffset, lastStableOffset=$lastStableOffset)"
-
-  override def equals(any: Any): Boolean = {
-    any match {
-      case that: AbortedTxn => this.buffer.equals(that.buffer)
-      case _ => false
-    }
-  }
-
-  override def hashCode(): Int = buffer.hashCode
-}
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 8830258c8fe..e1f49cceb6e 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -44,6 +44,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
+import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 
 import scala.annotation.nowarn
@@ -159,26 +160,6 @@ case class LogReadInfo(fetchedData: FetchDataInfo,
                        logEndOffset: Long,
                        lastStableOffset: Long)
 
-/**
- * A class used to hold useful metadata about a completed transaction. This is used to build
- * the transaction index after appending to the log.
- *
- * @param producerId The ID of the producer
- * @param firstOffset The first offset (inclusive) of the transaction
- * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the
- *                   COMMIT/ABORT control record which indicates the transaction's completion.
- * @param isAborted Whether or not the transaction was aborted
- */
-case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) {
-  override def toString: String = {
-    "CompletedTxn(" +
-      s"producerId=$producerId, " +
-      s"firstOffset=$firstOffset, " +
-      s"lastOffset=$lastOffset, " +
-      s"isAborted=$isAborted)"
-  }
-}
-
 /**
  * A class used to hold params required to decide to rotate a log segment or not.
  */
diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
index 9dfaaa5539c..594b74f19ff 100644
--- a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
+++ b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
@@ -22,7 +22,7 @@ import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.log.internals.OffsetPosition
+import org.apache.kafka.server.log.internals.{OffsetPosition, TransactionIndex}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}
 
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index c82523eff4b..fe8bfd98d41 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
+import org.apache.kafka.server.log.internals.TransactionIndex
 import org.apache.kafka.snapshot.Snapshots
 
 import scala.jdk.CollectionConverters._
@@ -94,7 +95,7 @@ object DumpLogSegments {
 
   private def dumpTxnIndex(file: File): Unit = {
     val index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file)
-    for (abortedTxn <- index.allAbortedTxns) {
+    for (abortedTxn <- index.allAbortedTxns.asScala) {
       println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
         s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}")
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 819278bd280..071e8b8fd1c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.log.internals.AbortedTxn
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 
@@ -328,8 +329,8 @@ class LogCleanerTest {
     assertEquals(20L, log.logEndOffset)
 
     val expectedAbortedTxns = List(
-      new AbortedTxn(producerId=producerId1, firstOffset=8, lastOffset=10, lastStableOffset=11),
-      new AbortedTxn(producerId=producerId2, firstOffset=11, lastOffset=16, lastStableOffset=17)
+      new AbortedTxn(producerId1, 8, 10, 11),
+      new AbortedTxn(producerId2, 11, 16, 17)
     )
 
     assertAllTransactionsComplete(log)
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index ff7cc00ffe9..269dff643b5 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
+import org.apache.kafka.server.log.internals.AbortedTxn
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index c31099fa928..c1776a4344c 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -346,7 +346,7 @@ class LogSegmentTest {
 
     var abortedTxns = segment.txnIndex.allAbortedTxns
     assertEquals(1, abortedTxns.size)
-    var abortedTxn = abortedTxns.head
+    var abortedTxn = abortedTxns.get(0)
     assertEquals(pid2, abortedTxn.producerId)
     assertEquals(102L, abortedTxn.firstOffset)
     assertEquals(106L, abortedTxn.lastOffset)
@@ -362,7 +362,7 @@ class LogSegmentTest {
 
     abortedTxns = segment.txnIndex.allAbortedTxns
     assertEquals(1, abortedTxns.size)
-    abortedTxn = abortedTxns.head
+    abortedTxn = abortedTxns.get(0)
     assertEquals(pid2, abortedTxn.producerId)
     assertEquals(75L, abortedTxn.firstOffset)
     assertEquals(106L, abortedTxn.lastOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 8a73c8bb943..a293ed9eac9 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -21,7 +21,6 @@ import kafka.log.remote.RemoteLogManager
 
 import java.io.File
 import java.util.Properties
-
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel}
 import kafka.utils.{Scheduler, TestUtils}
@@ -29,10 +28,11 @@ import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
+
 import java.nio.file.Files
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
-
 import kafka.log
+import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}
 
 import scala.collection.Iterable
 import scala.jdk.CollectionConverters._
@@ -237,7 +237,7 @@ object LogTestUtils {
     log.read(startOffset, maxLength, isolation, minOneMessage)
   }
 
-  def allAbortedTransactions(log: UnifiedLog): Iterable[AbortedTxn] = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
+  def allAbortedTransactions(log: UnifiedLog): Iterable[AbortedTxn] = log.logSegments.flatMap(_.txnIndex.allAbortedTxns.asScala)
 
   def deleteProducerSnapshotFiles(logDir: File): Unit = {
     val files = logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(UnifiedLog.ProducerSnapshotFileSuffix))
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index b631b642ef4..c808d03a72e 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
+import org.apache.kafka.server.log.internals.CompletedTxn
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.Mockito.{mock, when}
@@ -246,7 +247,7 @@ class ProducerStateManagerTest {
     // incomplete transaction
     val secondAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
     val firstCompletedTxn = appendEndTxn(ControlRecordType.COMMIT, 21, secondAppend)
-    assertEquals(Some(CompletedTxn(producerId, 16L, 21, isAborted = false)), firstCompletedTxn)
+    assertEquals(Some(new CompletedTxn(producerId, 16L, 21, false)), firstCompletedTxn)
     assertEquals(None, appendEndTxn(ControlRecordType.COMMIT, 22, secondAppend))
     assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 23, secondAppend))
     appendData(24L, 27L, secondAppend)
@@ -392,21 +393,21 @@ class ProducerStateManagerTest {
     beginTxn(producerId3, startOffset3)
 
     val lastOffset1 = startOffset3 + 15
-    val completedTxn1 = CompletedTxn(producerId1, startOffset1, lastOffset1, isAborted = false)
+    val completedTxn1 = new CompletedTxn(producerId1, startOffset1, lastOffset1, false)
     assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn1))
     stateManager.completeTxn(completedTxn1)
     stateManager.onHighWatermarkUpdated(lastOffset1 + 1)
     assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
 
     val lastOffset3 = lastOffset1 + 20
-    val completedTxn3 = CompletedTxn(producerId3, startOffset3, lastOffset3, isAborted = false)
+    val completedTxn3 = new CompletedTxn(producerId3, startOffset3, lastOffset3, false)
     assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn3))
     stateManager.completeTxn(completedTxn3)
     stateManager.onHighWatermarkUpdated(lastOffset3 + 1)
     assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
 
     val lastOffset2 = lastOffset3 + 78
-    val completedTxn2 = CompletedTxn(producerId2, startOffset2, lastOffset2, isAborted = false)
+    val completedTxn2 = new CompletedTxn(producerId2, startOffset2, lastOffset2, false)
     assertEquals(lastOffset2 + 1, stateManager.lastStableOffset(completedTxn2))
     stateManager.completeTxn(completedTxn2)
     stateManager.onHighWatermarkUpdated(lastOffset2 + 1)
diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
index 9c1cbe3f307..6784e76a8e5 100644
--- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -18,11 +18,13 @@ package kafka.log
 
 import kafka.utils.TestUtils
 import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.server.log.internals.CorruptIndexException
+import org.apache.kafka.server.log.internals.{AbortedTxn, CorruptIndexException, TransactionIndex}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
+import scala.jdk.CollectionConverters._
 import java.io.File
+import java.util.Collections
 
 class TransactionIndexTest {
   var file: File = _
@@ -43,26 +45,26 @@ class TransactionIndexTest {
   @Test
   def testPositionSetCorrectlyWhenOpened(): Unit = {
     val abortedTxns = List(
-      new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
-      new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
-      new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
-      new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+      new AbortedTxn(0L, 0, 10, 11),
+      new AbortedTxn(1L, 5, 15, 13),
+      new AbortedTxn(2L, 18, 35, 25),
+      new AbortedTxn(3L, 32, 50, 40))
     abortedTxns.foreach(index.append)
     index.close()
 
     val reopenedIndex = new TransactionIndex(0L, file)
-    val anotherAbortedTxn = new AbortedTxn(producerId = 3L, firstOffset = 50, lastOffset = 60, lastStableOffset = 55)
+    val anotherAbortedTxn = new AbortedTxn(3L, 50, 60, 55)
     reopenedIndex.append(anotherAbortedTxn)
-    assertEquals(abortedTxns ++ List(anotherAbortedTxn), reopenedIndex.allAbortedTxns)
+    assertEquals((abortedTxns ++ List(anotherAbortedTxn)).asJava, reopenedIndex.allAbortedTxns)
   }
 
   @Test
   def testSanityCheck(): Unit = {
     val abortedTxns = List(
-      new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
-      new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
-      new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
-      new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+      new AbortedTxn(0L, 0, 10, 11),
+      new AbortedTxn(1L, 5, 15, 13),
+      new AbortedTxn(2L, 18, 35, 25),
+      new AbortedTxn(3L, 32, 50, 40))
     abortedTxns.foreach(index.append)
     index.close()
 
@@ -73,71 +75,71 @@ class TransactionIndexTest {
 
   @Test
   def testLastOffsetMustIncrease(): Unit = {
-    index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
-    assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(producerId = 0L, firstOffset = 0,
-      lastOffset = 15, lastStableOffset = 11)))
+    index.append(new AbortedTxn(1L, 5, 15, 13))
+    assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(0L, 0,
+      15, 11)))
   }
 
   @Test
   def testLastOffsetCannotDecrease(): Unit = {
-    index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
-    assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(producerId = 0L, firstOffset = 0,
-      lastOffset = 10, lastStableOffset = 11)))
+    index.append(new AbortedTxn(1L, 5, 15, 13))
+    assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(0L, 0,
+      10, 11)))
   }
 
   @Test
   def testCollectAbortedTransactions(): Unit = {
     val abortedTransactions = List(
-      new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
-      new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
-      new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
-      new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+      new AbortedTxn(0L, 0, 10, 11),
+      new AbortedTxn(1L, 5, 15, 13),
+      new AbortedTxn(2L, 18, 35, 25),
+      new AbortedTxn(3L, 32, 50, 40))
 
     abortedTransactions.foreach(index.append)
 
     var result = index.collectAbortedTxns(0L, 100L)
-    assertEquals(abortedTransactions, result.abortedTransactions)
+    assertEquals(abortedTransactions.asJava, result.abortedTransactions)
     assertFalse(result.isComplete)
 
     result = index.collectAbortedTxns(0L, 32)
-    assertEquals(abortedTransactions.take(3), result.abortedTransactions)
+    assertEquals(abortedTransactions.take(3).asJava, result.abortedTransactions)
     assertTrue(result.isComplete)
 
     result = index.collectAbortedTxns(0L, 35)
-    assertEquals(abortedTransactions, result.abortedTransactions)
+    assertEquals(abortedTransactions.asJava, result.abortedTransactions)
     assertTrue(result.isComplete)
 
     result = index.collectAbortedTxns(10, 35)
-    assertEquals(abortedTransactions, result.abortedTransactions)
+    assertEquals(abortedTransactions.asJava, result.abortedTransactions)
     assertTrue(result.isComplete)
 
     result = index.collectAbortedTxns(11, 35)
-    assertEquals(abortedTransactions.slice(1, 4), result.abortedTransactions)
+    assertEquals(abortedTransactions.slice(1, 4).asJava, result.abortedTransactions)
     assertTrue(result.isComplete)
 
     result = index.collectAbortedTxns(20, 41)
-    assertEquals(abortedTransactions.slice(2, 4), result.abortedTransactions)
+    assertEquals(abortedTransactions.slice(2, 4).asJava, result.abortedTransactions)
     assertFalse(result.isComplete)
   }
 
   @Test
   def testTruncate(): Unit = {
     val abortedTransactions = List(
-      new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2),
-      new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16),
-      new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
-      new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+      new AbortedTxn(0L, 0, 10, 2),
+      new AbortedTxn(1L, 5, 15, 16),
+      new AbortedTxn(2L, 18, 35, 25),
+      new AbortedTxn(3L, 32, 50, 40))
 
     abortedTransactions.foreach(index.append)
 
     index.truncateTo(51)
-    assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions)
+    assertEquals(abortedTransactions.asJava, index.collectAbortedTxns(0L, 100L).abortedTransactions)
 
     index.truncateTo(50)
-    assertEquals(abortedTransactions.take(3), index.collectAbortedTxns(0L, 100L).abortedTransactions)
+    assertEquals(abortedTransactions.take(3).asJava, index.collectAbortedTxns(0L, 100L).abortedTransactions)
 
     index.reset()
-    assertEquals(List.empty[FetchResponseData.AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
+    assertEquals(Collections.emptyList[FetchResponseData.AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
   }
 
   @Test
@@ -148,7 +150,7 @@ class TransactionIndexTest {
     val lastStableOffset = 200L
 
     val abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset)
-    assertEquals(AbortedTxn.CurrentVersion, abortedTxn.version)
+    assertEquals(AbortedTxn.CURRENT_VERSION, abortedTxn.version)
     assertEquals(pid, abortedTxn.producerId)
     assertEquals(firstOffset, abortedTxn.firstOffset)
     assertEquals(lastOffset, abortedTxn.lastOffset)
@@ -158,15 +160,15 @@ class TransactionIndexTest {
   @Test
   def testRenameIndex(): Unit = {
     val renamed = TestUtils.tempFile()
-    index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2))
+    index.append(new AbortedTxn(0L, 0, 10, 2))
 
     index.renameTo(renamed)
-    index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16))
+    index.append(new AbortedTxn(1L, 5, 15, 16))
 
     val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions
     assertEquals(2, abortedTxns.size)
-    assertEquals(0, abortedTxns(0).firstOffset)
-    assertEquals(5, abortedTxns(1).firstOffset)
+    assertEquals(0, abortedTxns.get(0).firstOffset)
+    assertEquals(5, abortedTxns.get(1).firstOffset)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 42fdafae206..45335eec8ec 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
 import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
+import org.apache.kafka.server.log.internals.AbortedTxn
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions._
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/AbortedTxn.java b/storage/src/main/java/org/apache/kafka/server/log/internals/AbortedTxn.java
new file mode 100644
index 00000000000..2ad08ce77ba
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/AbortedTxn.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.message.FetchResponseData;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class AbortedTxn {
+    static final int VERSION_OFFSET = 0;
+    static final int VERSION_SIZE = 2;
+    static final int PRODUCER_ID_OFFSET = VERSION_OFFSET + VERSION_SIZE;
+    static final int PRODUCER_ID_SIZE = 8;
+    static final int FIRST_OFFSET_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_SIZE;
+    static final int FIRST_OFFSET_SIZE = 8;
+    static final int LAST_OFFSET_OFFSET = FIRST_OFFSET_OFFSET + FIRST_OFFSET_SIZE;
+    static final int LAST_OFFSET_SIZE = 8;
+    static final int LAST_STABLE_OFFSET_OFFSET = LAST_OFFSET_OFFSET + LAST_OFFSET_SIZE;
+    static final int LAST_STABLE_OFFSET_SIZE = 8;
+    static final int TOTAL_SIZE = LAST_STABLE_OFFSET_OFFSET + LAST_STABLE_OFFSET_SIZE;
+
+    public static final short CURRENT_VERSION = 0;
+
+    final ByteBuffer buffer;
+
+    AbortedTxn(ByteBuffer buffer) {
+        Objects.requireNonNull(buffer);
+        this.buffer = buffer;
+    }
+
+    public AbortedTxn(CompletedTxn completedTxn, long lastStableOffset) {
+        this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset);
+    }
+
+    public AbortedTxn(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {
+        this(toByteBuffer(producerId, firstOffset, lastOffset, lastStableOffset));
+    }
+
+    private static ByteBuffer toByteBuffer(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {
+        ByteBuffer buffer = ByteBuffer.allocate(TOTAL_SIZE);
+        buffer.putShort(CURRENT_VERSION);
+        buffer.putLong(producerId);
+        buffer.putLong(firstOffset);
+        buffer.putLong(lastOffset);
+        buffer.putLong(lastStableOffset);
+        buffer.flip();
+        return buffer;
+    }
+
+    public short version() {
+        return buffer.get(VERSION_OFFSET);
+    }
+
+    public long producerId() {
+        return buffer.getLong(PRODUCER_ID_OFFSET);
+    }
+
+    public long firstOffset() {
+        return buffer.getLong(FIRST_OFFSET_OFFSET);
+    }
+
+    public long lastOffset() {
+        return buffer.getLong(LAST_OFFSET_OFFSET);
+    }
+
+    public long lastStableOffset() {
+        return buffer.getLong(LAST_STABLE_OFFSET_OFFSET);
+    }
+
+    public FetchResponseData.AbortedTransaction asAbortedTransaction() {
+        return new FetchResponseData.AbortedTransaction()
+            .setProducerId(producerId())
+            .setFirstOffset(firstOffset());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        AbortedTxn that = (AbortedTxn) o;
+        return buffer.equals(that.buffer);
+    }
+
+    @Override
+    public int hashCode() {
+        return buffer.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "AbortedTxn(version=" + version()
+            + ", producerId=" + producerId()
+            + ", firstOffset=" + firstOffset()
+            + ", lastOffset=" + lastOffset()
+            + ", lastStableOffset=" + lastStableOffset()
+            + ")";
+    }
+
+}
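
The layout constants above fix each index entry at 2 + 8 + 8 + 8 + 8 = 34 bytes: a big-endian int16 version followed by four int64 fields (producerId, firstOffset, lastOffset, lastStableOffset). A small hypothetical round trip through the public accessors (values are illustrative):

    import org.apache.kafka.server.log.internals.AbortedTxn

    val txn = new AbortedTxn(2L, 18L, 35L, 25L)
    assert(txn.version == AbortedTxn.CURRENT_VERSION)
    assert(txn.producerId == 2L && txn.firstOffset == 18L)
    assert(txn.lastOffset == 35L && txn.lastStableOffset == 25L)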
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/CompletedTxn.java b/storage/src/main/java/org/apache/kafka/server/log/internals/CompletedTxn.java
new file mode 100644
index 00000000000..1ad6f8854b4
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/CompletedTxn.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+/**
+ * A class used to hold useful metadata about a completed transaction. This is used to build
+ * the transaction index after appending to the log.
+ */
+public class CompletedTxn {
+    public final long producerId;
+    public final long firstOffset;
+    public final long lastOffset;
+    public final boolean isAborted;
+
+    /**
+     * Create an instance of this class.
+     *
+     * @param producerId  The ID of the producer
+     * @param firstOffset The first offset (inclusive) of the transaction
+     * @param lastOffset  The last offset (inclusive) of the transaction. This is always the offset of the
+     *                    COMMIT/ABORT control record which indicates the transaction's completion.
+     * @param isAborted   Whether the transaction was aborted
+     */
+    public CompletedTxn(long producerId, long firstOffset, long lastOffset, boolean isAborted) {
+        this.producerId = producerId;
+        this.firstOffset = firstOffset;
+        this.lastOffset = lastOffset;
+        this.isAborted = isAborted;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        CompletedTxn that = (CompletedTxn) o;
+
+        return producerId == that.producerId
+            && firstOffset == that.firstOffset
+            && lastOffset == that.lastOffset
+            && isAborted == that.isAborted;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Long.hashCode(producerId);
+        result = 31 * result + Long.hashCode(firstOffset);
+        result = 31 * result + Long.hashCode(lastOffset);
+        result = 31 * result + Boolean.hashCode(isAborted);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "CompletedTxn(producerId=" + producerId +
+            ", firstOffset=" + firstOffset +
+            ", lastOffset=" + lastOffset +
+            ", isAborted=" + isAborted +
+            ')';
+    }
+}
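
CompletedTxn hand-rolls the equals/hashCode/toString that the removed Scala case class used to derive, which is what keeps value-based assertions such as those in ProducerStateManagerTest working unchanged. A quick hypothetical check:

    import org.apache.kafka.server.log.internals.CompletedTxn

    val a = new CompletedTxn(1L, 16L, 21L, false)
    val b = new CompletedTxn(1L, 16L, 21L, false)
    assert(a == b && a.hashCode == b.hashCode)  // value semantics preserved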
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TransactionIndex.java
new file mode 100644
index 00000000000..646541ae9c1
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TransactionIndex.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.function.Supplier;
+
+/**
+ * The transaction index maintains metadata about the aborted transactions for each segment. This includes
+ * the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of
+ * the abort. This index is used to find the aborted transactions in the range of a given fetch request at
+ * the READ_COMMITTED isolation level.
+ *
+ * There is at most one transaction index for each log segment. The entries correspond to the transactions
+ * whose commit markers were written in the corresponding log segment. Note, however, that individual transactions
+ * may span multiple segments. Recovering the index therefore requires scanning the earlier segments in
+ * order to find the start of the transactions.
+ */
+public class TransactionIndex implements Closeable {
+
+    private static class AbortedTxnWithPosition {
+        final AbortedTxn txn;
+        final int position;
+        AbortedTxnWithPosition(AbortedTxn txn, int position) {
+            this.txn = txn;
+            this.position = position;
+        }
+    }
+
+    private final long startOffset;
+
+    private volatile File file;
+
+    // note that the file is not created until we need it
+    private Optional<FileChannel> maybeChannel = Optional.empty();
+    private OptionalLong lastOffset = OptionalLong.empty();
+
+    public TransactionIndex(long startOffset, File file) throws IOException {
+        this.startOffset = startOffset;
+        this.file = file;
+
+        if (file.exists())
+            openChannel();
+    }
+
+    public File file() {
+        return file;
+    }
+
+    public void updateParentDir(File parentDir) {
+        this.file = new File(parentDir, file.getName());
+    }
+
+    public void append(AbortedTxn abortedTxn) throws IOException {
+        lastOffset.ifPresent(offset -> {
+            if (offset >= abortedTxn.lastOffset())
+                throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially, but "
+                    + abortedTxn.lastOffset() + " is not greater than current last offset " + offset + " of index "
+                    + file.getAbsolutePath());
+        });
+        lastOffset = OptionalLong.of(abortedTxn.lastOffset());
+        Utils.writeFully(channel(), abortedTxn.buffer.duplicate());
+    }
+
+    public void flush() throws IOException {
+        FileChannel channel = channelOrNull();
+        if (channel != null)
+            channel.force(true);
+    }
+
+    /**
+     * Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
+     */
+    public void reset() throws IOException {
+        FileChannel channel = channelOrNull();
+        if (channel != null)
+            channel.truncate(0);
+        lastOffset = OptionalLong.empty();
+    }
+
+    public void close() throws IOException {
+        FileChannel channel = channelOrNull();
+        if (channel != null)
+            channel.close();
+        maybeChannel = Optional.empty();
+    }
+
+    /**
+     * Delete this index.
+     *
+     * @throws IOException if deletion fails due to an I/O error
+     * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
+     *         not exist
+     */
+    public boolean deleteIfExists() throws IOException {
+        close();
+        return Files.deleteIfExists(file.toPath());
+    }
+
+    public void renameTo(File f) throws IOException {
+        try {
+            if (file.exists())
+                Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
+        } finally {
+            this.file = f;
+        }
+    }
+
+    public void truncateTo(long offset) throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
+        OptionalLong newLastOffset = OptionalLong.empty();
+        for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
+            AbortedTxn abortedTxn = txnWithPosition.txn;
+            long position = txnWithPosition.position;
+            if (abortedTxn.lastOffset() >= offset) {
+                channel().truncate(position);
+                lastOffset = newLastOffset;
+                return;
+            }
+            newLastOffset = OptionalLong.of(abortedTxn.lastOffset());
+        }
+    }
+
+    public List<AbortedTxn> allAbortedTxns() {
+        List<AbortedTxn> result = new ArrayList<>();
+        for (AbortedTxnWithPosition txnWithPosition : iterable())
+            result.add(txnWithPosition.txn);
+        return result;
+    }
+
+    /**
+     * Collect all aborted transactions which overlap with a given fetch range.
+     *
+     * @param fetchOffset Inclusive first offset of the fetch range
+     * @param upperBoundOffset Exclusive last offset in the fetch range
+     * @return An object containing the aborted transactions and whether the search needs to continue
+     *         into the next log segment.
+     */
+    public TxnIndexSearchResult collectAbortedTxns(long fetchOffset, long upperBoundOffset) {
+        List<AbortedTxn> abortedTransactions = new ArrayList<>();
+        for (AbortedTxnWithPosition txnWithPosition : iterable()) {
+            AbortedTxn abortedTxn = txnWithPosition.txn;
+            if (abortedTxn.lastOffset() >= fetchOffset && abortedTxn.firstOffset() < upperBoundOffset)
+                abortedTransactions.add(abortedTxn);
+
+            if (abortedTxn.lastStableOffset() >= upperBoundOffset)
+                return new TxnIndexSearchResult(abortedTransactions, true);
+        }
+        return new TxnIndexSearchResult(abortedTransactions, false);
+    }
+
+    /**
+     * Do a basic sanity check on this index to detect obvious problems.
+     *
+     * @throws CorruptIndexException if any problems are found.
+     */
+    public void sanityCheck() {
+        ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
+        for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
+            AbortedTxn abortedTxn = txnWithPosition.txn;
+            if (abortedTxn.lastOffset() < startOffset)
+                throw new CorruptIndexException("Last offset of aborted transaction " + abortedTxn + " in index "
+                    + file.getAbsolutePath() + " is less than start offset " + startOffset);
+        }
+    }
+
+    private FileChannel openChannel() throws IOException {
+        FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
+                StandardOpenOption.READ, StandardOpenOption.WRITE);
+        maybeChannel = Optional.of(channel);
+        channel.position(channel.size());
+        return channel;
+    }
+
+    private FileChannel channel() throws IOException {
+        FileChannel channel = channelOrNull();
+        if (channel == null)
+            return openChannel();
+        else
+            return channel;
+    }
+
+    private FileChannel channelOrNull() {
+        return maybeChannel.orElse(null);
+    }
+
+    private Iterable<AbortedTxnWithPosition> iterable() {
+        return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE));
+    }
+
+    private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> allocate) {
+        FileChannel channel = channelOrNull();
+        if (channel == null)
+            return Collections.emptyList();
+
+        PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
+
+        return () -> new Iterator<AbortedTxnWithPosition>() {
+
+            @Override
+            public boolean hasNext() {
+                try {
+                    return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE;
+                } catch (IOException e) {
+                    throw new KafkaException("Failed to read position from the transaction index " + file.getAbsolutePath(), e);
+                }
+            }
+
+            @Override
+            public AbortedTxnWithPosition next() {
+                try {
+                    ByteBuffer buffer = allocate.get();
+                    Utils.readFully(channel, buffer, position.value);
+                    buffer.flip();
+
+                    AbortedTxn abortedTxn = new AbortedTxn(buffer);
+                    if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
+                        throw new KafkaException("Unexpected aborted transaction version " + abortedTxn.version()
+                            + " in transaction index " + file.getAbsolutePath() + ", current version is "
+                            + AbortedTxn.CURRENT_VERSION);
+                    AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value);
+                    position.value += AbortedTxn.TOTAL_SIZE;
+                    return nextEntry;
+                } catch (IOException e) {
+                    // We received an unexpected error reading from the index file. We propagate this as an
+                    // UNKNOWN error to the consumer, which will cause it to retry the fetch.
+                    throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e);
+                }
+            }
+
+        };
+    }
+
+}
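
Putting the relocated pieces together, a hypothetical end-to-end sketch from Scala, mirroring TransactionIndexTest (temp-file name and offsets are illustrative):

    import java.nio.file.Files
    import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}
    import scala.jdk.CollectionConverters._

    val file = Files.createTempFile("00000000000000000000", ".txnindex").toFile
    val index = new TransactionIndex(0L, file)
    index.append(new AbortedTxn(0L, 0L, 10L, 11L))
    index.append(new AbortedTxn(1L, 5L, 15L, 13L))

    val result = index.collectAbortedTxns(0L, 100L)
    assert(result.abortedTransactions.asScala.map(_.producerId) == Seq(0L, 1L))
    assert(!result.isComplete)  // no entry reached lastStableOffset >= 100

    index.truncateTo(15L)       // drops the second entry (lastOffset 15 >= 15)
    assert(index.allAbortedTxns.asScala.map(_.lastOffset) == Seq(10L))
    index.deleteIfExists()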
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnIndexSearchResult.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnIndexSearchResult.java
new file mode 100644
index 00000000000..c1d40501af7
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnIndexSearchResult.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TxnIndexSearchResult {
+    public final List<AbortedTxn> abortedTransactions;
+    public final boolean isComplete;
+
+    public TxnIndexSearchResult(List<AbortedTxn> abortedTransactions, boolean isComplete) {
+        this.abortedTransactions = Collections.unmodifiableList(abortedTransactions);
+        this.isComplete = isComplete;
+    }
+}
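
Note that the result list is wrapped with Collections.unmodifiableList, preserving the read-only contract callers previously got from the Scala case class holding an immutable List. A hypothetical check in the same vein:

    import java.util.{ArrayList => JArrayList}
    import org.apache.kafka.server.log.internals.{AbortedTxn, TxnIndexSearchResult}

    val result = new TxnIndexSearchResult(new JArrayList[AbortedTxn](), true)
    // Mutation of the exposed view fails at runtime:
    try result.abortedTransactions.add(new AbortedTxn(0L, 0L, 1L, 1L))
    catch { case _: UnsupportedOperationException => () }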