Posted to commits@kafka.apache.org by ij...@apache.org on 2022/12/19 19:31:45 UTC
[kafka] branch trunk updated: KAFKA-14472: Move TransactionIndex and related to storage module (#12996)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e2678d57d09 KAFKA-14472: Move TransactionIndex and related to storage module (#12996)
e2678d57d09 is described below
commit e2678d57d0919aaa97effe2ee7591cd6b85b5303
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Mon Dec 19 11:31:37 2022 -0800
KAFKA-14472: Move TransactionIndex and related to storage module (#12996)
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Jun Rao <ju...@gmail.com>, Satish Duggana <sa...@apache.org>
---
core/src/main/scala/kafka/log/LocalLog.scala | 10 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 3 +-
core/src/main/scala/kafka/log/LogSegment.scala | 2 +-
.../scala/kafka/log/ProducerStateManager.scala | 3 +-
.../main/scala/kafka/log/TransactionIndex.scala | 264 ---------------------
core/src/main/scala/kafka/log/UnifiedLog.scala | 21 +-
.../scala/kafka/log/remote/RemoteIndexCache.scala | 2 +-
.../main/scala/kafka/tools/DumpLogSegments.scala | 3 +-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 5 +-
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 1 +
.../test/scala/unit/kafka/log/LogSegmentTest.scala | 4 +-
.../test/scala/unit/kafka/log/LogTestUtils.scala | 6 +-
.../unit/kafka/log/ProducerStateManagerTest.scala | 9 +-
.../unit/kafka/log/TransactionIndexTest.scala | 80 ++++---
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 1 +
.../kafka/server/log/internals/AbortedTxn.java | 117 +++++++++
.../kafka/server/log/internals/CompletedTxn.java | 75 ++++++
.../server/log/internals/TransactionIndex.java | 264 +++++++++++++++++++++
.../server/log/internals/TxnIndexSearchResult.java | 30 +++
19 files changed, 556 insertions(+), 344 deletions(-)
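
Most of the core changes below follow a single mechanical pattern: kafka.log Scala case classes become plain Java classes under org.apache.kafka.server.log.internals, so named-argument apply() calls give way to positional constructors with new, and returned Java collections are bridged with CollectionConverters. A minimal before/after sketch of a call site (hypothetical values, not taken from the diff):

    import org.apache.kafka.server.log.internals.CompletedTxn

    // Before (Scala case class, removed by this commit):
    //   CompletedTxn(producerId = 1L, firstOffset = 5L, lastOffset = 10L, isAborted = false)
    // After (Java class): explicit new and positional arguments
    val txn = new CompletedTxn(1L, 5L, 10L, false)
    println(txn) // CompletedTxn(producerId=1, firstOffset=5, lastOffset=10, isAborted=false)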
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 56172333351..68bb9d9f8b0 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeEx
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.server.log.internals.OffsetPosition
+import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetPosition}
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, immutable}
@@ -448,7 +448,7 @@ class LocalLog(@volatile private var _dir: File,
}
val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
- def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
+ def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)
FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
@@ -459,13 +459,13 @@ class LocalLog(@volatile private var _dir: File,
private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
startingSegment: LogSegment,
- accumulator: List[AbortedTxn] => Unit): Unit = {
+ accumulator: Seq[AbortedTxn] => Unit): Unit = {
val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator
var segmentEntryOpt = Option(startingSegment)
while (segmentEntryOpt.isDefined) {
val segment = segmentEntryOpt.get
val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset)
- accumulator(searchResult.abortedTransactions)
+ accumulator(searchResult.abortedTransactions.asScala)
if (searchResult.isComplete)
return
segmentEntryOpt = nextOption(higherSegments)
@@ -475,7 +475,7 @@ class LocalLog(@volatile private var _dir: File,
private[log] def collectAbortedTransactions(logStartOffset: Long, baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
val segmentEntry = segments.floorSegment(baseOffset)
val allAbortedTxns = ListBuffer.empty[AbortedTxn]
- def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
+ def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
segmentEntry.foreach(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator))
allAbortedTxns.toList
}
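
The accumulator signature widens from List[AbortedTxn] to Seq[AbortedTxn] because the segment's collectAbortedTxns now returns a java.util.List from the Java index, and asScala yields a mutable.Buffer (a Seq) rather than an immutable List. A self-contained sketch of that bridging, assuming only the standard converters:

    import org.apache.kafka.server.log.internals.AbortedTxn
    import scala.jdk.CollectionConverters._

    val javaList: java.util.List[AbortedTxn] = java.util.Arrays.asList(
      new AbortedTxn(0L, 0L, 10L, 11L),  // producerId, firstOffset, lastOffset, lastStableOffset
      new AbortedTxn(1L, 5L, 15L, 13L))
    val asSeq: Seq[AbortedTxn] = javaList.asScala // a wrapper over the Java list, not a copy
    asSeq.foreach(txn => println(txn.firstOffset))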
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4ad0ea2d853..8bafc0aae60 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
@@ -1123,7 +1124,7 @@ private[log] class CleanedTransactionMetadata {
private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
// Minheap of aborted transactions sorted by the transaction first offset
private val abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
- override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
+ override def compare(x: AbortedTxn, y: AbortedTxn): Int = java.lang.Long.compare(x.firstOffset, y.firstOffset)
}.reverse)
// Output cleaned index to write retained aborted transactions
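
Since firstOffset is now a Java getter returning a primitive long, the comparison is spelled with java.lang.Long.compare, the allocation-free way to compare two longs, instead of relying on Scala's RichLong enrichment. The surrounding min-heap idiom, reduced to a runnable sketch (Entry is a stand-in for AbortedTxn):

    import scala.collection.mutable

    final case class Entry(firstOffset: Long)

    // Scala's PriorityQueue dequeues the largest element first, so reversing
    // the ordering turns it into a min-heap keyed on firstOffset.
    val byFirstOffset: Ordering[Entry] =
      (x: Entry, y: Entry) => java.lang.Long.compare(x.firstOffset, y.firstOffset)
    val minHeap = mutable.PriorityQueue.empty[Entry](byFirstOffset.reverse)
    minHeap.enqueue(Entry(9), Entry(3), Entry(7))
    println(minHeap.dequeue()) // Entry(3)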
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 4da227556f5..0c2a013e11f 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
-import org.apache.kafka.server.log.internals.{OffsetPosition, TimestampOffset}
+import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetPosition, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
import scala.jdk.CollectionConverters._
import scala.math._
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 835b74066b6..7307bed0efa 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.types._
import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time, Utils}
+import org.apache.kafka.server.log.internals.CompletedTxn
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
@@ -318,7 +319,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
// without any associated data will not have any impact on the last stable offset
// and would not need to be reflected in the transaction index.
val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
- CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
+ new CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
}
updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
deleted file mode 100644
index 72321db31ee..00000000000
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.log
-
-import java.io.{Closeable, File, IOException}
-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-import java.nio.file.{Files, StandardOpenOption}
-import kafka.utils.{Logging, nonthreadsafe}
-import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.log.internals.CorruptIndexException
-
-import scala.collection.mutable.ListBuffer
-
-private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTxn], isComplete: Boolean)
-
-/**
- * The transaction index maintains metadata about the aborted transactions for each segment. This includes
- * the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of
- * the abort. This index is used to find the aborted transactions in the range of a given fetch request at
- * the READ_COMMITTED isolation level.
- *
- * There is at most one transaction index for each log segment. The entries correspond to the transactions
- * whose commit markers were written in the corresponding log segment. Note, however, that individual transactions
- * may span multiple segments. Recovering the index therefore requires scanning the earlier segments in
- * order to find the start of the transactions.
- */
-@nonthreadsafe
-class TransactionIndex(val startOffset: Long, @volatile private var _file: File) extends Closeable with Logging {
-
- // note that the file is not created until we need it
- @volatile private var maybeChannel: Option[FileChannel] = None
- private var lastOffset: Option[Long] = None
-
- if (_file.exists)
- openChannel()
-
- def append(abortedTxn: AbortedTxn): Unit = {
- lastOffset.foreach { offset =>
- if (offset >= abortedTxn.lastOffset)
- throw new IllegalArgumentException(s"The last offset of appended transactions must increase sequentially, but " +
- s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}")
- }
- lastOffset = Some(abortedTxn.lastOffset)
- Utils.writeFully(channel(), abortedTxn.buffer.duplicate())
- }
-
- def flush(): Unit = maybeChannel.foreach(_.force(true))
-
- def file: File = _file
-
- def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)
-
- /**
- * Delete this index.
- *
- * @throws IOException if deletion fails due to an I/O error
- * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
- * not exist
- */
- def deleteIfExists(): Boolean = {
- close()
- Files.deleteIfExists(file.toPath)
- }
-
- private def channel(): FileChannel = {
- maybeChannel match {
- case Some(channel) => channel
- case None => openChannel()
- }
- }
-
- private def openChannel(): FileChannel = {
- val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
- StandardOpenOption.WRITE)
- maybeChannel = Some(channel)
- channel.position(channel.size)
- channel
- }
-
- /**
- * Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
- */
- def reset(): Unit = {
- maybeChannel.foreach(_.truncate(0))
- lastOffset = None
- }
-
- def close(): Unit = {
- maybeChannel.foreach(_.close())
- maybeChannel = None
- }
-
- def renameTo(f: File): Unit = {
- try {
- if (file.exists)
- Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
- } finally _file = f
- }
-
- def truncateTo(offset: Long): Unit = {
- val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
- var newLastOffset: Option[Long] = None
- for ((abortedTxn, position) <- iterator(() => buffer)) {
- if (abortedTxn.lastOffset >= offset) {
- channel().truncate(position)
- lastOffset = newLastOffset
- return
- }
- newLastOffset = Some(abortedTxn.lastOffset)
- }
- }
-
- private def iterator(allocate: () => ByteBuffer = () => ByteBuffer.allocate(AbortedTxn.TotalSize)): Iterator[(AbortedTxn, Int)] = {
- maybeChannel match {
- case None => Iterator.empty
- case Some(channel) =>
- var position = 0
-
- new Iterator[(AbortedTxn, Int)] {
- override def hasNext: Boolean = channel.position - position >= AbortedTxn.TotalSize
-
- override def next(): (AbortedTxn, Int) = {
- try {
- val buffer = allocate()
- Utils.readFully(channel, buffer, position)
- buffer.flip()
-
- val abortedTxn = new AbortedTxn(buffer)
- if (abortedTxn.version > AbortedTxn.CurrentVersion)
- throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version} " +
- s"in transaction index ${file.getAbsolutePath}, current version is ${AbortedTxn.CurrentVersion}")
- val nextEntry = (abortedTxn, position)
- position += AbortedTxn.TotalSize
- nextEntry
- } catch {
- case e: IOException =>
- // We received an unexpected error reading from the index file. We propagate this as an
- // UNKNOWN error to the consumer, which will cause it to retry the fetch.
- throw new KafkaException(s"Failed to read from the transaction index ${file.getAbsolutePath}", e)
- }
- }
- }
- }
- }
-
- def allAbortedTxns: List[AbortedTxn] = {
- iterator().map(_._1).toList
- }
-
- /**
- * Collect all aborted transactions which overlap with a given fetch range.
- *
- * @param fetchOffset Inclusive first offset of the fetch range
- * @param upperBoundOffset Exclusive last offset in the fetch range
- * @return An object containing the aborted transactions and whether the search needs to continue
- * into the next log segment.
- */
- def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = {
- val abortedTransactions = ListBuffer.empty[AbortedTxn]
- for ((abortedTxn, _) <- iterator()) {
- if (abortedTxn.lastOffset >= fetchOffset && abortedTxn.firstOffset < upperBoundOffset)
- abortedTransactions += abortedTxn
-
- if (abortedTxn.lastStableOffset >= upperBoundOffset)
- return TxnIndexSearchResult(abortedTransactions.toList, isComplete = true)
- }
- TxnIndexSearchResult(abortedTransactions.toList, isComplete = false)
- }
-
- /**
- * Do a basic sanity check on this index to detect obvious problems.
- *
- * @throws CorruptIndexException if any problems are found.
- */
- def sanityCheck(): Unit = {
- val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
- for ((abortedTxn, _) <- iterator(() => buffer)) {
- if (abortedTxn.lastOffset < startOffset)
- throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn in index " +
- s"${file.getAbsolutePath} is less than start offset $startOffset")
- }
- }
-
-}
-
-private[log] object AbortedTxn {
- val VersionOffset = 0
- val VersionSize = 2
- val ProducerIdOffset = VersionOffset + VersionSize
- val ProducerIdSize = 8
- val FirstOffsetOffset = ProducerIdOffset + ProducerIdSize
- val FirstOffsetSize = 8
- val LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize
- val LastOffsetSize = 8
- val LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize
- val LastStableOffsetSize = 8
- val TotalSize = LastStableOffsetOffset + LastStableOffsetSize
-
- val CurrentVersion: Short = 0
-}
-
-private[log] class AbortedTxn(val buffer: ByteBuffer) {
- import AbortedTxn._
-
- def this(producerId: Long,
- firstOffset: Long,
- lastOffset: Long,
- lastStableOffset: Long) = {
- this(ByteBuffer.allocate(AbortedTxn.TotalSize))
- buffer.putShort(CurrentVersion)
- buffer.putLong(producerId)
- buffer.putLong(firstOffset)
- buffer.putLong(lastOffset)
- buffer.putLong(lastStableOffset)
- buffer.flip()
- }
-
- def this(completedTxn: CompletedTxn, lastStableOffset: Long) =
- this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset)
-
- def version: Short = buffer.get(VersionOffset)
-
- def producerId: Long = buffer.getLong(ProducerIdOffset)
-
- def firstOffset: Long = buffer.getLong(FirstOffsetOffset)
-
- def lastOffset: Long = buffer.getLong(LastOffsetOffset)
-
- def lastStableOffset: Long = buffer.getLong(LastStableOffsetOffset)
-
- def asAbortedTransaction: FetchResponseData.AbortedTransaction = new FetchResponseData.AbortedTransaction()
- .setProducerId(producerId)
- .setFirstOffset(firstOffset)
-
- override def toString: String =
- s"AbortedTxn(version=$version, producerId=$producerId, firstOffset=$firstOffset, " +
- s"lastOffset=$lastOffset, lastStableOffset=$lastStableOffset)"
-
- override def equals(any: Any): Boolean = {
- any match {
- case that: AbortedTxn => this.buffer.equals(that.buffer)
- case _ => false
- }
- }
-
- override def hashCode(): Int = buffer.hashCode
-}
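
The constants deleted above (and recreated verbatim in the Java class below) pin down the on-disk format: each index entry is a fixed 34 bytes, a 2-byte version followed by four 8-byte longs. A one-liner to confirm the arithmetic:

    // version (2) + producerId (8) + firstOffset (8) + lastOffset (8) + lastStableOffset (8)
    val totalSize = 2 + 4 * 8
    assert(totalSize == 34) // matches AbortedTxn.TotalSize / TOTAL_SIZE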
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 8830258c8fe..e1f49cceb6e 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -44,6 +44,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
+import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import scala.annotation.nowarn
@@ -159,26 +160,6 @@ case class LogReadInfo(fetchedData: FetchDataInfo,
logEndOffset: Long,
lastStableOffset: Long)
-/**
- * A class used to hold useful metadata about a completed transaction. This is used to build
- * the transaction index after appending to the log.
- *
- * @param producerId The ID of the producer
- * @param firstOffset The first offset (inclusive) of the transaction
- * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the
- * COMMIT/ABORT control record which indicates the transaction's completion.
- * @param isAborted Whether or not the transaction was aborted
- */
-case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) {
- override def toString: String = {
- "CompletedTxn(" +
- s"producerId=$producerId, " +
- s"firstOffset=$firstOffset, " +
- s"lastOffset=$lastOffset, " +
- s"isAborted=$isAborted)"
- }
-}
-
/**
* A class used to hold params required to decide to rotate a log segment or not.
*/
diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
index 9dfaaa5539c..594b74f19ff 100644
--- a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
+++ b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
@@ -22,7 +22,7 @@ import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.log.internals.OffsetPosition
+import org.apache.kafka.server.log.internals.{OffsetPosition, TransactionIndex}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index c82523eff4b..fe8bfd98d41 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
+import org.apache.kafka.server.log.internals.TransactionIndex
import org.apache.kafka.snapshot.Snapshots
import scala.jdk.CollectionConverters._
@@ -94,7 +95,7 @@ object DumpLogSegments {
private def dumpTxnIndex(file: File): Unit = {
val index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file)
- for (abortedTxn <- index.allAbortedTxns) {
+ for (abortedTxn <- index.allAbortedTxns.asScala) {
println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}")
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 819278bd280..071e8b8fd1c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.log.internals.AbortedTxn
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
@@ -328,8 +329,8 @@ class LogCleanerTest {
assertEquals(20L, log.logEndOffset)
val expectedAbortedTxns = List(
- new AbortedTxn(producerId=producerId1, firstOffset=8, lastOffset=10, lastStableOffset=11),
- new AbortedTxn(producerId=producerId2, firstOffset=11, lastOffset=16, lastStableOffset=17)
+ new AbortedTxn(producerId1, 8, 10, 11),
+ new AbortedTxn(producerId2, 11, 16, 17)
)
assertAllTransactionsComplete(log)
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index ff7cc00ffe9..269dff643b5 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
+import org.apache.kafka.server.log.internals.AbortedTxn
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index c31099fa928..c1776a4344c 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -346,7 +346,7 @@ class LogSegmentTest {
var abortedTxns = segment.txnIndex.allAbortedTxns
assertEquals(1, abortedTxns.size)
- var abortedTxn = abortedTxns.head
+ var abortedTxn = abortedTxns.get(0)
assertEquals(pid2, abortedTxn.producerId)
assertEquals(102L, abortedTxn.firstOffset)
assertEquals(106L, abortedTxn.lastOffset)
@@ -362,7 +362,7 @@ class LogSegmentTest {
abortedTxns = segment.txnIndex.allAbortedTxns
assertEquals(1, abortedTxns.size)
- abortedTxn = abortedTxns.head
+ abortedTxn = abortedTxns.get(0)
assertEquals(pid2, abortedTxn.producerId)
assertEquals(75L, abortedTxn.firstOffset)
assertEquals(106L, abortedTxn.lastOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 8a73c8bb943..a293ed9eac9 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -21,7 +21,6 @@ import kafka.log.remote.RemoteLogManager
import java.io.File
import java.util.Properties
-
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel}
import kafka.utils.{Scheduler, TestUtils}
@@ -29,10 +28,11 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.utils.{Time, Utils}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
+
import java.nio.file.Files
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
-
import kafka.log
+import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}
import scala.collection.Iterable
import scala.jdk.CollectionConverters._
@@ -237,7 +237,7 @@ object LogTestUtils {
log.read(startOffset, maxLength, isolation, minOneMessage)
}
- def allAbortedTransactions(log: UnifiedLog): Iterable[AbortedTxn] = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
+ def allAbortedTransactions(log: UnifiedLog): Iterable[AbortedTxn] = log.logSegments.flatMap(_.txnIndex.allAbortedTxns.asScala)
def deleteProducerSnapshotFiles(logDir: File): Unit = {
val files = logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(UnifiedLog.ProducerSnapshotFileSuffix))
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index b631b642ef4..c808d03a72e 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Utils}
+import org.apache.kafka.server.log.internals.CompletedTxn
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.Mockito.{mock, when}
@@ -246,7 +247,7 @@ class ProducerStateManagerTest {
// incomplete transaction
val secondAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
val firstCompletedTxn = appendEndTxn(ControlRecordType.COMMIT, 21, secondAppend)
- assertEquals(Some(CompletedTxn(producerId, 16L, 21, isAborted = false)), firstCompletedTxn)
+ assertEquals(Some(new CompletedTxn(producerId, 16L, 21, false)), firstCompletedTxn)
assertEquals(None, appendEndTxn(ControlRecordType.COMMIT, 22, secondAppend))
assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 23, secondAppend))
appendData(24L, 27L, secondAppend)
@@ -392,21 +393,21 @@ class ProducerStateManagerTest {
beginTxn(producerId3, startOffset3)
val lastOffset1 = startOffset3 + 15
- val completedTxn1 = CompletedTxn(producerId1, startOffset1, lastOffset1, isAborted = false)
+ val completedTxn1 = new CompletedTxn(producerId1, startOffset1, lastOffset1, false)
assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn1))
stateManager.completeTxn(completedTxn1)
stateManager.onHighWatermarkUpdated(lastOffset1 + 1)
assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
val lastOffset3 = lastOffset1 + 20
- val completedTxn3 = CompletedTxn(producerId3, startOffset3, lastOffset3, isAborted = false)
+ val completedTxn3 = new CompletedTxn(producerId3, startOffset3, lastOffset3, false)
assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn3))
stateManager.completeTxn(completedTxn3)
stateManager.onHighWatermarkUpdated(lastOffset3 + 1)
assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
val lastOffset2 = lastOffset3 + 78
- val completedTxn2 = CompletedTxn(producerId2, startOffset2, lastOffset2, isAborted = false)
+ val completedTxn2 = new CompletedTxn(producerId2, startOffset2, lastOffset2, false)
assertEquals(lastOffset2 + 1, stateManager.lastStableOffset(completedTxn2))
stateManager.completeTxn(completedTxn2)
stateManager.onHighWatermarkUpdated(lastOffset2 + 1)
diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
index 9c1cbe3f307..6784e76a8e5 100644
--- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -18,11 +18,13 @@ package kafka.log
import kafka.utils.TestUtils
import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.server.log.internals.CorruptIndexException
+import org.apache.kafka.server.log.internals.{AbortedTxn, CorruptIndexException, TransactionIndex}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import scala.jdk.CollectionConverters._
import java.io.File
+import java.util.Collections
class TransactionIndexTest {
var file: File = _
@@ -43,26 +45,26 @@ class TransactionIndexTest {
@Test
def testPositionSetCorrectlyWhenOpened(): Unit = {
val abortedTxns = List(
- new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
- new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
- new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
- new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+ new AbortedTxn(0L, 0, 10, 11),
+ new AbortedTxn(1L, 5, 15, 13),
+ new AbortedTxn(2L, 18, 35, 25),
+ new AbortedTxn(3L, 32, 50, 40))
abortedTxns.foreach(index.append)
index.close()
val reopenedIndex = new TransactionIndex(0L, file)
- val anotherAbortedTxn = new AbortedTxn(producerId = 3L, firstOffset = 50, lastOffset = 60, lastStableOffset = 55)
+ val anotherAbortedTxn = new AbortedTxn(3L, 50, 60, 55)
reopenedIndex.append(anotherAbortedTxn)
- assertEquals(abortedTxns ++ List(anotherAbortedTxn), reopenedIndex.allAbortedTxns)
+ assertEquals((abortedTxns ++ List(anotherAbortedTxn)).asJava, reopenedIndex.allAbortedTxns)
}
@Test
def testSanityCheck(): Unit = {
val abortedTxns = List(
- new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
- new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
- new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
- new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+ new AbortedTxn(0L, 0, 10, 11),
+ new AbortedTxn(1L, 5, 15, 13),
+ new AbortedTxn(2L, 18, 35, 25),
+ new AbortedTxn(3L, 32, 50, 40))
abortedTxns.foreach(index.append)
index.close()
@@ -73,71 +75,71 @@ class TransactionIndexTest {
@Test
def testLastOffsetMustIncrease(): Unit = {
- index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
- assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(producerId = 0L, firstOffset = 0,
- lastOffset = 15, lastStableOffset = 11)))
+ index.append(new AbortedTxn(1L, 5, 15, 13))
+ assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(0L, 0,
+ 15, 11)))
}
@Test
def testLastOffsetCannotDecrease(): Unit = {
- index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
- assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(producerId = 0L, firstOffset = 0,
- lastOffset = 10, lastStableOffset = 11)))
+ index.append(new AbortedTxn(1L, 5, 15, 13))
+ assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(0L, 0,
+ 10, 11)))
}
@Test
def testCollectAbortedTransactions(): Unit = {
val abortedTransactions = List(
- new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
- new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
- new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
- new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+ new AbortedTxn(0L, 0, 10, 11),
+ new AbortedTxn(1L, 5, 15, 13),
+ new AbortedTxn(2L, 18, 35, 25),
+ new AbortedTxn(3L, 32, 50, 40))
abortedTransactions.foreach(index.append)
var result = index.collectAbortedTxns(0L, 100L)
- assertEquals(abortedTransactions, result.abortedTransactions)
+ assertEquals(abortedTransactions.asJava, result.abortedTransactions)
assertFalse(result.isComplete)
result = index.collectAbortedTxns(0L, 32)
- assertEquals(abortedTransactions.take(3), result.abortedTransactions)
+ assertEquals(abortedTransactions.take(3).asJava, result.abortedTransactions)
assertTrue(result.isComplete)
result = index.collectAbortedTxns(0L, 35)
- assertEquals(abortedTransactions, result.abortedTransactions)
+ assertEquals(abortedTransactions.asJava, result.abortedTransactions)
assertTrue(result.isComplete)
result = index.collectAbortedTxns(10, 35)
- assertEquals(abortedTransactions, result.abortedTransactions)
+ assertEquals(abortedTransactions.asJava, result.abortedTransactions)
assertTrue(result.isComplete)
result = index.collectAbortedTxns(11, 35)
- assertEquals(abortedTransactions.slice(1, 4), result.abortedTransactions)
+ assertEquals(abortedTransactions.slice(1, 4).asJava, result.abortedTransactions)
assertTrue(result.isComplete)
result = index.collectAbortedTxns(20, 41)
- assertEquals(abortedTransactions.slice(2, 4), result.abortedTransactions)
+ assertEquals(abortedTransactions.slice(2, 4).asJava, result.abortedTransactions)
assertFalse(result.isComplete)
}
@Test
def testTruncate(): Unit = {
val abortedTransactions = List(
- new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2),
- new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16),
- new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
- new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+ new AbortedTxn(0L, 0, 10, 2),
+ new AbortedTxn(1L, 5, 15, 16),
+ new AbortedTxn(2L, 18, 35, 25),
+ new AbortedTxn(3L, 32, 50, 40))
abortedTransactions.foreach(index.append)
index.truncateTo(51)
- assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions)
+ assertEquals(abortedTransactions.asJava, index.collectAbortedTxns(0L, 100L).abortedTransactions)
index.truncateTo(50)
- assertEquals(abortedTransactions.take(3), index.collectAbortedTxns(0L, 100L).abortedTransactions)
+ assertEquals(abortedTransactions.take(3).asJava, index.collectAbortedTxns(0L, 100L).abortedTransactions)
index.reset()
- assertEquals(List.empty[FetchResponseData.AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
+ assertEquals(Collections.emptyList[FetchResponseData.AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
}
@Test
@@ -148,7 +150,7 @@ class TransactionIndexTest {
val lastStableOffset = 200L
val abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset)
- assertEquals(AbortedTxn.CurrentVersion, abortedTxn.version)
+ assertEquals(AbortedTxn.CURRENT_VERSION, abortedTxn.version)
assertEquals(pid, abortedTxn.producerId)
assertEquals(firstOffset, abortedTxn.firstOffset)
assertEquals(lastOffset, abortedTxn.lastOffset)
@@ -158,15 +160,15 @@ class TransactionIndexTest {
@Test
def testRenameIndex(): Unit = {
val renamed = TestUtils.tempFile()
- index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2))
+ index.append(new AbortedTxn(0L, 0, 10, 2))
index.renameTo(renamed)
- index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16))
+ index.append(new AbortedTxn(1L, 5, 15, 16))
val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions
assertEquals(2, abortedTxns.size)
- assertEquals(0, abortedTxns(0).firstOffset)
- assertEquals(5, abortedTxns(1).firstOffset)
+ assertEquals(0, abortedTxns.get(0).firstOffset)
+ assertEquals(5, abortedTxns.get(1).firstOffset)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 42fdafae206..45335eec8ec 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
+import org.apache.kafka.server.log.internals.AbortedTxn
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/AbortedTxn.java b/storage/src/main/java/org/apache/kafka/server/log/internals/AbortedTxn.java
new file mode 100644
index 00000000000..2ad08ce77ba
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/AbortedTxn.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.message.FetchResponseData;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class AbortedTxn {
+ static final int VERSION_OFFSET = 0;
+ static final int VERSION_SIZE = 2;
+ static final int PRODUCER_ID_OFFSET = VERSION_OFFSET + VERSION_SIZE;
+ static final int PRODUCER_ID_SIZE = 8;
+ static final int FIRST_OFFSET_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_SIZE;
+ static final int FIRST_OFFSET_SIZE = 8;
+ static final int LAST_OFFSET_OFFSET = FIRST_OFFSET_OFFSET + FIRST_OFFSET_SIZE;
+ static final int LAST_OFFSET_SIZE = 8;
+ static final int LAST_STABLE_OFFSET_OFFSET = LAST_OFFSET_OFFSET + LAST_OFFSET_SIZE;
+ static final int LAST_STABLE_OFFSET_SIZE = 8;
+ static final int TOTAL_SIZE = LAST_STABLE_OFFSET_OFFSET + LAST_STABLE_OFFSET_SIZE;
+
+ public static final short CURRENT_VERSION = 0;
+
+ final ByteBuffer buffer;
+
+ AbortedTxn(ByteBuffer buffer) {
+ Objects.requireNonNull(buffer);
+ this.buffer = buffer;
+ }
+
+ public AbortedTxn(CompletedTxn completedTxn, long lastStableOffset) {
+ this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset);
+ }
+
+ public AbortedTxn(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {
+ this(toByteBuffer(producerId, firstOffset, lastOffset, lastStableOffset));
+ }
+
+ private static ByteBuffer toByteBuffer(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {
+ ByteBuffer buffer = ByteBuffer.allocate(TOTAL_SIZE);
+ buffer.putShort(CURRENT_VERSION);
+ buffer.putLong(producerId);
+ buffer.putLong(firstOffset);
+ buffer.putLong(lastOffset);
+ buffer.putLong(lastStableOffset);
+ buffer.flip();
+ return buffer;
+ }
+
+ public short version() {
+ return buffer.get(VERSION_OFFSET);
+ }
+
+ public long producerId() {
+ return buffer.getLong(PRODUCER_ID_OFFSET);
+ }
+
+ public long firstOffset() {
+ return buffer.getLong(FIRST_OFFSET_OFFSET);
+ }
+
+ public long lastOffset() {
+ return buffer.getLong(LAST_OFFSET_OFFSET);
+ }
+
+ public long lastStableOffset() {
+ return buffer.getLong(LAST_STABLE_OFFSET_OFFSET);
+ }
+
+ public FetchResponseData.AbortedTransaction asAbortedTransaction() {
+ return new FetchResponseData.AbortedTransaction()
+ .setProducerId(producerId())
+ .setFirstOffset(firstOffset());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ AbortedTxn that = (AbortedTxn) o;
+ return buffer.equals(that.buffer);
+ }
+
+ @Override
+ public int hashCode() {
+ return buffer.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "AbortedTxn(version=" + version()
+ + ", producerId=" + producerId()
+ + ", firstOffset=" + firstOffset()
+ + ", lastOffset=" + lastOffset()
+ + ", lastStableOffset=" + lastStableOffset()
+ + ")";
+ }
+
+}
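
From the Scala side of the core module, the new Java class is used like any other: construct an entry and read the fields back from the backing buffer. A minimal round-trip sketch with hypothetical offsets:

    import org.apache.kafka.server.log.internals.AbortedTxn

    // producerId, firstOffset, lastOffset, lastStableOffset
    val txn = new AbortedTxn(983L, 100L, 110L, 105L)
    assert(txn.version == AbortedTxn.CURRENT_VERSION)
    assert(txn.producerId == 983L && txn.lastStableOffset == 105L)
    // Projects only the two fields a fetch response carries back to consumers
    println(txn.asAbortedTransaction)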
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/CompletedTxn.java b/storage/src/main/java/org/apache/kafka/server/log/internals/CompletedTxn.java
new file mode 100644
index 00000000000..1ad6f8854b4
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/CompletedTxn.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+/**
+ * A class used to hold useful metadata about a completed transaction. This is used to build
+ * the transaction index after appending to the log.
+ */
+public class CompletedTxn {
+ public final long producerId;
+ public final long firstOffset;
+ public final long lastOffset;
+ public final boolean isAborted;
+
+ /**
+ * Create an instance of this class.
+ *
+ * @param producerId The ID of the producer
+ * @param firstOffset The first offset (inclusive) of the transaction
+ * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the
+ * COMMIT/ABORT control record which indicates the transaction's completion.
+ * @param isAborted Whether the transaction was aborted
+ */
+ public CompletedTxn(long producerId, long firstOffset, long lastOffset, boolean isAborted) {
+ this.producerId = producerId;
+ this.firstOffset = firstOffset;
+ this.lastOffset = lastOffset;
+ this.isAborted = isAborted;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ CompletedTxn that = (CompletedTxn) o;
+
+ return producerId == that.producerId
+ && firstOffset == that.firstOffset
+ && lastOffset == that.lastOffset
+ && isAborted == that.isAborted;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Long.hashCode(producerId);
+ result = 31 * result + Long.hashCode(firstOffset);
+ result = 31 * result + Long.hashCode(lastOffset);
+ result = 31 * result + Boolean.hashCode(isAborted);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "CompletedTxn(producerId=" + producerId +
+ ", firstOffset=" + firstOffset +
+ ", lastOffset=" + lastOffset +
+ ", isAborted=" + isAborted +
+ ')';
+ }
+}
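
The relationship between the two new classes: when a transaction ends with an ABORT marker, the CompletedTxn plus the last stable offset computed at completion time become the AbortedTxn entry appended to the index. A sketch with hypothetical offsets (in the real path the LSO comes from the producer state machinery):

    import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn}

    val completed = new CompletedTxn(983L, 100L, 110L, true) // isAborted = true
    val lastStableOffset = 105L // hypothetical value for illustration
    val entry = new AbortedTxn(completed, lastStableOffset)
    assert(entry.firstOffset == completed.firstOffset && entry.lastOffset == completed.lastOffset)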
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TransactionIndex.java
new file mode 100644
index 00000000000..646541ae9c1
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TransactionIndex.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.function.Supplier;
+
+/**
+ * The transaction index maintains metadata about the aborted transactions for each segment. This includes
+ * the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of
+ * the abort. This index is used to find the aborted transactions in the range of a given fetch request at
+ * the READ_COMMITTED isolation level.
+ *
+ * There is at most one transaction index for each log segment. The entries correspond to the transactions
+ * whose commit markers were written in the corresponding log segment. Note, however, that individual transactions
+ * may span multiple segments. Recovering the index therefore requires scanning the earlier segments in
+ * order to find the start of the transactions.
+ */
+public class TransactionIndex implements Closeable {
+
+ private static class AbortedTxnWithPosition {
+ final AbortedTxn txn;
+ final int position;
+ AbortedTxnWithPosition(AbortedTxn txn, int position) {
+ this.txn = txn;
+ this.position = position;
+ }
+ }
+
+ private final long startOffset;
+
+ private volatile File file;
+
+ // note that the file is not created until we need it
+ private Optional<FileChannel> maybeChannel = Optional.empty();
+ private OptionalLong lastOffset = OptionalLong.empty();
+
+ public TransactionIndex(long startOffset, File file) throws IOException {
+ this.startOffset = startOffset;
+ this.file = file;
+
+ if (file.exists())
+ openChannel();
+ }
+
+ public File file() {
+ return file;
+ }
+
+ public void updateParentDir(File parentDir) {
+ this.file = new File(parentDir, file.getName());
+ }
+
+ public void append(AbortedTxn abortedTxn) throws IOException {
+ lastOffset.ifPresent(offset -> {
+ if (offset >= abortedTxn.lastOffset())
+ throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially, but "
+ + abortedTxn.lastOffset() + " is not greater than current last offset " + offset + " of index "
+ + file.getAbsolutePath());
+ });
+ lastOffset = OptionalLong.of(abortedTxn.lastOffset());
+ Utils.writeFully(channel(), abortedTxn.buffer.duplicate());
+ }
+
+ public void flush() throws IOException {
+ FileChannel channel = channelOrNull();
+ if (channel != null)
+ channel.force(true);
+ }
+
+ /**
+ * Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
+ */
+ public void reset() throws IOException {
+ FileChannel channel = channelOrNull();
+ if (channel != null)
+ channel.truncate(0);
+ lastOffset = OptionalLong.empty();
+ }
+
+ public void close() throws IOException {
+ FileChannel channel = channelOrNull();
+ if (channel != null)
+ channel.close();
+ maybeChannel = Optional.empty();
+ }
+
+ /**
+ * Delete this index.
+ *
+ * @throws IOException if deletion fails due to an I/O error
+ * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
+ * not exist
+ */
+ public boolean deleteIfExists() throws IOException {
+ close();
+ return Files.deleteIfExists(file.toPath());
+ }
+
+ public void renameTo(File f) throws IOException {
+ try {
+ if (file.exists())
+ Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
+ } finally {
+ this.file = f;
+ }
+ }
+
+ public void truncateTo(long offset) throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
+ OptionalLong newLastOffset = OptionalLong.empty();
+ for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
+ AbortedTxn abortedTxn = txnWithPosition.txn;
+ long position = txnWithPosition.position;
+ if (abortedTxn.lastOffset() >= offset) {
+ channel().truncate(position);
+ lastOffset = newLastOffset;
+ return;
+ }
+ newLastOffset = OptionalLong.of(abortedTxn.lastOffset());
+ }
+ }
+
+ public List<AbortedTxn> allAbortedTxns() {
+ List<AbortedTxn> result = new ArrayList<>();
+ for (AbortedTxnWithPosition txnWithPosition : iterable())
+ result.add(txnWithPosition.txn);
+ return result;
+ }
+
+ /**
+ * Collect all aborted transactions which overlap with a given fetch range.
+ *
+ * @param fetchOffset Inclusive first offset of the fetch range
+ * @param upperBoundOffset Exclusive last offset in the fetch range
+ * @return An object containing the aborted transactions and whether the search needs to continue
+ * into the next log segment.
+ */
+ public TxnIndexSearchResult collectAbortedTxns(long fetchOffset, long upperBoundOffset) {
+ List<AbortedTxn> abortedTransactions = new ArrayList<>();
+ for (AbortedTxnWithPosition txnWithPosition : iterable()) {
+ AbortedTxn abortedTxn = txnWithPosition.txn;
+ if (abortedTxn.lastOffset() >= fetchOffset && abortedTxn.firstOffset() < upperBoundOffset)
+ abortedTransactions.add(abortedTxn);
+
+ if (abortedTxn.lastStableOffset() >= upperBoundOffset)
+ return new TxnIndexSearchResult(abortedTransactions, true);
+ }
+ return new TxnIndexSearchResult(abortedTransactions, false);
+ }
+
+ /**
+ * Do a basic sanity check on this index to detect obvious problems.
+ *
+ * @throws CorruptIndexException if any problems are found.
+ */
+ public void sanityCheck() {
+ ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
+ for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
+ AbortedTxn abortedTxn = txnWithPosition.txn;
+ if (abortedTxn.lastOffset() < startOffset)
+ throw new CorruptIndexException("Last offset of aborted transaction " + abortedTxn + " in index "
+ + file.getAbsolutePath() + " is less than start offset " + startOffset);
+ }
+ }
+
+ private FileChannel openChannel() throws IOException {
+ FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
+ StandardOpenOption.READ, StandardOpenOption.WRITE);
+ maybeChannel = Optional.of(channel);
+ channel.position(channel.size());
+ return channel;
+ }
+
+ private FileChannel channel() throws IOException {
+ FileChannel channel = channelOrNull();
+ if (channel == null)
+ return openChannel();
+ else
+ return channel;
+ }
+
+ private FileChannel channelOrNull() {
+ return maybeChannel.orElse(null);
+ }
+
+ private Iterable<AbortedTxnWithPosition> iterable() {
+ return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE));
+ }
+
+ private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> allocate) {
+ FileChannel channel = channelOrNull();
+ if (channel == null)
+ return Collections.emptyList();
+
+ PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
+
+ return () -> new Iterator<AbortedTxnWithPosition>() {
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE;
+ } catch (IOException e) {
+ throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e);
+ }
+ }
+
+ @Override
+ public AbortedTxnWithPosition next() {
+ try {
+ ByteBuffer buffer = allocate.get();
+ Utils.readFully(channel, buffer, position.value);
+ buffer.flip();
+
+ AbortedTxn abortedTxn = new AbortedTxn(buffer);
+ if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
+ throw new KafkaException("Unexpected aborted transaction version " + abortedTxn.version()
+ + " in transaction index " + file.getAbsolutePath() + ", current version is "
+ + AbortedTxn.CURRENT_VERSION);
+ AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value);
+ position.value += AbortedTxn.TOTAL_SIZE;
+ return nextEntry;
+ } catch (IOException e) {
+ // We received an unexpected error reading from the index file. We propagate this as an
+ // UNKNOWN error to the consumer, which will cause it to retry the fetch.
+ throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e);
+ }
+ }
+
+ };
+ }
+
+}
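
Taken together, a minimal usage sketch of the relocated index from Scala, assuming only a writable temp file; note that append enforces strictly increasing last offsets and that the returned Java list needs asScala on the Scala side:

    import java.nio.file.Files
    import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}
    import scala.jdk.CollectionConverters._

    val file = Files.createTempFile("00000000000000000000", ".txnindex").toFile
    val index = new TransactionIndex(0L, file)
    index.append(new AbortedTxn(0L, 0L, 10L, 11L))
    index.append(new AbortedTxn(1L, 5L, 15L, 13L)) // lastOffset 15 > 10, so this is accepted
    val result = index.collectAbortedTxns(0L, 100L)
    println(result.abortedTransactions.asScala.map(_.firstOffset)) // Buffer(0, 5)
    index.sanityCheck()
    index.close()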
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnIndexSearchResult.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnIndexSearchResult.java
new file mode 100644
index 00000000000..c1d40501af7
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnIndexSearchResult.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TxnIndexSearchResult {
+ public final List<AbortedTxn> abortedTransactions;
+ public final boolean isComplete;
+
+ public TxnIndexSearchResult(List<AbortedTxn> abortedTransactions, boolean isComplete) {
+ this.abortedTransactions = Collections.unmodifiableList(abortedTransactions);
+ this.isComplete = isComplete;
+ }
+}
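
isComplete is what lets readers stop scanning: it turns true as soon as an entry's last stable offset reaches the fetch upper bound, at which point no later segment can contain an overlapping abort. The per-segment loop in LocalLog.collectAbortedTransactions reduces to this shape (a sketch; the iterator of per-segment indexes is hypothetical):

    import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}
    import scala.collection.mutable.ListBuffer
    import scala.jdk.CollectionConverters._

    def collectAcrossSegments(indexes: Iterator[TransactionIndex],
                              fetchOffset: Long, upperBound: Long): List[AbortedTxn] = {
      val acc = ListBuffer.empty[AbortedTxn]
      var done = false
      while (!done && indexes.hasNext) {
        val result = indexes.next().collectAbortedTxns(fetchOffset, upperBound)
        acc ++= result.abortedTransactions.asScala
        done = result.isComplete // no later segment can hold a relevant abort
      }
      acc.toList
    }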