You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2022/12/28 01:12:15 UTC
[kafka] branch trunk updated: KAFKA-14543: Move LogOffsetMetadata to storage module (#13038)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8184ada6a5c KAFKA-14543: Move LogOffsetMetadata to storage module (#13038)
8184ada6a5c is described below
commit 8184ada6a5c98cd637e8d323c56dbe6b20d582bd
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Wed Dec 28 02:12:02 2022 +0100
KAFKA-14543: Move LogOffsetMetadata to storage module (#13038)
Reviewers: Ismael Juma <is...@juma.me.uk>, dengziming <de...@gmail.com>, Satish Duggana <sa...@apache.org>, Federico Valeri <fe...@gmail.com>
---
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
core/src/main/scala/kafka/cluster/Replica.scala | 6 +-
core/src/main/scala/kafka/log/LocalLog.scala | 6 +-
core/src/main/scala/kafka/log/LogLoader.scala | 5 +-
core/src/main/scala/kafka/log/LogSegment.scala | 6 +-
.../scala/kafka/log/ProducerStateManager.scala | 8 +-
core/src/main/scala/kafka/log/UnifiedLog.scala | 34 +++----
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 2 +-
.../src/main/scala/kafka/server/DelayedFetch.scala | 3 +-
.../main/scala/kafka/server/FetchDataInfo.scala | 3 +-
.../scala/kafka/server/LogOffsetMetadata.scala | 84 -----------------
.../main/scala/kafka/server/ReplicaManager.scala | 10 +-
.../kafka/server/DelayedFetchTest.scala | 11 ++-
.../scala/unit/kafka/cluster/ReplicaTest.scala | 4 +-
.../group/GroupMetadataManagerTest.scala | 12 +--
.../TransactionCoordinatorConcurrencyTest.scala | 5 +-
.../transaction/TransactionStateManagerTest.scala | 12 +--
.../test/scala/unit/kafka/log/LocalLogTest.scala | 4 +-
.../unit/kafka/log/ProducerStateManagerTest.scala | 31 +++----
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 14 +--
.../kafka/server/AbstractFetcherThreadTest.scala | 3 +-
.../unit/kafka/server/IsrExpirationTest.scala | 14 +--
.../kafka/server/ReplicaManagerQuotasTest.scala | 24 ++---
.../unit/kafka/server/ReplicaManagerTest.scala | 4 +-
.../UpdateFollowerFetchStateBenchmark.java | 2 +-
.../server/log/internals/LogOffsetMetadata.java | 103 +++++++++++++++++++++
26 files changed, 217 insertions(+), 195 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c4df79219a4..fe8792189ba 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 0321488af4d..7a60632a54b 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,9 +18,9 @@
package kafka.cluster
import kafka.log.UnifiedLog
-import kafka.server.LogOffsetMetadata
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
import java.util.concurrent.atomic.AtomicReference
@@ -69,7 +69,7 @@ case class ReplicaState(
object ReplicaState {
val Empty: ReplicaState = ReplicaState(
- logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
+ logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
logStartOffset = UnifiedLog.UnknownOffset,
lastFetchLeaderLogEndOffset = 0L,
lastFetchTimeMs = 0L,
@@ -139,7 +139,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
if (isNewLeader) {
ReplicaState(
logStartOffset = UnifiedLog.UnknownOffset,
- logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
+ logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
lastFetchLeaderLogEndOffset = UnifiedLog.UnknownOffset,
lastFetchTimeMs = 0L,
lastCaughtUpTimeMs = lastCaughtUpTimeMs
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 10d5217069b..b62fead3b91 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -23,14 +23,14 @@ import java.text.NumberFormat
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{FetchDataInfo, LogOffsetMetadata}
+import kafka.server.FetchDataInfo
import kafka.utils.{Logging, Scheduler}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, OffsetPosition}
+import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition}
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, immutable}
@@ -200,7 +200,7 @@ class LocalLog(@volatile private var _dir: File,
* @param endOffset the new end offset of the log
*/
private[log] def updateLogEndOffset(endOffset: Long): Unit = {
- nextOffsetMetadata = LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
+ nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
if (recoveryPoint > endOffset) {
updateRecoveryPoint(endOffset)
}
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index e69283f71a6..59ea5a8c1f6 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -21,14 +21,13 @@ import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import kafka.common.LogSegmentOffsetOverflowException
import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
-import kafka.server.LogOffsetMetadata
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils.{CoreUtils, Logging, Scheduler}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.common.utils.Time
import org.apache.kafka.snapshot.Snapshots
-import org.apache.kafka.server.log.internals.{CorruptIndexException, LogDirFailureChannel}
+import org.apache.kafka.server.log.internals.{CorruptIndexException, LogDirFailureChannel, LogOffsetMetadata}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.collection.{Set, mutable}
@@ -209,7 +208,7 @@ class LogLoader(
LoadedLogOffsets(
newLogStartOffset,
newRecoveryPoint,
- LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size))
+ new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size))
}
/**
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 93677bee653..16408f21394 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -25,14 +25,14 @@ import java.util.concurrent.TimeUnit
import kafka.common.LogSegmentOffsetOverflowException
import kafka.metrics.KafkaMetricsGroup
import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{FetchDataInfo, LogOffsetMetadata}
+import kafka.server.FetchDataInfo
import kafka.utils._
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
import java.util.Optional
import scala.jdk.CollectionConverters._
@@ -305,7 +305,7 @@ class LogSegment private[log] (val log: FileRecords,
return null
val startPosition = startOffsetAndSize.position
- val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
+ val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 1b9c4593d02..c6fe0486448 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -22,7 +22,7 @@ import java.nio.channels.FileChannel
import java.nio.file.{Files, NoSuchFileException, StandardOpenOption}
import java.util.concurrent.ConcurrentSkipListMap
import kafka.log.UnifiedLog.offsetFromFile
-import kafka.server.{BrokerReconfigurable, KafkaConfig, LogOffsetMetadata}
+import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils.{CoreUtils, Logging, nonthreadsafe, threadsafe}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -30,7 +30,7 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.types._
import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time, Utils}
-import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn}
+import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
@@ -50,7 +50,7 @@ private[log] case class TxnMetadata(
firstOffset: LogOffsetMetadata,
var lastOffset: Option[Long] = None
) {
- def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset))
+ def this(producerId: Long, firstOffset: Long) = this(producerId, new LogOffsetMetadata(firstOffset))
override def toString: String = {
"TxnMetadata(" +
@@ -259,7 +259,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
None
}
} else {
- val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(LogOffsetMetadata(batch.baseOffset))
+ val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(new LogOffsetMetadata(batch.baseOffset))
appendDataBatch(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
firstOffsetMetadata, batch.lastOffset, batch.isTransactional)
None
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index c876840182b..0bab5fa7018 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -28,7 +28,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
+import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
@@ -42,7 +42,7 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogDirFailureChannel, LogValidator}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogDirFailureChannel, LogOffsetMetadata, LogValidator}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
@@ -273,7 +273,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* equals the log end offset (which may never happen for a partition under consistent load). This is needed to
* prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
*/
- @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
+ @volatile private var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(logStartOffset)
@volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
@@ -324,7 +324,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
try partMetadataFile.delete()
catch {
case e: IOException =>
- error(s"Error while trying to delete partition metadata file ${partMetadataFile}", e)
+ error(s"Error while trying to delete partition metadata file $partMetadataFile", e)
}
}
} else if (keepPartitionMetadataFile) {
@@ -380,7 +380,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* @return the updated high watermark offset
*/
def updateHighWatermark(hw: Long): Long = {
- updateHighWatermark(LogOffsetMetadata(hw))
+ updateHighWatermark(new LogOffsetMetadata(hw))
}
/**
@@ -393,7 +393,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
val endOffsetMetadata = localLog.logEndOffsetMetadata
val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
- LogOffsetMetadata(logStartOffset)
+ new LogOffsetMetadata(logStartOffset)
} else if (highWatermarkMetadata.messageOffset >= endOffsetMetadata.messageOffset) {
endOffsetMetadata
} else {
@@ -445,7 +445,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
def maybeUpdateHighWatermark(hw: Long): Option[Long] = {
lock.synchronized {
val oldHighWatermark = highWatermarkMetadata
- updateHighWatermark(LogOffsetMetadata(hw)) match {
+ updateHighWatermark(new LogOffsetMetadata(hw)) match {
case oldHighWatermark.messageOffset =>
None
case newHighWatermark =>
@@ -838,7 +838,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
if (validateAndAssignOffsets) {
// assign offsets to the message set
val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
- appendInfo.firstOffset = Some(LogOffsetMetadata(offset.value))
+ appendInfo.firstOffset = Some(new LogOffsetMetadata(offset.value))
val validateAndOffsetAssignResult = try {
val validator = new LogValidator(validRecords,
topicPartition,
@@ -935,10 +935,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// maybe roll the log if this segment is full
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
- val logOffsetMetadata = LogOffsetMetadata(
- messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
- segmentBaseOffset = segment.baseOffset,
- relativePositionInSegment = segment.size)
+ val logOffsetMetadata = new LogOffsetMetadata(
+ appendInfo.firstOrLastOffsetOfFirstBatch,
+ segment.baseOffset,
+ segment.size)
// now that we have valid records, offsets assigned, and timestamps updated, we need to
// validate the idempotent/transactional state of the producers and collect some metadata
@@ -947,14 +947,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
maybeDuplicate match {
case Some(duplicate) =>
- appendInfo.firstOffset = Some(LogOffsetMetadata(duplicate.firstOffset))
+ appendInfo.firstOffset = Some(new LogOffsetMetadata(duplicate.firstOffset))
appendInfo.lastOffset = duplicate.lastOffset
appendInfo.logAppendTime = duplicate.timestamp
appendInfo.logStartOffset = logStartOffset
case None =>
// Before appending update the first offset metadata to include segment information
appendInfo.firstOffset = appendInfo.firstOffset.map { offsetMetadata =>
- offsetMetadata.copy(segmentBaseOffset = segment.baseOffset, relativePositionInSegment = segment.size)
+ new LogOffsetMetadata(offsetMetadata.messageOffset, segment.baseOffset, segment.size)
}
// Append the records, and increment the local log end offset immediately after the append because a
@@ -1091,7 +1091,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// We cache offset metadata for the start of each transaction. This allows us to
// compute the last stable offset without relying on additional index lookups.
val firstOffsetMetadata = if (batch.isTransactional)
- Some(LogOffsetMetadata(batch.baseOffset, appendOffsetMetadata.segmentBaseOffset, relativePositionInSegment))
+ Some(new LogOffsetMetadata(batch.baseOffset, appendOffsetMetadata.segmentBaseOffset, relativePositionInSegment))
else
None
@@ -1155,7 +1155,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// Also indicate whether we have the accurate first offset or not
if (!readFirstMessage) {
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
- firstOffset = Some(LogOffsetMetadata(batch.baseOffset))
+ firstOffset = Some(new LogOffsetMetadata(batch.baseOffset))
lastOffsetOfFirstBatch = batch.lastOffset
readFirstMessage = true
}
@@ -1328,7 +1328,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
val remoteOffset = if (remoteLogEnabled()) {
if (remoteLogManager.isEmpty) {
- throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.");
+ throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.")
}
if (recordVersion.value < RecordVersion.V2.value) {
throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.")
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 7056f5db0e6..a892d9c235b 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -190,7 +190,7 @@ final class KafkaMetadataLog private (
override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = {
offsetMetadata.metadata.asScala match {
case Some(segmentPosition: SegmentPosition) => log.updateHighWatermark(
- new kafka.server.LogOffsetMetadata(
+ new org.apache.kafka.server.log.internals.LogOffsetMetadata(
offsetMetadata.offset,
segmentPosition.baseOffset,
segmentPosition.relativePosition)
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 55a15682b64..b61bd8bc8a0 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
import scala.collection._
@@ -75,7 +76,7 @@ class DelayedFetch(
val fetchOffset = fetchStatus.startOffsetMetadata
val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
try {
- if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
+ if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader)
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala
index 95b68c08395..f3b0c40ea3b 100644
--- a/core/src/main/scala/kafka/server/FetchDataInfo.scala
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
sealed trait FetchIsolation
case object FetchLogEnd extends FetchIsolation
@@ -78,7 +79,7 @@ case class FetchParams(
object FetchDataInfo {
def empty(fetchOffset: Long): FetchDataInfo = {
FetchDataInfo(
- fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
+ fetchOffsetMetadata = new LogOffsetMetadata(fetchOffset),
records = MemoryRecords.EMPTY,
firstEntryIncomplete = false,
abortedTransactions = None
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
deleted file mode 100644
index 94002607af0..00000000000
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.log.UnifiedLog
-import org.apache.kafka.common.KafkaException
-
-object LogOffsetMetadata {
- val UnknownOffsetMetadata = LogOffsetMetadata(-1, 0, 0)
- val UnknownFilePosition = -1
-
- class OffsetOrdering extends Ordering[LogOffsetMetadata] {
- override def compare(x: LogOffsetMetadata, y: LogOffsetMetadata): Int = {
- x.offsetDiff(y).toInt
- }
- }
-
-}
-
-/*
- * A log offset structure, including:
- * 1. the message offset
- * 2. the base message offset of the located segment
- * 3. the physical position on the located segment
- */
-case class LogOffsetMetadata(messageOffset: Long,
- segmentBaseOffset: Long = UnifiedLog.UnknownOffset,
- relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
-
- // check if this offset is already on an older segment compared with the given offset
- def onOlderSegment(that: LogOffsetMetadata): Boolean = {
- if (messageOffsetOnly)
- throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
-
- this.segmentBaseOffset < that.segmentBaseOffset
- }
-
- // check if this offset is on the same segment with the given offset
- def onSameSegment(that: LogOffsetMetadata): Boolean = {
- if (messageOffsetOnly)
- throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
-
- this.segmentBaseOffset == that.segmentBaseOffset
- }
-
- // compute the number of messages between this offset to the given offset
- def offsetDiff(that: LogOffsetMetadata): Long = {
- this.messageOffset - that.messageOffset
- }
-
- // compute the number of bytes between this offset to the given offset
- // if they are on the same segment and this offset precedes the given offset
- def positionDiff(that: LogOffsetMetadata): Int = {
- if(!onSameSegment(that))
- throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
- if(messageOffsetOnly)
- throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")
-
- this.relativePositionInSegment - that.relativePositionInSegment
- }
-
- // decide if the offset metadata only contains message offset info
- def messageOffsetOnly: Boolean = {
- segmentBaseOffset == UnifiedLog.UnknownOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition
- }
-
- override def toString = s"(offset=$messageOffset segment=[$segmentBaseOffset:$relativePositionInSegment])"
-
-}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c88cc688693..f6493ad7066 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -61,7 +61,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
-import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel, RecordValidationException}
+import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel, LogOffsetMetadata, RecordValidationException}
import java.nio.file.{Files, Paths}
import java.util
@@ -128,7 +128,7 @@ case class LogReadResult(info: FetchDataInfo,
isReassignmentFetch)
def withEmptyFetchInfo: LogReadResult =
- copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
+ copy(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY))
override def toString = {
"LogReadResult(" +
@@ -1125,7 +1125,7 @@ class ReplicaManager(val config: KafkaConfig,
}
// If a preferred read-replica is set, skip the read
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
- LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+ LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = offsetSnapshot.highWatermark.messageOffset,
leaderLogStartOffset = offsetSnapshot.logStartOffset,
@@ -1180,7 +1180,7 @@ class ReplicaManager(val config: KafkaConfig,
_: KafkaStorageException |
_: OffsetOutOfRangeException |
_: InconsistentTopicIdException) =>
- LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+ LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = UnifiedLog.UnknownOffset,
leaderLogStartOffset = UnifiedLog.UnknownOffset,
@@ -1197,7 +1197,7 @@ class ReplicaManager(val config: KafkaConfig,
error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
s"on partition $tp: $fetchInfo", e)
- LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+ LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = UnifiedLog.UnknownOffset,
leaderLogStartOffset = UnifiedLog.UnknownOffset,
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index dce5a2eaee7..9d9ad717df3 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.mockito.ArgumentMatchers.{any, anyInt}
@@ -46,7 +47,7 @@ class DelayedFetchTest {
val replicaId = 1
val fetchStatus = FetchPartitionStatus(
- startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
@@ -92,7 +93,7 @@ class DelayedFetchTest {
val replicaId = 1
val fetchStatus = FetchPartitionStatus(
- startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
@@ -129,7 +130,7 @@ class DelayedFetchTest {
val replicaId = 1
val fetchStatus = FetchPartitionStatus(
- startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
@@ -148,7 +149,7 @@ class DelayedFetchTest {
val partition: Partition = mock(classOf[Partition])
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
- val endOffsetMetadata = LogOffsetMetadata(messageOffset = 500L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+ val endOffsetMetadata = new LogOffsetMetadata(500L, 0L, 500)
when(partition.fetchOffsetSnapshot(
currentLeaderEpoch,
fetchOnlyFromLeader = true))
@@ -199,7 +200,7 @@ class DelayedFetchTest {
private def buildReadResult(error: Errors): LogReadResult = {
LogReadResult(
exception = if (error != Errors.NONE) Some(error.exception) else None,
- info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+ info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = -1L,
leaderLogStartOffset = -1L,
diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
index 76910642ae9..7a24c77c59f 100644
--- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
@@ -17,9 +17,9 @@
package kafka.cluster
import kafka.log.UnifiedLog
-import kafka.server.LogOffsetMetadata
import kafka.utils.MockTime
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -83,7 +83,7 @@ class ReplicaTest {
): Long = {
val currentTimeMs = time.milliseconds()
replica.updateFetchState(
- followerFetchOffsetMetadata = LogOffsetMetadata(followerFetchOffset),
+ followerFetchOffsetMetadata = new LogOffsetMetadata(followerFetchOffset),
followerStartOffset = followerStartOffset,
followerFetchTimeMs = currentTimeMs,
leaderEndOffset = leaderEndOffset
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 94be08c671f..301d2fa25cd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -27,7 +27,7 @@ import javax.management.ObjectName
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.log.{LogAppendInfo, UnifiedLog}
-import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager, RequestLocal}
+import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
@@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -890,12 +890,12 @@ class GroupMetadataManagerTest {
maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd),
minOneMessage = ArgumentMatchers.eq(true)))
- .thenReturn(FetchDataInfo(LogOffsetMetadata(segment1End), fileRecordsMock))
+ .thenReturn(FetchDataInfo(new LogOffsetMetadata(segment1End), fileRecordsMock))
when(logMock.read(ArgumentMatchers.eq(segment2End),
maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd),
minOneMessage = ArgumentMatchers.eq(true)))
- .thenReturn(FetchDataInfo(LogOffsetMetadata(segment2End), fileRecordsMock))
+ .thenReturn(FetchDataInfo(new LogOffsetMetadata(segment2End), fileRecordsMock))
when(fileRecordsMock.sizeInBytes())
.thenReturn(segment1Records.sizeInBytes)
.thenReturn(segment2Records.sizeInBytes)
@@ -2376,7 +2376,7 @@ class GroupMetadataManagerTest {
maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd),
minOneMessage = ArgumentMatchers.eq(true)))
- .thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), mockRecords))
+ .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), mockRecords))
when(replicaManager.getLog(groupMetadataTopicPartition)).thenReturn(Some(logMock))
when(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).thenReturn(Some[Long](18))
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)
@@ -2533,7 +2533,7 @@ class GroupMetadataManagerTest {
maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd),
minOneMessage = ArgumentMatchers.eq(true)))
- .thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
+ .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 9667d2fffde..ffd9f506fba 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -24,7 +24,7 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._
import kafka.log.{LogConfig, UnifiedLog}
-import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, LogOffsetMetadata, MetadataCache, RequestLocal}
+import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, MetadataCache, RequestLocal}
import kafka.utils.{Pool, TestUtils}
import org.apache.kafka.clients.{ClientResponse, NetworkClient}
import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
@@ -35,6 +35,7 @@ import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecor
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
@@ -469,7 +470,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
maxLength = anyInt,
isolation = ArgumentMatchers.eq(FetchLogEnd),
minOneMessage = ArgumentMatchers.eq(true)))
- .thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
+ .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes)
val bufferCaptor: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 59a4ef9241b..397c6c2c75b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock
import javax.management.ObjectName
import kafka.log.{Defaults, LogConfig, UnifiedLog}
-import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager, RequestLocal}
+import kafka.server.{FetchDataInfo, FetchLogEnd, ReplicaManager, RequestLocal}
import kafka.utils.{MockScheduler, Pool, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
@@ -156,7 +156,7 @@ class TransactionStateManagerTest {
maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd),
minOneMessage = ArgumentMatchers.eq(true))
- ).thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
+ ).thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset))
txnMetadata1.state = PrepareCommit
@@ -822,7 +822,7 @@ class TransactionStateManagerTest {
transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 1, (_, _, _, _) => ())
assertEquals(0, transactionManager.loadingPartitions.size)
assertTrue(transactionManager.transactionMetadataCache.contains(partitionId))
- assertEquals(1, transactionManager.transactionMetadataCache.get(partitionId).get.coordinatorEpoch)
+ assertEquals(1, transactionManager.transactionMetadataCache(partitionId).coordinatorEpoch)
}
@Test
@@ -840,7 +840,7 @@ class TransactionStateManagerTest {
maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd),
minOneMessage = ArgumentMatchers.eq(true))
- ).thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), MemoryRecords.EMPTY))
+ ).thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY))
when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset))
transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _) => ())
@@ -1017,7 +1017,7 @@ class TransactionStateManagerTest {
maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd),
minOneMessage = ArgumentMatchers.eq(true)))
- .thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
+ .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes)
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
index deee5685bc5..6fe947ef578 100644
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -22,13 +22,13 @@ import java.nio.channels.ClosedChannelException
import java.nio.charset.StandardCharsets
import java.util.regex.Pattern
import java.util.Collections
-import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata}
+import kafka.server.{FetchDataInfo, KafkaConfig}
import kafka.utils.{MockTime, Scheduler, TestUtils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, SimpleRecord}
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.server.log.internals.LogDirFailureChannel
+import org.apache.kafka.server.log.internals.{LogDirFailureChannel, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions.{assertFalse, _}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 359c46de0b7..3e5ae15d211 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -23,14 +23,13 @@ import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}
import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger
-import kafka.server.LogOffsetMetadata
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Utils}
-import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn}
+import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.Mockito.{mock, when}
@@ -131,7 +130,7 @@ class ProducerStateManagerTest {
val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION)
// Sequence number wrap around
appendInfo.appendDataBatch(epoch, Int.MaxValue - 10, 9, time.milliseconds(),
- LogOffsetMetadata(2000L), 2020L, isTransactional = false)
+ new LogOffsetMetadata(2000L), 2020L, isTransactional = false)
assertEquals(None, stateManager.lastEntry(producerId))
stateManager.update(appendInfo)
assertTrue(stateManager.lastEntry(producerId).isDefined)
@@ -199,8 +198,7 @@ class ProducerStateManagerTest {
val seq = 0
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT)
- val firstOffsetMetadata = LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
- relativePositionInSegment = 234224)
+ val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224)
producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
firstOffsetMetadata, offset, isTransactional = true)
stateManager.update(producerAppendInfo)
@@ -230,7 +228,7 @@ class ProducerStateManagerTest {
): Unit = {
val count = (endOffset - startOffset).toInt
appendInfo.appendDataBatch(producerEpoch, seq.get(), seq.addAndGet(count), time.milliseconds(),
- LogOffsetMetadata(startOffset), endOffset, isTransactional = true)
+ new LogOffsetMetadata(startOffset), endOffset, isTransactional = true)
seq.incrementAndGet()
}
@@ -240,7 +238,7 @@ class ProducerStateManagerTest {
assertEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.head)
stateManager.update(firstAppend)
stateManager.onHighWatermarkUpdated(21L)
- assertEquals(Some(LogOffsetMetadata(16L)), stateManager.firstUnstableOffset)
+ assertEquals(Some(new LogOffsetMetadata(16L)), stateManager.firstUnstableOffset)
// Now do a single append which completes the old transaction, mixes in
// some empty transactions, one non-empty complete transaction, and one
@@ -257,13 +255,13 @@ class ProducerStateManagerTest {
appendData(30L, 31L, secondAppend)
assertEquals(2, secondAppend.startedTransactions.size)
- assertEquals(TxnMetadata(producerId, LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
- assertEquals(TxnMetadata(producerId, LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
+ assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
+ assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
stateManager.update(secondAppend)
stateManager.completeTxn(firstCompletedTxn.get)
stateManager.completeTxn(secondCompletedTxn.get)
stateManager.onHighWatermarkUpdated(32L)
- assertEquals(Some(LogOffsetMetadata(30L)), stateManager.firstUnstableOffset)
+ assertEquals(Some(new LogOffsetMetadata(30L)), stateManager.firstUnstableOffset)
}
@Test
@@ -373,8 +371,7 @@ class ProducerStateManagerTest {
ProducerStateEntry.empty(producerId),
AppendOrigin.CLIENT
)
- val firstOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
- relativePositionInSegment = 50 * relativeOffset)
+ val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset)
producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(),
firstOffsetMetadata, startOffset, isTransactional = true)
stateManager.update(producerAppendInfo)
@@ -420,14 +417,14 @@ class ProducerStateManagerTest {
val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
appendInfo.appendDataBatch(producerEpoch, 0, 5, time.milliseconds(),
- LogOffsetMetadata(15L), 20L, isTransactional = false)
+ new LogOffsetMetadata(15L), 20L, isTransactional = false)
assertEquals(None, stateManager.lastEntry(producerId))
stateManager.update(appendInfo)
assertTrue(stateManager.lastEntry(producerId).isDefined)
val nextAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
nextAppendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
- LogOffsetMetadata(26L), 30L, isTransactional = false)
+ new LogOffsetMetadata(26L), 30L, isTransactional = false)
assertTrue(stateManager.lastEntry(producerId).isDefined)
var lastEntry = stateManager.lastEntry(producerId).get
@@ -451,7 +448,7 @@ class ProducerStateManagerTest {
val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
appendInfo.appendDataBatch(producerEpoch, 1, 5, time.milliseconds(),
- LogOffsetMetadata(16L), 20L, isTransactional = true)
+ new LogOffsetMetadata(16L), 20L, isTransactional = true)
var lastEntry = appendInfo.toEntry
assertEquals(producerEpoch, lastEntry.producerEpoch)
assertEquals(1, lastEntry.firstSeq)
@@ -462,7 +459,7 @@ class ProducerStateManagerTest {
assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
appendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
- LogOffsetMetadata(26L), 30L, isTransactional = true)
+ new LogOffsetMetadata(26L), 30L, isTransactional = true)
lastEntry = appendInfo.toEntry
assertEquals(producerEpoch, lastEntry.producerEpoch)
assertEquals(1, lastEntry.firstSeq)
@@ -1121,7 +1118,7 @@ class ProducerStateManagerTest {
origin : AppendOrigin = AppendOrigin.CLIENT): Unit = {
val producerAppendInfo = stateManager.prepareUpdate(producerId, origin)
producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, timestamp,
- LogOffsetMetadata(offset), offset, isTransactional)
+ new LogOffsetMetadata(offset), offset, isTransactional)
stateManager.update(producerAppendInfo)
stateManager.updateMapEndOffset(offset + 1)
}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 4a280795017..5db1ae2633a 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -26,7 +26,7 @@ import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException
import kafka.log.remote.RemoteLogManager
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogOffsetMetadata, PartitionMetadataFile}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, PartitionMetadataFile}
import kafka.utils._
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
import org.apache.kafka.common.errors._
@@ -37,7 +37,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, RecordValidationException}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, LogOffsetMetadata, RecordValidationException}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
@@ -154,17 +154,17 @@ class UnifiedLogTest {
val records = TestUtils.records(simpleRecords)
val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
- assertEquals(LogOffsetMetadata(0, 0, 0), firstAppendInfo.firstOffset.get)
+ assertEquals(new LogOffsetMetadata(0, 0, 0), firstAppendInfo.firstOffset.get)
val secondAppendInfo = log.appendAsLeader(
TestUtils.records(simpleRecords),
leaderEpoch = 0
)
- assertEquals(LogOffsetMetadata(simpleRecords.size, 0, records.sizeInBytes), secondAppendInfo.firstOffset.get)
+ assertEquals(new LogOffsetMetadata(simpleRecords.size, 0, records.sizeInBytes), secondAppendInfo.firstOffset.get)
log.roll()
val afterRollAppendInfo = log.appendAsLeader(TestUtils.records(simpleRecords), leaderEpoch = 0)
- assertEquals(LogOffsetMetadata(simpleRecords.size * 2, simpleRecords.size * 2, 0), afterRollAppendInfo.firstOffset.get)
+ assertEquals(new LogOffsetMetadata(simpleRecords.size * 2, simpleRecords.size * 2, 0), afterRollAppendInfo.firstOffset.get)
}
@Test
@@ -213,7 +213,7 @@ class UnifiedLogTest {
log.close()
val reopened = createLog(logDir, logConfig)
- assertEquals(Some(LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)
+ assertEquals(Some(new LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)
truncateFunc(reopened)(0L)
assertEquals(None, reopened.firstUnstableOffset)
@@ -245,7 +245,7 @@ class UnifiedLogTest {
assertHighWatermark(0L)
// Update high watermark as leader
- log.maybeIncrementHighWatermark(LogOffsetMetadata(1L))
+ log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L))
assertHighWatermark(1L)
// Cannot update past the log end offset
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index ae36d0a538d..01f707b2ea0 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Assumptions.assumeTrue
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -1422,7 +1423,7 @@ class AbstractFetcherThreadTest {
state.logStartOffset = partitionData.logStartOffset
state.highWatermark = partitionData.highWatermark
- Some(LogAppendInfo(firstOffset = Some(LogOffsetMetadata(fetchOffset)),
+ Some(LogAppendInfo(firstOffset = Some(new LogOffsetMetadata(fetchOffset)),
lastOffset = lastOffset,
lastLeaderEpoch = lastEpoch,
maxTimestamp = maxTimestamp,
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index e4d929553e4..cf072486e54 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.LeaderRecoveryState
-import org.apache.kafka.server.log.internals.LogDirFailureChannel
+import org.apache.kafka.server.log.internals.{LogDirFailureChannel, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.Mockito.{atLeastOnce, mock, verify, when}
@@ -98,7 +98,7 @@ class IsrExpirationTest {
// let the follower catch up to the Leader logEndOffset - 1
for (replica <- partition0.remoteReplicas)
replica.updateFetchState(
- followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
+ followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 1),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = leaderLogEndOffset)
@@ -147,7 +147,7 @@ class IsrExpirationTest {
// Make the remote replica not read to the end of log. It should not be out of sync for at least 100 ms
for (replica <- partition0.remoteReplicas)
replica.updateFetchState(
- followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 2),
+ followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 2),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = leaderLogEndOffset)
@@ -161,7 +161,7 @@ class IsrExpirationTest {
partition0.remoteReplicas.foreach { r =>
r.updateFetchState(
- followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
+ followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 1),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = leaderLogEndOffset)
@@ -178,7 +178,7 @@ class IsrExpirationTest {
// Now actually make a fetch to the end of the log. The replicas should be back in ISR
partition0.remoteReplicas.foreach { r =>
r.updateFetchState(
- followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
+ followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = leaderLogEndOffset)
@@ -202,7 +202,7 @@ class IsrExpirationTest {
// let the follower catch up to the Leader logEndOffset
for (replica <- partition0.remoteReplicas)
replica.updateFetchState(
- followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
+ followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = leaderLogEndOffset)
@@ -238,7 +238,7 @@ class IsrExpirationTest {
// set lastCaughtUpTime to current time
for (replica <- partition.remoteReplicas)
replica.updateFetchState(
- followerFetchOffsetMetadata = LogOffsetMetadata(0L),
+ followerFetchOffsetMetadata = new LogOffsetMetadata(0L),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = 0L)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a4d907c4acc..103799d9b18 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
-import org.apache.kafka.server.log.internals.LogDirFailureChannel
+import org.apache.kafka.server.log.internals.{LogDirFailureChannel, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong}
@@ -147,7 +147,7 @@ class ReplicaManagerQuotasTest {
def testCompleteInDelayedFetchWithReplicaThrottling(): Unit = {
// Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync
def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = {
- val endOffsetMetadata = LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+ val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 500)
val partition: Partition = mock(classOf[Partition])
val offsetSnapshot = LogOffsetSnapshot(
@@ -167,8 +167,9 @@ class ReplicaManagerQuotasTest {
when(partition.getReplica(1)).thenReturn(None)
val tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("t1", 0))
- val fetchPartitionStatus = FetchPartitionStatus(LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L,
- relativePositionInSegment = 250), new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
+ val fetchPartitionStatus = FetchPartitionStatus(
+ new LogOffsetMetadata(50L, 0L, 250),
+ new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
val fetchParams = FetchParams(
requestVersion = ApiKeys.FETCH.latestVersion,
replicaId = 1,
@@ -199,9 +200,9 @@ class ReplicaManagerQuotasTest {
// Set up DelayedFetch where there is data to return to a consumer, either for the current segment or an older segment
def setupDelayedFetch(isFetchFromOlderSegment: Boolean): DelayedFetch = {
val endOffsetMetadata = if (isFetchFromOlderSegment)
- LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+ new LogOffsetMetadata(100L, 0L, 500)
else
- LogOffsetMetadata(messageOffset = 150L, segmentBaseOffset = 50L, relativePositionInSegment = 500)
+ new LogOffsetMetadata(150L, 50L, 500)
val partition: Partition = mock(classOf[Partition])
val offsetSnapshot = LogOffsetSnapshot(
@@ -217,8 +218,9 @@ class ReplicaManagerQuotasTest {
.thenReturn(partition)
val tidp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("t1", 0))
- val fetchPartitionStatus = FetchPartitionStatus(LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L,
- relativePositionInSegment = 250), new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
+ val fetchPartitionStatus = FetchPartitionStatus(
+ new LogOffsetMetadata(50L, 0L, 250),
+ new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
val fetchParams = FetchParams(
requestVersion = ApiKeys.FETCH.latestVersion,
replicaId = FetchRequest.CONSUMER_REPLICA_ID,
@@ -254,7 +256,7 @@ class ReplicaManagerQuotasTest {
when(log.logEndOffset).thenReturn(20L)
when(log.highWatermark).thenReturn(5)
when(log.lastStableOffset).thenReturn(5)
- when(log.logEndOffsetMetadata).thenReturn(LogOffsetMetadata(20L))
+ when(log.logEndOffsetMetadata).thenReturn(new LogOffsetMetadata(20L))
when(log.topicId).thenReturn(Some(topicId))
//if we ask for len 1 return a message
@@ -263,7 +265,7 @@ class ReplicaManagerQuotasTest {
isolation = any[FetchIsolation],
minOneMessage = anyBoolean)).thenReturn(
FetchDataInfo(
- LogOffsetMetadata(0L, 0L, 0),
+ new LogOffsetMetadata(0L, 0L, 0),
MemoryRecords.withRecords(CompressionType.NONE, record)
))
@@ -273,7 +275,7 @@ class ReplicaManagerQuotasTest {
isolation = any[FetchIsolation],
minOneMessage = anyBoolean)).thenReturn(
FetchDataInfo(
- LogOffsetMetadata(0L, 0L, 0),
+ new LogOffsetMetadata(0L, 0L, 0),
MemoryRecords.EMPTY
))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 8c63350e396..a5e64a60cfe 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -58,7 +58,7 @@ import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, C
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
-import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel}
+import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -2007,7 +2007,7 @@ class ReplicaManagerTest {
override def latestEpoch: Option[Int] = Some(leaderEpochFromLeader)
override def logEndOffsetMetadata: LogOffsetMetadata =
- localLogOffset.map(LogOffsetMetadata(_)).getOrElse(super.logEndOffsetMetadata)
+ localLogOffset.map(new LogOffsetMetadata(_)).getOrElse(super.logEndOffsetMetadata)
override def logEndOffset: Long = localLogOffset.getOrElse(super.logEndOffset)
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 75da931a658..f86b942c3c8 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -27,7 +27,6 @@ import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.BrokerTopicStats;
-import kafka.server.LogOffsetMetadata;
import kafka.server.MetadataCache;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.checkpoints.OffsetCheckpoints;
@@ -39,6 +38,7 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrParti
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
+import org.apache.kafka.server.log.internals.LogOffsetMetadata;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java
new file mode 100644
index 00000000000..dd6124025d8
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+/*
+ * A log offset structure, including:
+ * 1. the message offset
+ * 2. the base message offset of the located segment
+ * 3. the physical position on the located segment
+ */
+public final class LogOffsetMetadata {
+
+ //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module
+ private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
+
+ public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0);
+
+ private static final int UNKNOWN_FILE_POSITION = -1;
+
+ public final long messageOffset;
+ public final long segmentBaseOffset;
+ public final int relativePositionInSegment;
+
+ public LogOffsetMetadata(long messageOffset) {
+ this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION);
+ }
+
+ public LogOffsetMetadata(long messageOffset,
+ long segmentBaseOffset,
+ int relativePositionInSegment) {
+ this.messageOffset = messageOffset;
+ this.segmentBaseOffset = segmentBaseOffset;
+ this.relativePositionInSegment = relativePositionInSegment;
+ }
+
+ // check if this offset is already on an older segment compared with the given offset
+ public boolean onOlderSegment(LogOffsetMetadata that) {
+ if (messageOffsetOnly())
+ throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
+
+ return this.segmentBaseOffset < that.segmentBaseOffset;
+ }
+
+ // check if this offset is on the same segment with the given offset
+ private boolean onSameSegment(LogOffsetMetadata that) {
+ return this.segmentBaseOffset == that.segmentBaseOffset;
+ }
+
+ // compute the number of bytes between this offset to the given offset
+ // if they are on the same segment and this offset precedes the given offset
+ public int positionDiff(LogOffsetMetadata that) {
+ if (messageOffsetOnly())
+ throw new KafkaException(this + " cannot compare its segment position with " + that + " since it only has message offset info");
+ if (!onSameSegment(that))
+ throw new KafkaException(this + " cannot compare its segment position with " + that + " since they are not on the same segment");
+
+ return this.relativePositionInSegment - that.relativePositionInSegment;
+ }
+
+ // decide if the offset metadata only contains message offset info
+ public boolean messageOffsetOnly() {
+ return segmentBaseOffset == UNIFIED_LOG_UNKNOWN_OFFSET && relativePositionInSegment == UNKNOWN_FILE_POSITION;
+ }
+
+ @Override
+ public String toString() {
+ return "(offset=" + messageOffset + "segment=[" + segmentBaseOffset + ":" + relativePositionInSegment + "])";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ LogOffsetMetadata that = (LogOffsetMetadata) o;
+ return messageOffset == that.messageOffset
+ && segmentBaseOffset == that.segmentBaseOffset
+ && relativePositionInSegment == that.relativePositionInSegment;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Long.hashCode(messageOffset);
+ result = 31 * result + Long.hashCode(segmentBaseOffset);
+ result = 31 * result + Integer.hashCode(relativePositionInSegment);
+ return result;
+ }
+}