Posted to commits@kafka.apache.org by ju...@apache.org on 2023/02/24 01:29:41 UTC

[kafka] branch trunk updated: MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values (#13268)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f559452708 MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values (#13268)
9f559452708 is described below

commit 9f55945270876ff9cc71472f223ff266d5400206
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Thu Feb 23 17:29:32 2023 -0800

    MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values (#13268)
    
    Reviewers: Satish Duggana <sa...@apache.org>, Alexandre Dupriez <al...@gmail.com>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  6 +--
 .../scala/kafka/server/AbstractFetcherThread.scala | 30 ++++++------
 .../main/scala/kafka/server/LeaderEndPoint.scala   | 13 ++---
 .../scala/kafka/server/LocalLeaderEndPoint.scala   | 13 ++---
 .../scala/kafka/server/RemoteLeaderEndPoint.scala  | 14 +++---
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  1 +
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  1 +
 .../kafka/server/LocalLeaderEndPointTest.scala     | 15 +++---
 .../kafka/server/RemoteLeaderEndPointTest.scala    |  8 ++--
 .../kafka/server/AbstractFetcherManagerTest.scala  |  9 ++--
 .../kafka/server/AbstractFetcherThreadTest.scala   | 21 +++++----
 .../server/ReplicaAlterLogDirsThreadTest.scala     | 14 +++---
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 30 ++++++------
 .../unit/kafka/server/ReplicaManagerTest.scala     |  3 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  3 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |  7 ++-
 .../apache/kafka/server/common/OffsetAndEpoch.java | 55 ++++++++++++++++++++++
 17 files changed, 154 insertions(+), 89 deletions(-)
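
Editorial note for readers skimming the diff below (not part of the commit): the refactor replaces the Scala (Int, Long) tuple return values of the LeaderEndPoint methods, and removes the old kafka.server.OffsetAndEpoch case class from AbstractFetcherThread.scala, in favor of the new Java OffsetAndEpoch value class added under server-common. Note that the field order flips relative to the old tuples: the tuples carried (epoch, offset), while the new constructor takes (offset, leaderEpoch). A minimal, purely illustrative sketch of consuming the new type (the numbers are made up, not taken from the commit):

    import org.apache.kafka.server.common.OffsetAndEpoch;

    public class OffsetAndEpochExample {
        public static void main(String[] args) {
            // Constructor takes (offset, leaderEpoch); the old Scala tuples were (epoch, offset).
            OffsetAndEpoch latest = new OffsetAndEpoch(6L, 4);
            long leaderEndOffset = latest.offset();        // 6
            int leaderEpoch = latest.leaderEpoch();        // 4
            System.out.println(latest);                    // prints "(offset=6, leaderEpoch=4)"
        }
    }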

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index dfc2e2123da..f228053e492 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.remote.RemoteLogManager
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
+import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, PartitionMetadataFile, RequestLocal}
 import kafka.utils._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
@@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET
 import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.record.BrokerCompressionType
@@ -917,7 +917,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       if (foundOffset == UNDEFINED_EPOCH_OFFSET)
         None
       else
-        Some(OffsetAndEpoch(foundOffset, foundEpoch))
+        Some(new OffsetAndEpoch(foundOffset, foundEpoch))
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 066463e307c..25f2e292cc5 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.util.ShutdownableThread
 import org.apache.kafka.storage.internals.log.LogAppendInfo
 
@@ -613,7 +614,9 @@ abstract class AbstractFetcherThread(name: String,
       // get (leader epoch, end offset) pair that corresponds to the largest leader epoch
       // less than or equal to the requested epoch.
       endOffsetForEpoch(tp, leaderEpochOffset.leaderEpoch) match {
-        case Some(OffsetAndEpoch(followerEndOffset, followerEpoch)) =>
+        case Some(offsetAndEpoch) =>
+          val followerEndOffset = offsetAndEpoch.offset
+          val followerEpoch = offsetAndEpoch.leaderEpoch
           if (followerEpoch != leaderEpochOffset.leaderEpoch) {
             // the follower does not know about the epoch that leader replied with
             // we truncate to the end offset of the largest epoch that is smaller than the
@@ -658,7 +661,7 @@ abstract class AbstractFetcherThread(name: String,
   private def fetchOffsetAndApplyTruncateAndBuild(topicPartition: TopicPartition,
                                                   topicId: Option[Uuid],
                                                   currentLeaderEpoch: Int,
-                                                  truncateAndBuild: => (Int, Long) => Long,
+                                                  truncateAndBuild: => OffsetAndEpoch => Long,
                                                   fetchFromLocalLogStartOffset: Boolean = true): PartitionFetchState = {
     val replicaEndOffset = logEndOffset(topicPartition)
 
@@ -672,7 +675,8 @@ abstract class AbstractFetcherThread(name: String,
      *
      * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
      */
-    val (_, leaderEndOffset) = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch)
+    val offsetAndEpoch = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch)
+    val leaderEndOffset = offsetAndEpoch.offset
     if (leaderEndOffset < replicaEndOffset) {
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
         s"leader's latest offset $leaderEndOffset")
@@ -704,10 +708,10 @@ abstract class AbstractFetcherThread(name: String,
        * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
        * and the current leader's (local-log-start-offset or) log start offset.
        */
-      val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
+      val offsetAndEpoch = if (fetchFromLocalLogStartOffset)
         leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
         leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+      val leaderStartOffset = offsetAndEpoch.offset
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
         s"leader's start offset $leaderStartOffset")
       val offsetToFetch =
@@ -715,7 +719,7 @@ abstract class AbstractFetcherThread(name: String,
           // Only truncate log when current leader's log start offset (local log start offset if >= 3.4 version incaseof
           // OffsetMovedToTieredStorage error) is greater than follower's log end offset.
           // truncateAndBuild returns offset value from which it needs to start fetching.
-          truncateAndBuild(epoch, leaderStartOffset)
+          truncateAndBuild(offsetAndEpoch)
         } else {
           replicaEndOffset
         }
@@ -732,7 +736,8 @@ abstract class AbstractFetcherThread(name: String,
    */
   private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
     fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch,
-      (_, leaderLogStartOffset) => {
+      offsetAndEpoch => {
+        val leaderLogStartOffset = offsetAndEpoch.offset
         truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)
         leaderLogStartOffset
       },
@@ -803,7 +808,10 @@ abstract class AbstractFetcherThread(name: String,
                                                 leaderLogStartOffset: Long): Boolean = {
     try {
       val newFetchState = fetchOffsetAndApplyTruncateAndBuild(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch,
-        (offsetEpoch, leaderLocalLogStartOffset) => buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset))
+        offsetAndEpoch => {
+          val leaderLocalLogStartOffset = offsetAndEpoch.offset
+          buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetAndEpoch.leaderEpoch(), leaderLogStartOffset)
+        })
 
       partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
       debug(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
@@ -1025,9 +1033,3 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
 
   override def toString: String = s"TruncationState(offset=$offset, completed=$truncationCompleted)"
 }
-
-case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) {
-  override def toString: String = {
-    s"(offset=$offset, leaderEpoch=$leaderEpoch)"
-  }
-}
diff --git a/core/src/main/scala/kafka/server/LeaderEndPoint.scala b/core/src/main/scala/kafka/server/LeaderEndPoint.scala
index 3deff7d7b79..e931e0bbf3e 100644
--- a/core/src/main/scala/kafka/server/LeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LeaderEndPoint.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
+import org.apache.kafka.server.common.OffsetAndEpoch
 
 import scala.collection.Map
 
@@ -71,9 +72,9 @@ trait LeaderEndPoint {
    * @param topicPartition The topic partition that we want to fetch from
    * @param currentLeaderEpoch An int representing the current leader epoch of the requester
    *
-   * @return A tuple representing the (epoch, earliest_offset) in the leader's topic partition.
+   * @return An OffsetAndEpoch object representing the earliest offset and epoch in the leader's topic partition.
    */
-  def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
+  def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch
 
   /**
    * Fetches the epoch and log end offset of the given topic partition from the leader.
@@ -81,9 +82,9 @@ trait LeaderEndPoint {
    * @param topicPartition The topic partition that we want to fetch from
    * @param currentLeaderEpoch An int representing the current leader epoch of the requester
    *
-   * @return A tuple representing the (epoch, latest_offset) in the leader's topic partition.
+   * @return An OffsetAndEpoch object representing the latest offset and epoch in the leader's topic partition.
    */
-  def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
+  def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch
 
   /**
    * Fetches offset for leader epoch from the leader for each given topic partition
@@ -100,9 +101,9 @@ trait LeaderEndPoint {
    * @param topicPartition  The topic partition that we want to fetch from
    * @param currentLeaderEpoch An int representing the current leader epoch of the requester
    *
-   * @return A tuple representing the (epoch, earliest_local_offset) in the leader's topic partition.
+   * @return An OffsetAndEpoch object representing the earliest local offset and epoch in the leader's topic partition.
    */
-  def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
+  def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch
 
   /**
    * Builds a fetch request, given a partition map.
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 011a9c41e3f..587b6a449ed 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData}
 
 import java.util
@@ -113,25 +114,25 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     partitionData.toMap
   }
 
-  override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+  override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
     val partition = replicaManager.getPartitionOrException(topicPartition)
     val logStartOffset = partition.localLogOrException.logStartOffset
     val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logStartOffset)
-    (epoch.orElse(0), logStartOffset)
+    new OffsetAndEpoch(logStartOffset, epoch.orElse(0))
   }
 
-  override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+  override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
     val partition = replicaManager.getPartitionOrException(topicPartition)
     val logEndOffset = partition.localLogOrException.logEndOffset
     val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logEndOffset)
-    (epoch.orElse(0), logEndOffset)
+    new OffsetAndEpoch(logEndOffset, epoch.orElse(0))
   }
 
-  override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+  override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
     val partition = replicaManager.getPartitionOrException(topicPartition)
     val localLogStartOffset = partition.localLogOrException.localLogStartOffset()
     val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(localLogStartOffset)
-    (epoch.orElse(0), localLogStartOffset)
+    new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
   }
 
   override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index 9c455324a17..ed8e775f330 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetFo
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
 
 import scala.jdk.CollectionConverters._
@@ -94,19 +94,19 @@ class RemoteLeaderEndPoint(logPrefix: String,
     }
   }
 
-  override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+  override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
     fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP)
   }
 
-  override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+  override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
     fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.LATEST_TIMESTAMP)
   }
 
-  override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+  override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
     fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
   }
 
-  private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): (Int, Long) = {
+  private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): OffsetAndEpoch = {
     val topic = new ListOffsetsTopic()
       .setName(topicPartition.topic)
       .setPartitions(Collections.singletonList(
@@ -126,9 +126,9 @@ class RemoteLeaderEndPoint(logPrefix: String,
     Errors.forCode(responsePartition.errorCode) match {
       case Errors.NONE =>
         if (metadataVersion.isAtLeast(IBP_0_10_1_IV2))
-          (responsePartition.leaderEpoch, responsePartition.offset)
+          new OffsetAndEpoch(responsePartition.offset, responsePartition.leaderEpoch)
         else
-          (responsePartition.leaderEpoch, responsePartition.oldStyleOffsets.get(0))
+          new OffsetAndEpoch(responsePartition.oldStyleOffsets.get(0), responsePartition.leaderEpoch)
       case error => throw error.exception
     }
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 86bb227ba40..e003ae1c76f 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -20,6 +20,7 @@ package kafka.server
 import kafka.log.LeaderOffsetIncremented
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.FetchResponse
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.storage.internals.log.LogAppendInfo
 
 import scala.collection.{Map, Set}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index bbbf2d4890b..4a653c43552 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.server.common.CheckpointFile.CheckpointReadBuffer
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageException, RemoteStorageManager}
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index acf8d02349a..1c064fc991a 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.util.MockScheduler
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogDirFailureChannel}
 import org.junit.jupiter.api.{BeforeEach, Test}
@@ -83,41 +84,41 @@ class LocalLeaderEndPointTest {
   def testFetchLatestOffset(): Unit = {
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
-    assertEquals((0, 3L), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0))
+    assertEquals(new OffsetAndEpoch(3L, 0), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0))
     val leaderAndIsrRequest =  buildLeaderAndIsrRequest(leaderEpoch = 4)
     replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
-    assertEquals((4, 6L), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
+    assertEquals(new OffsetAndEpoch(6L, 4), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
   }
 
   @Test
   def testFetchEarliestOffset(): Unit = {
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
-    assertEquals((0, 0L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0))
+    assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0))
 
     val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
     replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
     replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _ => ())
-    assertEquals((4, 3L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
+    assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
   }
 
   @Test
   def testFetchEarliestLocalOffset(): Unit = {
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
-    assertEquals((0, 0L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0))
+    assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0))
 
     val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
     replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
     replicaManager.logManager.getLog(topicPartition).foreach(log => log._localLogStartOffset = 3)
-    assertEquals((0, 0L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
-    assertEquals((4, 3L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
+    assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
+    assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
   }
 
   @Test
diff --git a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
index 4e496b2607c..5232f60b53d 100644
--- a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.mockito.Mockito.mock
@@ -65,21 +65,21 @@ class RemoteLeaderEndPointTest {
     def testFetchLatestOffset(): Unit = {
         blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
           new ListOffsetsPartitionResponse().setLeaderEpoch(7).setOffset(logEndOffset)))
-        assertEquals((7, logEndOffset), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch))
+        assertEquals(new OffsetAndEpoch(logEndOffset, 7), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch))
     }
 
     @Test
     def testFetchEarliestOffset(): Unit = {
         blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
           new ListOffsetsPartitionResponse().setLeaderEpoch(5).setOffset(logStartOffset)))
-        assertEquals((5, logStartOffset), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch))
+        assertEquals(new OffsetAndEpoch(logStartOffset, 5), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch))
     }
 
     @Test
     def testFetchEarliestLocalOffset(): Unit = {
         blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
           new ListOffsetsPartitionResponse().setLeaderEpoch(6).setOffset(localLogStartOffset)))
-        assertEquals((6, localLogStartOffset), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch))
+        assertEquals(new OffsetAndEpoch(localLogStartOffset, 6), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch))
     }
 
     @Test
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index a3563c01db4..25bf9633438 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
 import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.storage.internals.log.LogAppendInfo
 import org.junit.jupiter.api.Assertions._
@@ -297,9 +298,9 @@ class AbstractFetcherManagerTest {
 
     override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = Map.empty
 
-    override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
+    override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
 
-    override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
+    override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
 
     override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = Map.empty
 
@@ -307,7 +308,7 @@ class AbstractFetcherManagerTest {
 
     override val isTruncationOnFetchSupported: Boolean = false
 
-    override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
+    override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
   }
 
   private class TestResizeFetcherThread(sourceBroker: BrokerEndPoint, failedPartitions: FailedPartitions)
@@ -333,7 +334,7 @@ class AbstractFetcherManagerTest {
 
     override protected def logEndOffset(topicPartition: TopicPartition): Long = 1
 
-    override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = Some(OffsetAndEpoch(1, 0))
+    override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = Some(new OffsetAndEpoch(1, 0))
 
     override protected val isOffsetForLeaderEpochSupported: Boolean = false
 
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 098342b9416..9c8d5323c97 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogOffsetMetadata}
@@ -704,11 +705,11 @@ class AbstractFetcherThreadTest {
     var fetchedEarliestOffset = false
 
     val fetcher = new MockFetcherThread(new MockLeaderEndPoint {
-      override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+      override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
         fetchedEarliestOffset = true
         throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced")
       }
-      override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+      override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
         fetchedEarliestOffset = true
         throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced")
       }
@@ -780,7 +781,7 @@ class AbstractFetcherThreadTest {
     val partition = new TopicPartition("topic", 0)
     val fetcher: MockFetcherThread = new MockFetcherThread(new MockLeaderEndPoint {
       val tries = new AtomicInteger(0)
-      override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+      override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
         if (tries.getAndIncrement() == 0)
           throw new UnknownLeaderEpochException("Unexpected leader epoch")
         super.fetchLatestOffset(topicPartition, leaderEpoch)
@@ -1265,22 +1266,22 @@ class AbstractFetcherThreadTest {
       }.toMap
     }
 
-    override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+    override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
       val leaderState = leaderPartitionState(topicPartition)
       checkLeaderEpochAndThrow(leaderEpoch, leaderState)
-      (leaderState.leaderEpoch, leaderState.logStartOffset)
+      new OffsetAndEpoch(leaderState.logStartOffset, leaderState.leaderEpoch)
     }
 
-    override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+    override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
       val leaderState = leaderPartitionState(topicPartition)
       checkLeaderEpochAndThrow(leaderEpoch, leaderState)
-      (leaderState.leaderEpoch, leaderState.logEndOffset)
+      new OffsetAndEpoch(leaderState.logEndOffset, leaderState.leaderEpoch)
     }
 
-    override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+    override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
       val leaderState = leaderPartitionState(topicPartition)
       checkLeaderEpochAndThrow(leaderEpoch, leaderState)
-      (leaderState.leaderEpoch, leaderState.localLogStartOffset)
+      new OffsetAndEpoch(leaderState.localLogStartOffset, leaderState.leaderEpoch)
     }
 
     override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
@@ -1542,7 +1543,7 @@ class AbstractFetcherThreadTest {
       if (result.endOffset == UNDEFINED_EPOCH_OFFSET)
         None
       else
-        Some(OffsetAndEpoch(result.endOffset, result.leaderEpoch))
+        Some(new OffsetAndEpoch(result.endOffset, result.leaderEpoch))
     }
 
     def verifyLastFetchedEpoch(partition: TopicPartition, expectedEpoch: Option[Int]): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 5aea1328ce6..582cd47ee73 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
 import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -477,7 +477,7 @@ class ReplicaAlterLogDirsThreadTest {
 
     when(futureLogT1p0.latestEpoch).thenReturn(Some(leaderEpoch))
     when(futureLogT1p0.endOffsetForEpoch(leaderEpoch)).thenReturn(
-      Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
+      Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
     when(partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
       .thenReturn(new EpochEndOffset()
         .setPartition(partitionT1p0Id)
@@ -487,7 +487,7 @@ class ReplicaAlterLogDirsThreadTest {
 
     when(futureLogT1p1.latestEpoch).thenReturn(Some(leaderEpoch))
     when(futureLogT1p1.endOffsetForEpoch(leaderEpoch)).thenReturn(
-      Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
+      Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
     when(partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
       .thenReturn(new EpochEndOffset()
         .setPartition(partitionT1p1Id)
@@ -568,7 +568,7 @@ class ReplicaAlterLogDirsThreadTest {
         .setEndOffset(replicaLEO))
     // but future replica does not know about this leader epoch, so returns a smaller leader epoch
     when(futureLog.endOffsetForEpoch(leaderEpoch - 1)).thenReturn(
-      Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch - 2)))
+      Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch - 2)))
     // finally, the leader replica knows about the leader epoch and returns end offset
     when(partition.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch - 2, fetchOnlyFromLeader = false))
       .thenReturn(new EpochEndOffset()
@@ -577,7 +577,7 @@ class ReplicaAlterLogDirsThreadTest {
         .setLeaderEpoch(leaderEpoch - 2)
         .setEndOffset(replicaEpochEndOffset))
     when(futureLog.endOffsetForEpoch(leaderEpoch - 2)).thenReturn(
-      Some(OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2)))
+      Some(new OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2)))
 
     when(replicaManager.logManager).thenReturn(logManager)
     stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
@@ -693,7 +693,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(futureLog.logEndOffset).thenReturn(futureReplicaLEO)
     when(futureLog.latestEpoch).thenReturn(Some(futureReplicaLeaderEpoch))
     when(futureLog.endOffsetForEpoch(futureReplicaLeaderEpoch)).thenReturn(
-      Some(OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch)))
+      Some(new OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch)))
     when(replicaManager.localLog(t1p0)).thenReturn(Some(log))
 
     // this will cause fetchEpochsFromLeader return an error with undefined offset
@@ -786,7 +786,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(futureLog.latestEpoch).thenReturn(Some(leaderEpoch))
     when(futureLog.logEndOffset).thenReturn(futureReplicaLEO)
     when(futureLog.endOffsetForEpoch(leaderEpoch)).thenReturn(
-      Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
+      Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
     when(replicaManager.logManager).thenReturn(logManager)
     stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 2f7a106e229..500bc23447f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.utils.{LogContext, SystemTime}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.storage.internals.log.LogAppendInfo
 import org.junit.jupiter.api.Assertions._
@@ -154,7 +154,7 @@ class ReplicaFetcherThreadTest {
       .thenReturn(Some(leaderEpoch))
       .thenReturn(None)  // t2p1 doesn't support epochs
     when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
-      Some(OffsetAndEpoch(0, leaderEpoch)))
+      Some(new OffsetAndEpoch(0, leaderEpoch)))
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.logManager).thenReturn(logManager)
     when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager)
@@ -303,7 +303,7 @@ class ReplicaFetcherThreadTest {
     when(log.highWatermark).thenReturn(0)
     when(log.latestEpoch).thenReturn(Some(leaderEpoch))
     when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
-      Some(OffsetAndEpoch(0, leaderEpoch)))
+      Some(new OffsetAndEpoch(0, leaderEpoch)))
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.logManager).thenReturn(logManager)
     when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager)
@@ -367,7 +367,7 @@ class ReplicaFetcherThreadTest {
     when(log.highWatermark).thenReturn(initialLEO - 1)
     when(log.latestEpoch).thenReturn(Some(leaderEpoch))
     when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
-      Some(OffsetAndEpoch(initialLEO, leaderEpoch)))
+      Some(new OffsetAndEpoch(initialLEO, leaderEpoch)))
     when(log.logEndOffset).thenReturn(initialLEO)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@@ -488,9 +488,9 @@ class ReplicaFetcherThreadTest {
     when(log.highWatermark).thenReturn(initialLEO - 2)
     when(log.latestEpoch).thenReturn(Some(5))
     when(log.endOffsetForEpoch(4)).thenReturn(
-      Some(OffsetAndEpoch(120, 3)))
+      Some(new OffsetAndEpoch(120, 3)))
     when(log.endOffsetForEpoch(3)).thenReturn(
-      Some(OffsetAndEpoch(120, 3)))
+      Some(new OffsetAndEpoch(120, 3)))
     when(log.logEndOffset).thenReturn(initialLEO)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@@ -571,9 +571,9 @@ class ReplicaFetcherThreadTest {
     when(partition.localLogOrException).thenReturn(log)
     when(log.highWatermark).thenReturn(115)
     when(log.latestEpoch).thenAnswer(_ => latestLogEpoch)
-    when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4)))
-    when(log.endOffsetForEpoch(3)).thenReturn(Some(OffsetAndEpoch(129, 2)))
-    when(log.endOffsetForEpoch(2)).thenReturn(Some(OffsetAndEpoch(119, 1)))
+    when(log.endOffsetForEpoch(4)).thenReturn(Some(new OffsetAndEpoch(149, 4)))
+    when(log.endOffsetForEpoch(3)).thenReturn(Some(new OffsetAndEpoch(129, 2)))
+    when(log.endOffsetForEpoch(2)).thenReturn(Some(new OffsetAndEpoch(119, 1)))
     when(log.logEndOffset).thenReturn(initialLEO)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@@ -674,7 +674,7 @@ class ReplicaFetcherThreadTest {
 
     when(log.highWatermark).thenReturn(highWatermark)
     when(log.latestEpoch).thenReturn(Some(5))
-    when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4)))
+    when(log.endOffsetForEpoch(4)).thenReturn(Some(new OffsetAndEpoch(149, 4)))
     when(log.logEndOffset).thenReturn(logEndOffset)
 
     when(replicaManager.metadataCache).thenReturn(metadataCache)
@@ -766,9 +766,9 @@ class ReplicaFetcherThreadTest {
     when(log.highWatermark).thenReturn(initialLEO - 2)
     when(log.latestEpoch).thenReturn(Some(5))
     when(log.endOffsetForEpoch(4)).thenReturn(
-      Some(OffsetAndEpoch(120, 3)))
+      Some(new OffsetAndEpoch(120, 3)))
     when(log.endOffsetForEpoch(3)).thenReturn(
-      Some(OffsetAndEpoch(120, 3)))
+      Some(new OffsetAndEpoch(120, 3)))
     when(log.logEndOffset).thenReturn(initialLEO)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@@ -893,7 +893,7 @@ class ReplicaFetcherThreadTest {
     when(log.latestEpoch).thenReturn(Some(leaderEpoch))
     // this is for the last reply with EpochEndOffset(5, 156)
     when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
-      Some(OffsetAndEpoch(initialLeo, leaderEpoch)))
+      Some(new OffsetAndEpoch(initialLeo, leaderEpoch)))
     when(log.logEndOffset).thenReturn(initialLeo)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@@ -958,7 +958,7 @@ class ReplicaFetcherThreadTest {
     when(log.highWatermark).thenReturn(0)
     when(log.latestEpoch).thenReturn(Some(leaderEpoch))
     when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
-      Some(OffsetAndEpoch(0, leaderEpoch)))
+      Some(new OffsetAndEpoch(0, leaderEpoch)))
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.logManager).thenReturn(logManager)
     when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager)
@@ -1016,7 +1016,7 @@ class ReplicaFetcherThreadTest {
     when(partition.localLogOrException).thenReturn(log)
     when(log.highWatermark).thenReturn(initialLEO - 2)
     when(log.latestEpoch).thenReturn(Some(5))
-    when(log.endOffsetForEpoch(5)).thenReturn(Some(OffsetAndEpoch(initialLEO, 5)))
+    when(log.endOffsetForEpoch(5)).thenReturn(Some(new OffsetAndEpoch(initialLEO, 5)))
     when(log.logEndOffset).thenReturn(initialLEO)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5602acc7985..f8d3ae3d490 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -57,6 +57,7 @@ import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPar
 import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.util.MockScheduler
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, ProducerStateManager, ProducerStateManagerConfig}
@@ -2059,7 +2060,7 @@ class ReplicaManagerTest {
       override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
         assertEquals(leaderEpoch, leaderEpochFromLeader)
         localLogOffset.map { logOffset =>
-          Some(OffsetAndEpoch(logOffset, leaderEpochFromLeader))
+          Some(new OffsetAndEpoch(logOffset, leaderEpochFromLeader))
         }.getOrElse(super.endOffsetForEpoch(leaderEpoch))
       }
 
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index e5460d09d9c..ad5a7cee637 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -53,7 +54,7 @@ class OffsetsForLeaderEpochTest {
   @Test
   def shouldGetEpochsFromReplica(): Unit = {
     //Given
-    val offsetAndEpoch = OffsetAndEpoch(42L, 5)
+    val offsetAndEpoch = new OffsetAndEpoch(42L, 5)
     val epochRequested: Integer = 5
     val request = Seq(newOffsetForLeaderTopic(tp, RecordBatch.NO_PARTITION_LEADER_EPOCH, epochRequested))
 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 04008b1d0c5..e37a197655c 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -30,7 +30,6 @@ import kafka.server.FailedPartitions;
 import kafka.server.InitialFetchState;
 import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
-import kafka.server.OffsetAndEpoch;
 import kafka.server.OffsetTruncationState;
 import kafka.server.QuotaFactory;
 import kafka.server.RemoteLeaderEndPoint;
@@ -67,6 +66,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.storage.internals.log.LogConfig;
@@ -99,7 +99,6 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import scala.Option;
-import scala.Tuple2;
 import scala.collection.Iterator;
 import scala.collection.Map;
 
@@ -318,8 +317,8 @@ public class ReplicaFetcherThreadBenchmark {
                             config::interBrokerProtocolVersion
                     ) {
                         @Override
-                        public Tuple2<Object, Object> fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
-                            return Tuple2.apply(0, 0);
+                        public OffsetAndEpoch fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
+                            return new OffsetAndEpoch(0L, 0);
                         }
 
                         @Override
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java b/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java
new file mode 100644
index 00000000000..a5953ae70bc
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+public class OffsetAndEpoch {
+    private final long offset;
+    private final int leaderEpoch;
+
+    public OffsetAndEpoch(long offset, int leaderEpoch) {
+        this.offset = offset;
+        this.leaderEpoch = leaderEpoch;
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public int leaderEpoch() {
+        return leaderEpoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        OffsetAndEpoch that = (OffsetAndEpoch) o;
+        return offset == that.offset && leaderEpoch == that.leaderEpoch;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = leaderEpoch;
+        result = 31 * result + Long.hashCode(offset);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "(offset=" + offset + ", leaderEpoch=" + leaderEpoch + ")";
+    }
+}
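
Editorial note on the design (not part of the commit): equals() and hashCode() in the class above are field-based, which is what lets the updated tests compare results directly, e.g. assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7)), in place of the old tuple comparisons. A small illustrative sketch of those value semantics:

    import org.apache.kafka.server.common.OffsetAndEpoch;

    public class OffsetAndEpochEqualityExample {
        public static void main(String[] args) {
            OffsetAndEpoch expected = new OffsetAndEpoch(3L, 4);
            OffsetAndEpoch actual = new OffsetAndEpoch(3L, 4);
            // Distinct instances with the same offset/leaderEpoch compare equal,
            // so they can be used directly in assertEquals-style checks.
            System.out.println(expected.equals(actual));                  // true
            System.out.println(expected.hashCode() == actual.hashCode()); // true
        }
    }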