Posted to commits@kafka.apache.org by ij...@apache.org on 2022/12/28 01:12:15 UTC

[kafka] branch trunk updated: KAFKA-14543: Move LogOffsetMetadata to storage module (#13038)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8184ada6a5c KAFKA-14543: Move LogOffsetMetadata to storage module (#13038)
8184ada6a5c is described below

commit 8184ada6a5c98cd637e8d323c56dbe6b20d582bd
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Wed Dec 28 02:12:02 2022 +0100

    KAFKA-14543: Move LogOffsetMetadata to storage module (#13038)
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, dengziming <de...@gmail.com>, Satish Duggana <sa...@apache.org>, Federico Valeri <fe...@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   2 +-
 core/src/main/scala/kafka/cluster/Replica.scala    |   6 +-
 core/src/main/scala/kafka/log/LocalLog.scala       |   6 +-
 core/src/main/scala/kafka/log/LogLoader.scala      |   5 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |   6 +-
 .../scala/kafka/log/ProducerStateManager.scala     |   8 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  34 +++----
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |   2 +-
 .../src/main/scala/kafka/server/DelayedFetch.scala |   3 +-
 .../main/scala/kafka/server/FetchDataInfo.scala    |   3 +-
 .../scala/kafka/server/LogOffsetMetadata.scala     |  84 -----------------
 .../main/scala/kafka/server/ReplicaManager.scala   |  10 +-
 .../kafka/server/DelayedFetchTest.scala            |  11 ++-
 .../scala/unit/kafka/cluster/ReplicaTest.scala     |   4 +-
 .../group/GroupMetadataManagerTest.scala           |  12 +--
 .../TransactionCoordinatorConcurrencyTest.scala    |   5 +-
 .../transaction/TransactionStateManagerTest.scala  |  12 +--
 .../test/scala/unit/kafka/log/LocalLogTest.scala   |   4 +-
 .../unit/kafka/log/ProducerStateManagerTest.scala  |  31 +++----
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |  14 +--
 .../kafka/server/AbstractFetcherThreadTest.scala   |   3 +-
 .../unit/kafka/server/IsrExpirationTest.scala      |  14 +--
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  24 ++---
 .../unit/kafka/server/ReplicaManagerTest.scala     |   4 +-
 .../UpdateFollowerFetchStateBenchmark.java         |   2 +-
 .../server/log/internals/LogOffsetMetadata.java    | 103 +++++++++++++++++++++
 26 files changed, 217 insertions(+), 195 deletions(-)
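
[Editor's note] As the hunks below show, the Scala case class kafka.server.LogOffsetMetadata is replaced by a Java class in org.apache.kafka.server.log.internals, so Scala call sites switch from the case-class apply/copy style (with named and default arguments) to plain positional constructor calls, and the UnknownOffsetMetadata constant becomes UNKNOWN_OFFSET_METADATA. A minimal sketch of the pattern, with placeholder offset values that are not taken from the commit:

    import org.apache.kafka.server.log.internals.LogOffsetMetadata

    // Before: Scala case class with named/default arguments and copy()
    //   val m  = LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 42)
    //   val m2 = m.copy(segmentBaseOffset = 100L, relativePositionInSegment = 0)
    //   val u  = LogOffsetMetadata.UnknownOffsetMetadata

    // After: Java constructors (positional arguments only) and an upper-snake-case constant
    val m  = new LogOffsetMetadata(100L, 0L, 42)                // message offset, segment base offset, position in segment
    val m2 = new LogOffsetMetadata(m.messageOffset, 100L, 0)    // copy(...) becomes a fresh instance
    val u  = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA
    val messageOnly = new LogOffsetMetadata(100L)               // single-argument form, as in updateHighWatermark(hw)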

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c4df79219a4..fe8792189ba 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 0321488af4d..7a60632a54b 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,9 +18,9 @@
 package kafka.cluster
 
 import kafka.log.UnifiedLog
-import kafka.server.LogOffsetMetadata
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
 
 import java.util.concurrent.atomic.AtomicReference
 
@@ -69,7 +69,7 @@ case class ReplicaState(
 
 object ReplicaState {
   val Empty: ReplicaState = ReplicaState(
-    logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
+    logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
     logStartOffset = UnifiedLog.UnknownOffset,
     lastFetchLeaderLogEndOffset = 0L,
     lastFetchTimeMs = 0L,
@@ -139,7 +139,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
       if (isNewLeader) {
         ReplicaState(
           logStartOffset = UnifiedLog.UnknownOffset,
-          logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
+          logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
           lastFetchLeaderLogEndOffset = UnifiedLog.UnknownOffset,
           lastFetchTimeMs = 0L,
           lastCaughtUpTimeMs = lastCaughtUpTimeMs
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 10d5217069b..b62fead3b91 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -23,14 +23,14 @@ import java.text.NumberFormat
 import java.util.concurrent.atomic.AtomicLong
 import java.util.regex.Pattern
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{FetchDataInfo, LogOffsetMetadata}
+import kafka.server.FetchDataInfo
 import kafka.utils.{Logging, Scheduler}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, OffsetPosition}
+import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.{Seq, immutable}
@@ -200,7 +200,7 @@ class LocalLog(@volatile private var _dir: File,
    * @param endOffset the new end offset of the log
    */
   private[log] def updateLogEndOffset(endOffset: Long): Unit = {
-    nextOffsetMetadata = LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
+    nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
     if (recoveryPoint > endOffset) {
       updateRecoveryPoint(endOffset)
     }
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index e69283f71a6..59ea5a8c1f6 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -21,14 +21,13 @@ import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
-import kafka.server.LogOffsetMetadata
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.{CoreUtils, Logging, Scheduler}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidOffsetException
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.snapshot.Snapshots
-import org.apache.kafka.server.log.internals.{CorruptIndexException, LogDirFailureChannel}
+import org.apache.kafka.server.log.internals.{CorruptIndexException, LogDirFailureChannel, LogOffsetMetadata}
 
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import scala.collection.{Set, mutable}
@@ -209,7 +208,7 @@ class LogLoader(
     LoadedLogOffsets(
       newLogStartOffset,
       newRecoveryPoint,
-      LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size))
+      new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size))
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 93677bee653..16408f21394 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -25,14 +25,14 @@ import java.util.concurrent.TimeUnit
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{FetchDataInfo, LogOffsetMetadata}
+import kafka.server.FetchDataInfo
 import kafka.utils._
 import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
 
 import java.util.Optional
 import scala.jdk.CollectionConverters._
@@ -305,7 +305,7 @@ class LogSegment private[log] (val log: FileRecords,
       return null
 
     val startPosition = startOffsetAndSize.position
-    val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
+    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
 
     val adjustedMaxSize =
       if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 1b9c4593d02..c6fe0486448 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -22,7 +22,7 @@ import java.nio.channels.FileChannel
 import java.nio.file.{Files, NoSuchFileException, StandardOpenOption}
 import java.util.concurrent.ConcurrentSkipListMap
 import kafka.log.UnifiedLog.offsetFromFile
-import kafka.server.{BrokerReconfigurable, KafkaConfig, LogOffsetMetadata}
+import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils.{CoreUtils, Logging, nonthreadsafe, threadsafe}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -30,7 +30,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time, Utils}
-import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn}
+import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
@@ -50,7 +50,7 @@ private[log] case class TxnMetadata(
   firstOffset: LogOffsetMetadata,
   var lastOffset: Option[Long] = None
 ) {
-  def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset))
+  def this(producerId: Long, firstOffset: Long) = this(producerId, new LogOffsetMetadata(firstOffset))
 
   override def toString: String = {
     "TxnMetadata(" +
@@ -259,7 +259,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
         None
       }
     } else {
-      val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(LogOffsetMetadata(batch.baseOffset))
+      val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(new LogOffsetMetadata(batch.baseOffset))
       appendDataBatch(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
         firstOffsetMetadata, batch.lastOffset, batch.isTransactional)
       None
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index c876840182b..0bab5fa7018 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -28,7 +28,7 @@ import kafka.log.remote.RemoteLogManager
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
+import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
 import kafka.utils._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
@@ -42,7 +42,7 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogDirFailureChannel, LogValidator}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogDirFailureChannel, LogOffsetMetadata, LogValidator}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.record.BrokerCompressionType
 
@@ -273,7 +273,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
    * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
    */
-  @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
+  @volatile private var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(logStartOffset)
 
   @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
 
@@ -324,7 +324,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         try partMetadataFile.delete()
         catch {
           case e: IOException =>
-            error(s"Error while trying to delete partition metadata file ${partMetadataFile}", e)
+            error(s"Error while trying to delete partition metadata file $partMetadataFile", e)
         }
       }
     } else if (keepPartitionMetadataFile) {
@@ -380,7 +380,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * @return the updated high watermark offset
    */
   def updateHighWatermark(hw: Long): Long = {
-    updateHighWatermark(LogOffsetMetadata(hw))
+    updateHighWatermark(new LogOffsetMetadata(hw))
   }
 
   /**
@@ -393,7 +393,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
     val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      LogOffsetMetadata(logStartOffset)
+      new LogOffsetMetadata(logStartOffset)
     } else if (highWatermarkMetadata.messageOffset >= endOffsetMetadata.messageOffset) {
       endOffsetMetadata
     } else {
@@ -445,7 +445,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   def maybeUpdateHighWatermark(hw: Long): Option[Long] = {
     lock.synchronized {
       val oldHighWatermark = highWatermarkMetadata
-      updateHighWatermark(LogOffsetMetadata(hw)) match {
+      updateHighWatermark(new LogOffsetMetadata(hw)) match {
         case oldHighWatermark.messageOffset =>
           None
         case newHighWatermark =>
@@ -838,7 +838,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (validateAndAssignOffsets) {
             // assign offsets to the message set
             val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
-            appendInfo.firstOffset = Some(LogOffsetMetadata(offset.value))
+            appendInfo.firstOffset = Some(new LogOffsetMetadata(offset.value))
             val validateAndOffsetAssignResult = try {
               val validator = new LogValidator(validRecords,
                 topicPartition,
@@ -935,10 +935,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           // maybe roll the log if this segment is full
           val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
 
-          val logOffsetMetadata = LogOffsetMetadata(
-            messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
-            segmentBaseOffset = segment.baseOffset,
-            relativePositionInSegment = segment.size)
+          val logOffsetMetadata = new LogOffsetMetadata(
+            appendInfo.firstOrLastOffsetOfFirstBatch,
+            segment.baseOffset,
+            segment.size)
 
           // now that we have valid records, offsets assigned, and timestamps updated, we need to
           // validate the idempotent/transactional state of the producers and collect some metadata
@@ -947,14 +947,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
           maybeDuplicate match {
             case Some(duplicate) =>
-              appendInfo.firstOffset = Some(LogOffsetMetadata(duplicate.firstOffset))
+              appendInfo.firstOffset = Some(new LogOffsetMetadata(duplicate.firstOffset))
               appendInfo.lastOffset = duplicate.lastOffset
               appendInfo.logAppendTime = duplicate.timestamp
               appendInfo.logStartOffset = logStartOffset
             case None =>
               // Before appending update the first offset metadata to include segment information
               appendInfo.firstOffset = appendInfo.firstOffset.map { offsetMetadata =>
-                offsetMetadata.copy(segmentBaseOffset = segment.baseOffset, relativePositionInSegment = segment.size)
+                new LogOffsetMetadata(offsetMetadata.messageOffset, segment.baseOffset, segment.size)
               }
 
               // Append the records, and increment the local log end offset immediately after the append because a
@@ -1091,7 +1091,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         // We cache offset metadata for the start of each transaction. This allows us to
         // compute the last stable offset without relying on additional index lookups.
         val firstOffsetMetadata = if (batch.isTransactional)
-          Some(LogOffsetMetadata(batch.baseOffset, appendOffsetMetadata.segmentBaseOffset, relativePositionInSegment))
+          Some(new LogOffsetMetadata(batch.baseOffset, appendOffsetMetadata.segmentBaseOffset, relativePositionInSegment))
         else
           None
 
@@ -1155,7 +1155,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       // Also indicate whether we have the accurate first offset or not
       if (!readFirstMessage) {
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          firstOffset = Some(LogOffsetMetadata(batch.baseOffset))
+          firstOffset = Some(new LogOffsetMetadata(batch.baseOffset))
         lastOffsetOfFirstBatch = batch.lastOffset
         readFirstMessage = true
       }
@@ -1328,7 +1328,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
         val remoteOffset = if (remoteLogEnabled()) {
           if (remoteLogManager.isEmpty) {
-            throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.");
+            throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.")
           }
           if (recordVersion.value < RecordVersion.V2.value) {
             throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.")
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 7056f5db0e6..a892d9c235b 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -190,7 +190,7 @@ final class KafkaMetadataLog private (
   override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = {
     offsetMetadata.metadata.asScala match {
       case Some(segmentPosition: SegmentPosition) => log.updateHighWatermark(
-        new kafka.server.LogOffsetMetadata(
+        new org.apache.kafka.server.log.internals.LogOffsetMetadata(
           offsetMetadata.offset,
           segmentPosition.baseOffset,
           segmentPosition.relativePosition)
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 55a15682b64..b61bd8bc8a0 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
 
 import scala.collection._
 
@@ -75,7 +76,7 @@ class DelayedFetch(
         val fetchOffset = fetchStatus.startOffsetMetadata
         val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
         try {
-          if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
+          if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
             val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
             val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader)
 
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala
index 95b68c08395..f3b0c40ea3b 100644
--- a/core/src/main/scala/kafka/server/FetchDataInfo.scala
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
 
 sealed trait FetchIsolation
 case object FetchLogEnd extends FetchIsolation
@@ -78,7 +79,7 @@ case class FetchParams(
 object FetchDataInfo {
   def empty(fetchOffset: Long): FetchDataInfo = {
     FetchDataInfo(
-      fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
+      fetchOffsetMetadata = new LogOffsetMetadata(fetchOffset),
       records = MemoryRecords.EMPTY,
       firstEntryIncomplete = false,
       abortedTransactions = None
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
deleted file mode 100644
index 94002607af0..00000000000
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.log.UnifiedLog
-import org.apache.kafka.common.KafkaException
-
-object LogOffsetMetadata {
-  val UnknownOffsetMetadata = LogOffsetMetadata(-1, 0, 0)
-  val UnknownFilePosition = -1
-
-  class OffsetOrdering extends Ordering[LogOffsetMetadata] {
-    override def compare(x: LogOffsetMetadata, y: LogOffsetMetadata): Int = {
-      x.offsetDiff(y).toInt
-    }
-  }
-
-}
-
-/*
- * A log offset structure, including:
- *  1. the message offset
- *  2. the base message offset of the located segment
- *  3. the physical position on the located segment
- */
-case class LogOffsetMetadata(messageOffset: Long,
-                             segmentBaseOffset: Long = UnifiedLog.UnknownOffset,
-                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
-
-  // check if this offset is already on an older segment compared with the given offset
-  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
-    if (messageOffsetOnly)
-      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
-
-    this.segmentBaseOffset < that.segmentBaseOffset
-  }
-
-  // check if this offset is on the same segment with the given offset
-  def onSameSegment(that: LogOffsetMetadata): Boolean = {
-    if (messageOffsetOnly)
-      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
-
-    this.segmentBaseOffset == that.segmentBaseOffset
-  }
-
-  // compute the number of messages between this offset to the given offset
-  def offsetDiff(that: LogOffsetMetadata): Long = {
-    this.messageOffset - that.messageOffset
-  }
-
-  // compute the number of bytes between this offset to the given offset
-  // if they are on the same segment and this offset precedes the given offset
-  def positionDiff(that: LogOffsetMetadata): Int = {
-    if(!onSameSegment(that))
-      throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
-    if(messageOffsetOnly)
-      throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")
-
-    this.relativePositionInSegment - that.relativePositionInSegment
-  }
-
-  // decide if the offset metadata only contains message offset info
-  def messageOffsetOnly: Boolean = {
-    segmentBaseOffset == UnifiedLog.UnknownOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition
-  }
-
-  override def toString = s"(offset=$messageOffset segment=[$segmentBaseOffset:$relativePositionInSegment])"
-
-}
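
[Editor's note] The deleted Scala file above documents the semantics the relocated class must preserve: segment comparisons (onOlderSegment, onSameSegment, positionDiff) are only defined when both operands carry segment info, while offsetDiff needs only message offsets. A small Scala sketch of those invariants, assuming the Java port keeps the same method names; the new file's 103 lines are not shown in this excerpt:

    val a = new LogOffsetMetadata(10L, 0L, 128)
    val b = new LogOffsetMetadata(25L, 0L, 512)
    assert(a.onSameSegment(b))             // same segmentBaseOffset (method name assumed carried over)
    assert(b.offsetDiff(a) == 15L)         // message-offset delta: 25 - 10
    assert(b.positionDiff(a) == 384)       // byte delta, valid only on the same segment: 512 - 128

    val messageOnly = new LogOffsetMetadata(42L)
    assert(messageOnly.messageOffsetOnly)  // no segment info; onSameSegment on this value would throw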
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c88cc688693..f6493ad7066 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -61,7 +61,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.server.common.MetadataVersion._
-import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel, RecordValidationException}
+import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel, LogOffsetMetadata, RecordValidationException}
 
 import java.nio.file.{Files, Paths}
 import java.util
@@ -128,7 +128,7 @@ case class LogReadResult(info: FetchDataInfo,
     isReassignmentFetch)
 
   def withEmptyFetchInfo: LogReadResult =
-    copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
+    copy(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY))
 
   override def toString = {
     "LogReadResult(" +
@@ -1125,7 +1125,7 @@ class ReplicaManager(val config: KafkaConfig,
           }
           // If a preferred read-replica is set, skip the read
           val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
-          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
             divergingEpoch = None,
             highWatermark = offsetSnapshot.highWatermark.messageOffset,
             leaderLogStartOffset = offsetSnapshot.logStartOffset,
@@ -1180,7 +1180,7 @@ class ReplicaManager(val config: KafkaConfig,
                  _: KafkaStorageException |
                  _: OffsetOutOfRangeException |
                  _: InconsistentTopicIdException) =>
-          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
             divergingEpoch = None,
             highWatermark = UnifiedLog.UnknownOffset,
             leaderLogStartOffset = UnifiedLog.UnknownOffset,
@@ -1197,7 +1197,7 @@ class ReplicaManager(val config: KafkaConfig,
           error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
             s"on partition $tp: $fetchInfo", e)
 
-          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
             divergingEpoch = None,
             highWatermark = UnifiedLog.UnknownOffset,
             leaderLogStartOffset = UnifiedLog.UnknownOffset,
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index dce5a2eaee7..9d9ad717df3 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
 import org.mockito.ArgumentMatchers.{any, anyInt}
@@ -46,7 +47,7 @@ class DelayedFetchTest {
     val replicaId = 1
 
     val fetchStatus = FetchPartitionStatus(
-      startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
       fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
     val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
 
@@ -92,7 +93,7 @@ class DelayedFetchTest {
     val replicaId = 1
 
     val fetchStatus = FetchPartitionStatus(
-      startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
       fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
     val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
 
@@ -129,7 +130,7 @@ class DelayedFetchTest {
     val replicaId = 1
 
     val fetchStatus = FetchPartitionStatus(
-      startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
       fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch))
     val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
 
@@ -148,7 +149,7 @@ class DelayedFetchTest {
 
     val partition: Partition = mock(classOf[Partition])
     when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
-    val endOffsetMetadata = LogOffsetMetadata(messageOffset = 500L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+    val endOffsetMetadata = new LogOffsetMetadata(500L, 0L, 500)
     when(partition.fetchOffsetSnapshot(
       currentLeaderEpoch,
       fetchOnlyFromLeader = true))
@@ -199,7 +200,7 @@ class DelayedFetchTest {
   private def buildReadResult(error: Errors): LogReadResult = {
     LogReadResult(
       exception = if (error != Errors.NONE) Some(error.exception) else None,
-      info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+      info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
       divergingEpoch = None,
       highWatermark = -1L,
       leaderLogStartOffset = -1L,
diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
index 76910642ae9..7a24c77c59f 100644
--- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
@@ -17,9 +17,9 @@
 package kafka.cluster
 
 import kafka.log.UnifiedLog
-import kafka.server.LogOffsetMetadata
 import kafka.utils.MockTime
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.{BeforeEach, Test}
 
@@ -83,7 +83,7 @@ class ReplicaTest {
   ): Long = {
     val currentTimeMs = time.milliseconds()
     replica.updateFetchState(
-      followerFetchOffsetMetadata = LogOffsetMetadata(followerFetchOffset),
+      followerFetchOffsetMetadata = new LogOffsetMetadata(followerFetchOffset),
       followerStartOffset = followerStartOffset,
       followerFetchTimeMs = currentTimeMs,
       leaderEndOffset = leaderEndOffset
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 94be08c671f..301d2fa25cd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -27,7 +27,7 @@ import javax.management.ObjectName
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
 import kafka.log.{LogAppendInfo, UnifiedLog}
-import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager, RequestLocal}
+import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
@@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion._
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -890,12 +890,12 @@ class GroupMetadataManagerTest {
       maxLength = anyInt(),
       isolation = ArgumentMatchers.eq(FetchLogEnd),
       minOneMessage = ArgumentMatchers.eq(true)))
-      .thenReturn(FetchDataInfo(LogOffsetMetadata(segment1End), fileRecordsMock))
+      .thenReturn(FetchDataInfo(new LogOffsetMetadata(segment1End), fileRecordsMock))
     when(logMock.read(ArgumentMatchers.eq(segment2End),
       maxLength = anyInt(),
       isolation = ArgumentMatchers.eq(FetchLogEnd),
       minOneMessage = ArgumentMatchers.eq(true)))
-      .thenReturn(FetchDataInfo(LogOffsetMetadata(segment2End), fileRecordsMock))
+      .thenReturn(FetchDataInfo(new LogOffsetMetadata(segment2End), fileRecordsMock))
     when(fileRecordsMock.sizeInBytes())
       .thenReturn(segment1Records.sizeInBytes)
       .thenReturn(segment2Records.sizeInBytes)
@@ -2376,7 +2376,7 @@ class GroupMetadataManagerTest {
       maxLength = anyInt(),
       isolation = ArgumentMatchers.eq(FetchLogEnd),
       minOneMessage = ArgumentMatchers.eq(true)))
-      .thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), mockRecords))
+      .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), mockRecords))
     when(replicaManager.getLog(groupMetadataTopicPartition)).thenReturn(Some(logMock))
     when(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).thenReturn(Some[Long](18))
     groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)
@@ -2533,7 +2533,7 @@ class GroupMetadataManagerTest {
       maxLength = anyInt(),
       isolation = ArgumentMatchers.eq(FetchLogEnd),
       minOneMessage = ArgumentMatchers.eq(true)))
-      .thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
+      .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
 
     when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 9667d2fffde..ffd9f506fba 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -24,7 +24,7 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
 import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._
 import kafka.log.{LogConfig, UnifiedLog}
-import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, LogOffsetMetadata, MetadataCache, RequestLocal}
+import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, MetadataCache, RequestLocal}
 import kafka.utils.{Pool, TestUtils}
 import org.apache.kafka.clients.{ClientResponse, NetworkClient}
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
@@ -35,6 +35,7 @@ import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecor
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
 import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
@@ -469,7 +470,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
       maxLength = anyInt,
       isolation = ArgumentMatchers.eq(FetchLogEnd),
       minOneMessage = ArgumentMatchers.eq(true)))
-      .thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
+      .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
 
     when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes)
     val bufferCaptor: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 59a4ef9241b..397c6c2c75b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
 import java.util.concurrent.locks.ReentrantLock
 import javax.management.ObjectName
 import kafka.log.{Defaults, LogConfig, UnifiedLog}
-import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager, RequestLocal}
+import kafka.server.{FetchDataInfo, FetchLogEnd, ReplicaManager, RequestLocal}
 import kafka.utils.{MockScheduler, Pool, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
@@ -156,7 +156,7 @@ class TransactionStateManagerTest {
       maxLength = anyInt(),
       isolation = ArgumentMatchers.eq(FetchLogEnd),
       minOneMessage = ArgumentMatchers.eq(true))
-    ).thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
+    ).thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
     when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset))
 
     txnMetadata1.state = PrepareCommit
@@ -822,7 +822,7 @@ class TransactionStateManagerTest {
     transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 1, (_, _, _, _) => ())
     assertEquals(0, transactionManager.loadingPartitions.size)
     assertTrue(transactionManager.transactionMetadataCache.contains(partitionId))
-    assertEquals(1, transactionManager.transactionMetadataCache.get(partitionId).get.coordinatorEpoch)
+    assertEquals(1, transactionManager.transactionMetadataCache(partitionId).coordinatorEpoch)
   }
 
   @Test
@@ -840,7 +840,7 @@ class TransactionStateManagerTest {
       maxLength = anyInt(),
       isolation = ArgumentMatchers.eq(FetchLogEnd),
       minOneMessage = ArgumentMatchers.eq(true))
-    ).thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), MemoryRecords.EMPTY))
+    ).thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY))
     when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset))
 
     transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _) => ())
@@ -1017,7 +1017,7 @@ class TransactionStateManagerTest {
       maxLength = anyInt(),
       isolation = ArgumentMatchers.eq(FetchLogEnd),
       minOneMessage = ArgumentMatchers.eq(true)))
-      .thenReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
+      .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
 
     when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes)
 
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
index deee5685bc5..6fe947ef578 100644
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -22,13 +22,13 @@ import java.nio.channels.ClosedChannelException
 import java.nio.charset.StandardCharsets
 import java.util.regex.Pattern
 import java.util.Collections
-import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata}
+import kafka.server.{FetchDataInfo, KafkaConfig}
 import kafka.utils.{MockTime, Scheduler, TestUtils}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, SimpleRecord}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.server.log.internals.LogDirFailureChannel
+import org.apache.kafka.server.log.internals.{LogDirFailureChannel, LogOffsetMetadata}
 import org.junit.jupiter.api.Assertions.{assertFalse, _}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 359c46de0b7..3e5ae15d211 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -23,14 +23,13 @@ import java.nio.channels.FileChannel
 import java.nio.file.{Files, StandardOpenOption}
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicInteger
-import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
-import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn}
+import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.Mockito.{mock, when}
@@ -131,7 +130,7 @@ class ProducerStateManagerTest {
     val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION)
     // Sequence number wrap around
     appendInfo.appendDataBatch(epoch, Int.MaxValue - 10, 9, time.milliseconds(),
-      LogOffsetMetadata(2000L), 2020L, isTransactional = false)
+      new LogOffsetMetadata(2000L), 2020L, isTransactional = false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
@@ -199,8 +198,7 @@ class ProducerStateManagerTest {
     val seq = 0
     val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT)
 
-    val firstOffsetMetadata = LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
-      relativePositionInSegment = 234224)
+    val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224)
     producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
       firstOffsetMetadata, offset, isTransactional = true)
     stateManager.update(producerAppendInfo)
@@ -230,7 +228,7 @@ class ProducerStateManagerTest {
     ): Unit = {
       val count = (endOffset - startOffset).toInt
       appendInfo.appendDataBatch(producerEpoch, seq.get(), seq.addAndGet(count), time.milliseconds(),
-        LogOffsetMetadata(startOffset), endOffset, isTransactional = true)
+        new LogOffsetMetadata(startOffset), endOffset, isTransactional = true)
       seq.incrementAndGet()
     }
 
@@ -240,7 +238,7 @@ class ProducerStateManagerTest {
     assertEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.head)
     stateManager.update(firstAppend)
     stateManager.onHighWatermarkUpdated(21L)
-    assertEquals(Some(LogOffsetMetadata(16L)), stateManager.firstUnstableOffset)
+    assertEquals(Some(new LogOffsetMetadata(16L)), stateManager.firstUnstableOffset)
 
     // Now do a single append which completes the old transaction, mixes in
     // some empty transactions, one non-empty complete transaction, and one
@@ -257,13 +255,13 @@ class ProducerStateManagerTest {
     appendData(30L, 31L, secondAppend)
 
     assertEquals(2, secondAppend.startedTransactions.size)
-    assertEquals(TxnMetadata(producerId, LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
-    assertEquals(TxnMetadata(producerId, LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
+    assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
+    assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
     stateManager.update(secondAppend)
     stateManager.completeTxn(firstCompletedTxn.get)
     stateManager.completeTxn(secondCompletedTxn.get)
     stateManager.onHighWatermarkUpdated(32L)
-    assertEquals(Some(LogOffsetMetadata(30L)), stateManager.firstUnstableOffset)
+    assertEquals(Some(new LogOffsetMetadata(30L)), stateManager.firstUnstableOffset)
   }
 
   @Test
@@ -373,8 +371,7 @@ class ProducerStateManagerTest {
         ProducerStateEntry.empty(producerId),
         AppendOrigin.CLIENT
       )
-      val firstOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
-        relativePositionInSegment = 50 * relativeOffset)
+      val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset)
       producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(),
         firstOffsetMetadata, startOffset, isTransactional = true)
       stateManager.update(producerAppendInfo)
@@ -420,14 +417,14 @@ class ProducerStateManagerTest {
 
     val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
     appendInfo.appendDataBatch(producerEpoch, 0, 5, time.milliseconds(),
-      LogOffsetMetadata(15L), 20L, isTransactional = false)
+      new LogOffsetMetadata(15L), 20L, isTransactional = false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
 
     val nextAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
     nextAppendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
-      LogOffsetMetadata(26L), 30L, isTransactional = false)
+      new LogOffsetMetadata(26L), 30L, isTransactional = false)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
 
     var lastEntry = stateManager.lastEntry(producerId).get
@@ -451,7 +448,7 @@ class ProducerStateManagerTest {
 
     val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
     appendInfo.appendDataBatch(producerEpoch, 1, 5, time.milliseconds(),
-      LogOffsetMetadata(16L), 20L, isTransactional = true)
+      new LogOffsetMetadata(16L), 20L, isTransactional = true)
     var lastEntry = appendInfo.toEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
@@ -462,7 +459,7 @@ class ProducerStateManagerTest {
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
     appendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
-      LogOffsetMetadata(26L), 30L, isTransactional = true)
+      new LogOffsetMetadata(26L), 30L, isTransactional = true)
     lastEntry = appendInfo.toEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
@@ -1121,7 +1118,7 @@ class ProducerStateManagerTest {
                      origin : AppendOrigin = AppendOrigin.CLIENT): Unit = {
     val producerAppendInfo = stateManager.prepareUpdate(producerId, origin)
     producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, timestamp,
-      LogOffsetMetadata(offset), offset, isTransactional)
+      new LogOffsetMetadata(offset), offset, isTransactional)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)
   }
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 4a280795017..5db1ae2633a 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -26,7 +26,7 @@ import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException
 import kafka.log.remote.RemoteLogManager
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogOffsetMetadata, PartitionMetadataFile}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, PartitionMetadataFile}
 import kafka.utils._
 import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
@@ -37,7 +37,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
 import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, RecordValidationException}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, LogOffsetMetadata, RecordValidationException}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions._
@@ -154,17 +154,17 @@ class UnifiedLogTest {
     val records = TestUtils.records(simpleRecords)
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(LogOffsetMetadata(0, 0, 0), firstAppendInfo.firstOffset.get)
+    assertEquals(new LogOffsetMetadata(0, 0, 0), firstAppendInfo.firstOffset.get)
 
     val secondAppendInfo = log.appendAsLeader(
       TestUtils.records(simpleRecords),
       leaderEpoch = 0
     )
-    assertEquals(LogOffsetMetadata(simpleRecords.size, 0, records.sizeInBytes), secondAppendInfo.firstOffset.get)
+    assertEquals(new LogOffsetMetadata(simpleRecords.size, 0, records.sizeInBytes), secondAppendInfo.firstOffset.get)
 
     log.roll()
     val afterRollAppendInfo =  log.appendAsLeader(TestUtils.records(simpleRecords), leaderEpoch = 0)
-    assertEquals(LogOffsetMetadata(simpleRecords.size * 2, simpleRecords.size * 2, 0), afterRollAppendInfo.firstOffset.get)
+    assertEquals(new LogOffsetMetadata(simpleRecords.size * 2, simpleRecords.size * 2, 0), afterRollAppendInfo.firstOffset.get)
   }
 
   @Test
@@ -213,7 +213,7 @@ class UnifiedLogTest {
     log.close()
 
     val reopened = createLog(logDir, logConfig)
-    assertEquals(Some(LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)
+    assertEquals(Some(new LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)
 
     truncateFunc(reopened)(0L)
     assertEquals(None, reopened.firstUnstableOffset)
@@ -245,7 +245,7 @@ class UnifiedLogTest {
     assertHighWatermark(0L)
 
     // Update high watermark as leader
-    log.maybeIncrementHighWatermark(LogOffsetMetadata(1L))
+    log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L))
     assertHighWatermark(1L)
 
     // Cannot update past the log end offset
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index ae36d0a538d..01f707b2ea0 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.server.log.internals.LogOffsetMetadata
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Assumptions.assumeTrue
 import org.junit.jupiter.api.{BeforeEach, Test}
@@ -1422,7 +1423,7 @@ class AbstractFetcherThreadTest {
       state.logStartOffset = partitionData.logStartOffset
       state.highWatermark = partitionData.highWatermark
 
-      Some(LogAppendInfo(firstOffset = Some(LogOffsetMetadata(fetchOffset)),
+      Some(LogAppendInfo(firstOffset = Some(new LogOffsetMetadata(fetchOffset)),
         lastOffset = lastOffset,
         lastLeaderEpoch = lastEpoch,
         maxTimestamp = maxTimestamp,
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index e4d929553e4..cf072486e54 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.LeaderRecoveryState
-import org.apache.kafka.server.log.internals.LogDirFailureChannel
+import org.apache.kafka.server.log.internals.{LogDirFailureChannel, LogOffsetMetadata}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.Mockito.{atLeastOnce, mock, verify, when}
@@ -98,7 +98,7 @@ class IsrExpirationTest {
     // let the follower catch up to the Leader logEndOffset - 1
     for (replica <- partition0.remoteReplicas)
       replica.updateFetchState(
-        followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
+        followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 1),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = leaderLogEndOffset)
@@ -147,7 +147,7 @@ class IsrExpirationTest {
     // Make the remote replica not read to the end of the log. It should not be out of sync for at least 100 ms
     for (replica <- partition0.remoteReplicas)
       replica.updateFetchState(
-        followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 2),
+        followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 2),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = leaderLogEndOffset)
@@ -161,7 +161,7 @@ class IsrExpirationTest {
 
     partition0.remoteReplicas.foreach { r =>
       r.updateFetchState(
-        followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
+        followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 1),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = leaderLogEndOffset)
@@ -178,7 +178,7 @@ class IsrExpirationTest {
     // Now actually make a fetch to the end of the log. The replicas should be back in ISR
     partition0.remoteReplicas.foreach { r =>
       r.updateFetchState(
-        followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
+        followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = leaderLogEndOffset)
@@ -202,7 +202,7 @@ class IsrExpirationTest {
     // let the follower catch up to the Leader logEndOffset
     for (replica <- partition0.remoteReplicas)
       replica.updateFetchState(
-        followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
+        followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = leaderLogEndOffset)
@@ -238,7 +238,7 @@ class IsrExpirationTest {
     // set lastCaughtUpTime to current time
     for (replica <- partition.remoteReplicas)
       replica.updateFetchState(
-        followerFetchOffsetMetadata = LogOffsetMetadata(0L),
+        followerFetchOffsetMetadata = new LogOffsetMetadata(0L),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = 0L)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a4d907c4acc..103799d9b18 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
-import org.apache.kafka.server.log.internals.LogDirFailureChannel
+import org.apache.kafka.server.log.internals.{LogDirFailureChannel, LogOffsetMetadata}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong}
@@ -147,7 +147,7 @@ class ReplicaManagerQuotasTest {
   def testCompleteInDelayedFetchWithReplicaThrottling(): Unit = {
     // Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync
     def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = {
-      val endOffsetMetadata = LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+      val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 500)
       val partition: Partition = mock(classOf[Partition])
 
       val offsetSnapshot = LogOffsetSnapshot(
@@ -167,8 +167,9 @@ class ReplicaManagerQuotasTest {
       when(partition.getReplica(1)).thenReturn(None)
 
       val tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("t1", 0))
-      val fetchPartitionStatus = FetchPartitionStatus(LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L,
-         relativePositionInSegment = 250), new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
+      val fetchPartitionStatus = FetchPartitionStatus(
+        new LogOffsetMetadata(50L, 0L, 250),
+        new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
       val fetchParams = FetchParams(
         requestVersion = ApiKeys.FETCH.latestVersion,
         replicaId = 1,
@@ -199,9 +200,9 @@ class ReplicaManagerQuotasTest {
     // Set up DelayedFetch where there is data to return to a consumer, either for the current segment or an older segment
     def setupDelayedFetch(isFetchFromOlderSegment: Boolean): DelayedFetch = {
       val endOffsetMetadata = if (isFetchFromOlderSegment)
-        LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+        new LogOffsetMetadata(100L, 0L, 500)
       else
-        LogOffsetMetadata(messageOffset = 150L, segmentBaseOffset = 50L, relativePositionInSegment = 500)
+        new LogOffsetMetadata(150L, 50L, 500)
       val partition: Partition = mock(classOf[Partition])
 
       val offsetSnapshot = LogOffsetSnapshot(
@@ -217,8 +218,9 @@ class ReplicaManagerQuotasTest {
         .thenReturn(partition)
 
       val tidp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("t1", 0))
-      val fetchPartitionStatus = FetchPartitionStatus(LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L,
-        relativePositionInSegment = 250), new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
+      val fetchPartitionStatus = FetchPartitionStatus(
+        new LogOffsetMetadata(50L, 0L, 250),
+        new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
       val fetchParams = FetchParams(
         requestVersion = ApiKeys.FETCH.latestVersion,
         replicaId = FetchRequest.CONSUMER_REPLICA_ID,
@@ -254,7 +256,7 @@ class ReplicaManagerQuotasTest {
     when(log.logEndOffset).thenReturn(20L)
     when(log.highWatermark).thenReturn(5)
     when(log.lastStableOffset).thenReturn(5)
-    when(log.logEndOffsetMetadata).thenReturn(LogOffsetMetadata(20L))
+    when(log.logEndOffsetMetadata).thenReturn(new LogOffsetMetadata(20L))
     when(log.topicId).thenReturn(Some(topicId))
 
     //if we ask for len 1 return a message
@@ -263,7 +265,7 @@ class ReplicaManagerQuotasTest {
       isolation = any[FetchIsolation],
       minOneMessage = anyBoolean)).thenReturn(
       FetchDataInfo(
-        LogOffsetMetadata(0L, 0L, 0),
+        new LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, record)
       ))
 
@@ -273,7 +275,7 @@ class ReplicaManagerQuotasTest {
       isolation = any[FetchIsolation],
       minOneMessage = anyBoolean)).thenReturn(
       FetchDataInfo(
-        LogOffsetMetadata(0L, 0L, 0),
+        new LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.EMPTY
       ))
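These hunks also show that Scala's named arguments (messageOffset =, segmentBaseOffset =, relativePositionInSegment =) disappear: the Java constructor is purely positional. A minimal sketch of the argument order, assuming only the constructor signature from the new file (the wrapper class is hypothetical):

    import org.apache.kafka.server.log.internals.LogOffsetMetadata;

    public class OffsetArgsExample {
        public static void main(String[] args) {
            LogOffsetMetadata fetchOffset = new LogOffsetMetadata(
                50L,   // messageOffset
                0L,    // segmentBaseOffset
                250);  // relativePositionInSegment
            System.out.println(fetchOffset.messageOffsetOnly()); // false
        }
    }

Keeping the parameter names next to the literals as comments is one way call sites can preserve the readability that the named arguments used to provide.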
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 8c63350e396..a5e64a60cfe 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -58,7 +58,7 @@ import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, C
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
-import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel}
+import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel, LogOffsetMetadata}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -2007,7 +2007,7 @@ class ReplicaManagerTest {
       override def latestEpoch: Option[Int] = Some(leaderEpochFromLeader)
 
       override def logEndOffsetMetadata: LogOffsetMetadata =
-        localLogOffset.map(LogOffsetMetadata(_)).getOrElse(super.logEndOffsetMetadata)
+        localLogOffset.map(new LogOffsetMetadata(_)).getOrElse(super.logEndOffsetMetadata)
 
       override def logEndOffset: Long = localLogOffset.getOrElse(super.logEndOffset)
     }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 75da931a658..f86b942c3c8 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -27,7 +27,6 @@ import kafka.log.LogConfig;
 import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerTopicStats;
-import kafka.server.LogOffsetMetadata;
 import kafka.server.MetadataCache;
 import kafka.server.builders.LogManagerBuilder;
 import kafka.server.checkpoints.OffsetCheckpoints;
@@ -39,6 +38,7 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrParti
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.log.internals.LogDirFailureChannel;
+import org.apache.kafka.server.log.internals.LogOffsetMetadata;
 import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java
new file mode 100644
index 00000000000..dd6124025d8
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+/*
+ * A log offset structure, including:
+ *  1. the message offset
+ *  2. the base message offset of the located segment
+ *  3. the physical position on the located segment
+ */
+public final class LogOffsetMetadata {
+
+    // TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module
+    private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
+
+    public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0);
+
+    private static final int UNKNOWN_FILE_POSITION = -1;
+
+    public final long messageOffset;
+    public final long segmentBaseOffset;
+    public final int relativePositionInSegment;
+
+    public LogOffsetMetadata(long messageOffset) {
+        this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION);
+    }
+
+    public LogOffsetMetadata(long messageOffset,
+                             long segmentBaseOffset,
+                             int relativePositionInSegment) {
+        this.messageOffset = messageOffset;
+        this.segmentBaseOffset = segmentBaseOffset;
+        this.relativePositionInSegment = relativePositionInSegment;
+    }
+
+    // check if this offset is already on an older segment compared with the given offset
+    public boolean onOlderSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset < that.segmentBaseOffset;
+    }
+
+    // check if this offset is on the same segment with the given offset
+    private boolean onSameSegment(LogOffsetMetadata that) {
+        return this.segmentBaseOffset == that.segmentBaseOffset;
+    }
+
+    // compute the number of bytes from this offset to the given offset
+    // if they are on the same segment and this offset precedes the given offset
+    public int positionDiff(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment position with " + that + " since it only has message offset info");
+        if (!onSameSegment(that))
+            throw new KafkaException(this + " cannot compare its segment position with " + that + " since they are not on the same segment");
+
+        return this.relativePositionInSegment - that.relativePositionInSegment;
+    }
+
+    // decide if the offset metadata only contains message offset info
+    public boolean messageOffsetOnly() {
+        return segmentBaseOffset == UNIFIED_LOG_UNKNOWN_OFFSET && relativePositionInSegment == UNKNOWN_FILE_POSITION;
+    }
+
+    @Override
+    public String toString() {
+        return "(offset=" + messageOffset + "segment=[" + segmentBaseOffset + ":" + relativePositionInSegment + "])";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        LogOffsetMetadata that = (LogOffsetMetadata) o;
+        return messageOffset == that.messageOffset
+                && segmentBaseOffset == that.segmentBaseOffset
+                && relativePositionInSegment == that.relativePositionInSegment;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Long.hashCode(messageOffset);
+        result = 31 * result + Long.hashCode(segmentBaseOffset);
+        result = 31 * result + Integer.hashCode(relativePositionInSegment);
+        return result;
+    }
+}
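The new class supports two kinds of instances: full metadata carrying segment information, and message-offset-only metadata from the one-argument constructor, which rejects segment-relative queries with a KafkaException. A minimal usage sketch under that reading (the example class name is hypothetical; the values follow directly from the code above):

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.server.log.internals.LogOffsetMetadata;

    public class LogOffsetMetadataExample {
        public static void main(String[] args) {
            // Two offsets inside the same segment (base offset 100).
            LogOffsetMetadata start = new LogOffsetMetadata(150L, 100L, 500);
            LogOffsetMetadata end = new LogOffsetMetadata(170L, 100L, 900);

            System.out.println(end.positionDiff(start));   // 400 bytes apart
            System.out.println(start.onOlderSegment(end)); // false: same segment

            // Offset-only metadata has no segment info, so segment-relative
            // queries are rejected.
            LogOffsetMetadata offsetOnly = new LogOffsetMetadata(150L);
            try {
                offsetOnly.positionDiff(end);
            } catch (KafkaException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }

This matches how the fetch paths exercised in ReplicaManagerQuotasTest use these offsets: positionDiff yields a byte count only when both offsets resolve to the same segment, and onOlderSegment lets a reader detect a fetch position that lags behind the active segment.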