You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2023/02/24 01:29:41 UTC
[kafka] branch trunk updated: MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values (#13268)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9f559452708 MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values (#13268)
9f559452708 is described below
commit 9f55945270876ff9cc71472f223ff266d5400206
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Thu Feb 23 17:29:32 2023 -0800
MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values (#13268)
Reviewers: Satish Duggana <sa...@apache.org>, Alexandre Dupriez <al...@gmail.com>, Jun Rao <ju...@gmail.com>
---
core/src/main/scala/kafka/log/UnifiedLog.scala | 6 +--
.../scala/kafka/server/AbstractFetcherThread.scala | 30 ++++++------
.../main/scala/kafka/server/LeaderEndPoint.scala | 13 ++---
.../scala/kafka/server/LocalLeaderEndPoint.scala | 13 ++---
.../scala/kafka/server/RemoteLeaderEndPoint.scala | 14 +++---
.../kafka/server/ReplicaAlterLogDirsThread.scala | 1 +
.../scala/kafka/server/ReplicaFetcherThread.scala | 1 +
.../kafka/server/LocalLeaderEndPointTest.scala | 15 +++---
.../kafka/server/RemoteLeaderEndPointTest.scala | 8 ++--
.../kafka/server/AbstractFetcherManagerTest.scala | 9 ++--
.../kafka/server/AbstractFetcherThreadTest.scala | 21 +++++----
.../server/ReplicaAlterLogDirsThreadTest.scala | 14 +++---
.../kafka/server/ReplicaFetcherThreadTest.scala | 30 ++++++------
.../unit/kafka/server/ReplicaManagerTest.scala | 3 +-
.../server/epoch/OffsetsForLeaderEpochTest.scala | 3 +-
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 7 ++-
.../apache/kafka/server/common/OffsetAndEpoch.java | 55 ++++++++++++++++++++++
17 files changed, 154 insertions(+), 89 deletions(-)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index dfc2e2123da..f228053e492 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
+import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, PartitionMetadataFile, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
@@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET
import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
@@ -917,7 +917,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
if (foundOffset == UNDEFINED_EPOCH_OFFSET)
None
else
- Some(OffsetAndEpoch(foundOffset, foundEpoch))
+ Some(new OffsetAndEpoch(foundOffset, foundEpoch))
}
}
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 066463e307c..25f2e292cc5 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
+import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -613,7 +614,9 @@ abstract class AbstractFetcherThread(name: String,
// get (leader epoch, end offset) pair that corresponds to the largest leader epoch
// less than or equal to the requested epoch.
endOffsetForEpoch(tp, leaderEpochOffset.leaderEpoch) match {
- case Some(OffsetAndEpoch(followerEndOffset, followerEpoch)) =>
+ case Some(offsetAndEpoch) =>
+ val followerEndOffset = offsetAndEpoch.offset
+ val followerEpoch = offsetAndEpoch.leaderEpoch
if (followerEpoch != leaderEpochOffset.leaderEpoch) {
// the follower does not know about the epoch that leader replied with
// we truncate to the end offset of the largest epoch that is smaller than the
@@ -658,7 +661,7 @@ abstract class AbstractFetcherThread(name: String,
private def fetchOffsetAndApplyTruncateAndBuild(topicPartition: TopicPartition,
topicId: Option[Uuid],
currentLeaderEpoch: Int,
- truncateAndBuild: => (Int, Long) => Long,
+ truncateAndBuild: => OffsetAndEpoch => Long,
fetchFromLocalLogStartOffset: Boolean = true): PartitionFetchState = {
val replicaEndOffset = logEndOffset(topicPartition)
@@ -672,7 +675,8 @@ abstract class AbstractFetcherThread(name: String,
*
* There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
*/
- val (_, leaderEndOffset) = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch)
+ val offsetAndEpoch = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch)
+ val leaderEndOffset = offsetAndEpoch.offset
if (leaderEndOffset < replicaEndOffset) {
warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
s"leader's latest offset $leaderEndOffset")
@@ -704,10 +708,10 @@ abstract class AbstractFetcherThread(name: String,
* Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
* and the current leader's (local-log-start-offset or) log start offset.
*/
- val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
+ val offsetAndEpoch = if (fetchFromLocalLogStartOffset)
leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+ val leaderStartOffset = offsetAndEpoch.offset
warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
s"leader's start offset $leaderStartOffset")
val offsetToFetch =
@@ -715,7 +719,7 @@ abstract class AbstractFetcherThread(name: String,
// Only truncate log when current leader's log start offset (local log start offset if >= 3.4 version incaseof
// OffsetMovedToTieredStorage error) is greater than follower's log end offset.
// truncateAndBuild returns offset value from which it needs to start fetching.
- truncateAndBuild(epoch, leaderStartOffset)
+ truncateAndBuild(offsetAndEpoch)
} else {
replicaEndOffset
}
@@ -732,7 +736,8 @@ abstract class AbstractFetcherThread(name: String,
*/
private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch,
- (_, leaderLogStartOffset) => {
+ offsetAndEpoch => {
+ val leaderLogStartOffset = offsetAndEpoch.offset
truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)
leaderLogStartOffset
},
@@ -803,7 +808,10 @@ abstract class AbstractFetcherThread(name: String,
leaderLogStartOffset: Long): Boolean = {
try {
val newFetchState = fetchOffsetAndApplyTruncateAndBuild(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch,
- (offsetEpoch, leaderLocalLogStartOffset) => buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset))
+ offsetAndEpoch => {
+ val leaderLocalLogStartOffset = offsetAndEpoch.offset
+ buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetAndEpoch.leaderEpoch(), leaderLogStartOffset)
+ })
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
debug(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
@@ -1025,9 +1033,3 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
override def toString: String = s"TruncationState(offset=$offset, completed=$truncationCompleted)"
}
-
-case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) {
- override def toString: String = {
- s"(offset=$offset, leaderEpoch=$leaderEpoch)"
- }
-}
diff --git a/core/src/main/scala/kafka/server/LeaderEndPoint.scala b/core/src/main/scala/kafka/server/LeaderEndPoint.scala
index 3deff7d7b79..e931e0bbf3e 100644
--- a/core/src/main/scala/kafka/server/LeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LeaderEndPoint.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
+import org.apache.kafka.server.common.OffsetAndEpoch
import scala.collection.Map
@@ -71,9 +72,9 @@ trait LeaderEndPoint {
* @param topicPartition The topic partition that we want to fetch from
* @param currentLeaderEpoch An int representing the current leader epoch of the requester
*
- * @return A tuple representing the (epoch, earliest_offset) in the leader's topic partition.
+ * @return An OffsetAndEpoch object representing the earliest offset and epoch in the leader's topic partition.
*/
- def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
+ def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch
/**
* Fetches the epoch and log end offset of the given topic partition from the leader.
@@ -81,9 +82,9 @@ trait LeaderEndPoint {
* @param topicPartition The topic partition that we want to fetch from
* @param currentLeaderEpoch An int representing the current leader epoch of the requester
*
- * @return A tuple representing the (epoch, latest_offset) in the leader's topic partition.
+ * @return An OffsetAndEpoch object representing the latest offset and epoch in the leader's topic partition.
*/
- def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
+ def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch
/**
* Fetches offset for leader epoch from the leader for each given topic partition
@@ -100,9 +101,9 @@ trait LeaderEndPoint {
* @param topicPartition The topic partition that we want to fetch from
* @param currentLeaderEpoch An int representing the current leader epoch of the requester
*
- * @return A tuple representing the (epoch, earliest_local_offset) in the leader's topic partition.
+ * @return An OffsetAndEpoch object representing the earliest local offset and epoch in the leader's topic partition.
*/
- def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
+ def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch
/**
* Builds a fetch request, given a partition map.
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 011a9c41e3f..587b6a449ed 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData}
import java.util
@@ -113,25 +114,25 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
partitionData.toMap
}
- override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+ override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
val partition = replicaManager.getPartitionOrException(topicPartition)
val logStartOffset = partition.localLogOrException.logStartOffset
val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logStartOffset)
- (epoch.orElse(0), logStartOffset)
+ new OffsetAndEpoch(logStartOffset, epoch.orElse(0))
}
- override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+ override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
val partition = replicaManager.getPartitionOrException(topicPartition)
val logEndOffset = partition.localLogOrException.logEndOffset
val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logEndOffset)
- (epoch.orElse(0), logEndOffset)
+ new OffsetAndEpoch(logEndOffset, epoch.orElse(0))
}
- override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+ override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
val partition = replicaManager.getPartitionOrException(topicPartition)
val localLogStartOffset = partition.localLogOrException.localLogStartOffset()
val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(localLogStartOffset)
- (epoch.orElse(0), localLogStartOffset)
+ new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
}
override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index 9c455324a17..ed8e775f330 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetFo
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
import scala.jdk.CollectionConverters._
@@ -94,19 +94,19 @@ class RemoteLeaderEndPoint(logPrefix: String,
}
}
- override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+ override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP)
}
- override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+ override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.LATEST_TIMESTAMP)
}
- override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+ override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
}
- private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): (Int, Long) = {
+ private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): OffsetAndEpoch = {
val topic = new ListOffsetsTopic()
.setName(topicPartition.topic)
.setPartitions(Collections.singletonList(
@@ -126,9 +126,9 @@ class RemoteLeaderEndPoint(logPrefix: String,
Errors.forCode(responsePartition.errorCode) match {
case Errors.NONE =>
if (metadataVersion.isAtLeast(IBP_0_10_1_IV2))
- (responsePartition.leaderEpoch, responsePartition.offset)
+ new OffsetAndEpoch(responsePartition.offset, responsePartition.leaderEpoch)
else
- (responsePartition.leaderEpoch, responsePartition.oldStyleOffsets.get(0))
+ new OffsetAndEpoch(responsePartition.oldStyleOffsets.get(0), responsePartition.leaderEpoch)
case error => throw error.exception
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 86bb227ba40..e003ae1c76f 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -20,6 +20,7 @@ package kafka.server
import kafka.log.LeaderOffsetIncremented
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchResponse
+import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.storage.internals.log.LogAppendInfo
import scala.collection.{Map, Set}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index bbbf2d4890b..4a653c43552 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.server.common.CheckpointFile.CheckpointReadBuffer
+import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageException, RemoteStorageManager}
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index acf8d02349a..1c064fc991a 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogDirFailureChannel}
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -83,41 +84,41 @@ class LocalLeaderEndPointTest {
def testFetchLatestOffset(): Unit = {
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
- assertEquals((0, 3L), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0))
+ assertEquals(new OffsetAndEpoch(3L, 0), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0))
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
- assertEquals((4, 6L), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
+ assertEquals(new OffsetAndEpoch(6L, 4), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
}
@Test
def testFetchEarliestOffset(): Unit = {
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
- assertEquals((0, 0L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0))
+ assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0))
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _ => ())
- assertEquals((4, 3L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
+ assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
}
@Test
def testFetchEarliestLocalOffset(): Unit = {
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
- assertEquals((0, 0L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0))
+ assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0))
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
replicaManager.logManager.getLog(topicPartition).foreach(log => log._localLogStartOffset = 3)
- assertEquals((0, 0L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
- assertEquals((4, 3L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
+ assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
+ assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
}
@Test
diff --git a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
index 4e496b2607c..5232f60b53d 100644
--- a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.Mockito.mock
@@ -65,21 +65,21 @@ class RemoteLeaderEndPointTest {
def testFetchLatestOffset(): Unit = {
blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
new ListOffsetsPartitionResponse().setLeaderEpoch(7).setOffset(logEndOffset)))
- assertEquals((7, logEndOffset), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch))
+ assertEquals(new OffsetAndEpoch(logEndOffset, 7), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch))
}
@Test
def testFetchEarliestOffset(): Unit = {
blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
new ListOffsetsPartitionResponse().setLeaderEpoch(5).setOffset(logStartOffset)))
- assertEquals((5, logStartOffset), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch))
+ assertEquals(new OffsetAndEpoch(logStartOffset, 5), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch))
}
@Test
def testFetchEarliestLocalOffset(): Unit = {
blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
new ListOffsetsPartitionResponse().setLeaderEpoch(6).setOffset(localLogStartOffset)))
- assertEquals((6, localLogStartOffset), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch))
+ assertEquals(new OffsetAndEpoch(localLogStartOffset, 6), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index a3563c01db4..25bf9633438 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.junit.jupiter.api.Assertions._
@@ -297,9 +298,9 @@ class AbstractFetcherManagerTest {
override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = Map.empty
- override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
+ override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
- override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
+ override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = Map.empty
@@ -307,7 +308,7 @@ class AbstractFetcherManagerTest {
override val isTruncationOnFetchSupported: Boolean = false
- override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
+ override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
}
private class TestResizeFetcherThread(sourceBroker: BrokerEndPoint, failedPartitions: FailedPartitions)
@@ -333,7 +334,7 @@ class AbstractFetcherManagerTest {
override protected def logEndOffset(topicPartition: TopicPartition): Long = 1
- override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = Some(OffsetAndEpoch(1, 0))
+ override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = Some(new OffsetAndEpoch(1, 0))
override protected val isOffsetForLeaderEpochSupported: Boolean = false
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 098342b9416..9c8d5323c97 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogOffsetMetadata}
@@ -704,11 +705,11 @@ class AbstractFetcherThreadTest {
var fetchedEarliestOffset = false
val fetcher = new MockFetcherThread(new MockLeaderEndPoint {
- override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+ override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
fetchedEarliestOffset = true
throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced")
}
- override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+ override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
fetchedEarliestOffset = true
throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced")
}
@@ -780,7 +781,7 @@ class AbstractFetcherThreadTest {
val partition = new TopicPartition("topic", 0)
val fetcher: MockFetcherThread = new MockFetcherThread(new MockLeaderEndPoint {
val tries = new AtomicInteger(0)
- override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+ override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
if (tries.getAndIncrement() == 0)
throw new UnknownLeaderEpochException("Unexpected leader epoch")
super.fetchLatestOffset(topicPartition, leaderEpoch)
@@ -1265,22 +1266,22 @@ class AbstractFetcherThreadTest {
}.toMap
}
- override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+ override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
val leaderState = leaderPartitionState(topicPartition)
checkLeaderEpochAndThrow(leaderEpoch, leaderState)
- (leaderState.leaderEpoch, leaderState.logStartOffset)
+ new OffsetAndEpoch(leaderState.logStartOffset, leaderState.leaderEpoch)
}
- override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+ override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
val leaderState = leaderPartitionState(topicPartition)
checkLeaderEpochAndThrow(leaderEpoch, leaderState)
- (leaderState.leaderEpoch, leaderState.logEndOffset)
+ new OffsetAndEpoch(leaderState.logEndOffset, leaderState.leaderEpoch)
}
- override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+ override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
val leaderState = leaderPartitionState(topicPartition)
checkLeaderEpochAndThrow(leaderEpoch, leaderState)
- (leaderState.leaderEpoch, leaderState.localLogStartOffset)
+ new OffsetAndEpoch(leaderState.localLogStartOffset, leaderState.leaderEpoch)
}
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
@@ -1542,7 +1543,7 @@ class AbstractFetcherThreadTest {
if (result.endOffset == UNDEFINED_EPOCH_OFFSET)
None
else
- Some(OffsetAndEpoch(result.endOffset, result.leaderEpoch))
+ Some(new OffsetAndEpoch(result.endOffset, result.leaderEpoch))
}
def verifyLastFetchedEpoch(partition: TopicPartition, expectedEpoch: Option[Int]): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 5aea1328ce6..582cd47ee73 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -477,7 +477,7 @@ class ReplicaAlterLogDirsThreadTest {
when(futureLogT1p0.latestEpoch).thenReturn(Some(leaderEpoch))
when(futureLogT1p0.endOffsetForEpoch(leaderEpoch)).thenReturn(
- Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
+ Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
when(partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
.thenReturn(new EpochEndOffset()
.setPartition(partitionT1p0Id)
@@ -487,7 +487,7 @@ class ReplicaAlterLogDirsThreadTest {
when(futureLogT1p1.latestEpoch).thenReturn(Some(leaderEpoch))
when(futureLogT1p1.endOffsetForEpoch(leaderEpoch)).thenReturn(
- Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
+ Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
when(partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
.thenReturn(new EpochEndOffset()
.setPartition(partitionT1p1Id)
@@ -568,7 +568,7 @@ class ReplicaAlterLogDirsThreadTest {
.setEndOffset(replicaLEO))
// but future replica does not know about this leader epoch, so returns a smaller leader epoch
when(futureLog.endOffsetForEpoch(leaderEpoch - 1)).thenReturn(
- Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch - 2)))
+ Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch - 2)))
// finally, the leader replica knows about the leader epoch and returns end offset
when(partition.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch - 2, fetchOnlyFromLeader = false))
.thenReturn(new EpochEndOffset()
@@ -577,7 +577,7 @@ class ReplicaAlterLogDirsThreadTest {
.setLeaderEpoch(leaderEpoch - 2)
.setEndOffset(replicaEpochEndOffset))
when(futureLog.endOffsetForEpoch(leaderEpoch - 2)).thenReturn(
- Some(OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2)))
+ Some(new OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2)))
when(replicaManager.logManager).thenReturn(logManager)
stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
@@ -693,7 +693,7 @@ class ReplicaAlterLogDirsThreadTest {
when(futureLog.logEndOffset).thenReturn(futureReplicaLEO)
when(futureLog.latestEpoch).thenReturn(Some(futureReplicaLeaderEpoch))
when(futureLog.endOffsetForEpoch(futureReplicaLeaderEpoch)).thenReturn(
- Some(OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch)))
+ Some(new OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch)))
when(replicaManager.localLog(t1p0)).thenReturn(Some(log))
// this will cause fetchEpochsFromLeader return an error with undefined offset
@@ -786,7 +786,7 @@ class ReplicaAlterLogDirsThreadTest {
when(futureLog.latestEpoch).thenReturn(Some(leaderEpoch))
when(futureLog.logEndOffset).thenReturn(futureReplicaLEO)
when(futureLog.endOffsetForEpoch(leaderEpoch)).thenReturn(
- Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
+ Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
when(replicaManager.logManager).thenReturn(logManager)
stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 2f7a106e229..500bc23447f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
import org.apache.kafka.common.utils.{LogContext, SystemTime}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.junit.jupiter.api.Assertions._
@@ -154,7 +154,7 @@ class ReplicaFetcherThreadTest {
.thenReturn(Some(leaderEpoch))
.thenReturn(None) // t2p1 doesn't support epochs
when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
- Some(OffsetAndEpoch(0, leaderEpoch)))
+ Some(new OffsetAndEpoch(0, leaderEpoch)))
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.logManager).thenReturn(logManager)
when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager)
@@ -303,7 +303,7 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(0)
when(log.latestEpoch).thenReturn(Some(leaderEpoch))
when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
- Some(OffsetAndEpoch(0, leaderEpoch)))
+ Some(new OffsetAndEpoch(0, leaderEpoch)))
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.logManager).thenReturn(logManager)
when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager)
@@ -367,7 +367,7 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(initialLEO - 1)
when(log.latestEpoch).thenReturn(Some(leaderEpoch))
when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
- Some(OffsetAndEpoch(initialLEO, leaderEpoch)))
+ Some(new OffsetAndEpoch(initialLEO, leaderEpoch)))
when(log.logEndOffset).thenReturn(initialLEO)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@@ -488,9 +488,9 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(initialLEO - 2)
when(log.latestEpoch).thenReturn(Some(5))
when(log.endOffsetForEpoch(4)).thenReturn(
- Some(OffsetAndEpoch(120, 3)))
+ Some(new OffsetAndEpoch(120, 3)))
when(log.endOffsetForEpoch(3)).thenReturn(
- Some(OffsetAndEpoch(120, 3)))
+ Some(new OffsetAndEpoch(120, 3)))
when(log.logEndOffset).thenReturn(initialLEO)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@@ -571,9 +571,9 @@ class ReplicaFetcherThreadTest {
when(partition.localLogOrException).thenReturn(log)
when(log.highWatermark).thenReturn(115)
when(log.latestEpoch).thenAnswer(_ => latestLogEpoch)
- when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4)))
- when(log.endOffsetForEpoch(3)).thenReturn(Some(OffsetAndEpoch(129, 2)))
- when(log.endOffsetForEpoch(2)).thenReturn(Some(OffsetAndEpoch(119, 1)))
+ when(log.endOffsetForEpoch(4)).thenReturn(Some(new OffsetAndEpoch(149, 4)))
+ when(log.endOffsetForEpoch(3)).thenReturn(Some(new OffsetAndEpoch(129, 2)))
+ when(log.endOffsetForEpoch(2)).thenReturn(Some(new OffsetAndEpoch(119, 1)))
when(log.logEndOffset).thenReturn(initialLEO)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@@ -674,7 +674,7 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(highWatermark)
when(log.latestEpoch).thenReturn(Some(5))
- when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4)))
+ when(log.endOffsetForEpoch(4)).thenReturn(Some(new OffsetAndEpoch(149, 4)))
when(log.logEndOffset).thenReturn(logEndOffset)
when(replicaManager.metadataCache).thenReturn(metadataCache)
@@ -766,9 +766,9 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(initialLEO - 2)
when(log.latestEpoch).thenReturn(Some(5))
when(log.endOffsetForEpoch(4)).thenReturn(
- Some(OffsetAndEpoch(120, 3)))
+ Some(new OffsetAndEpoch(120, 3)))
when(log.endOffsetForEpoch(3)).thenReturn(
- Some(OffsetAndEpoch(120, 3)))
+ Some(new OffsetAndEpoch(120, 3)))
when(log.logEndOffset).thenReturn(initialLEO)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@@ -893,7 +893,7 @@ class ReplicaFetcherThreadTest {
when(log.latestEpoch).thenReturn(Some(leaderEpoch))
// this is for the last reply with EpochEndOffset(5, 156)
when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
- Some(OffsetAndEpoch(initialLeo, leaderEpoch)))
+ Some(new OffsetAndEpoch(initialLeo, leaderEpoch)))
when(log.logEndOffset).thenReturn(initialLeo)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@@ -958,7 +958,7 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(0)
when(log.latestEpoch).thenReturn(Some(leaderEpoch))
when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
- Some(OffsetAndEpoch(0, leaderEpoch)))
+ Some(new OffsetAndEpoch(0, leaderEpoch)))
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.logManager).thenReturn(logManager)
when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager)
@@ -1016,7 +1016,7 @@ class ReplicaFetcherThreadTest {
when(partition.localLogOrException).thenReturn(log)
when(log.highWatermark).thenReturn(initialLEO - 2)
when(log.latestEpoch).thenReturn(Some(5))
- when(log.endOffsetForEpoch(5)).thenReturn(Some(OffsetAndEpoch(initialLEO, 5)))
+ when(log.endOffsetForEpoch(5)).thenReturn(Some(new OffsetAndEpoch(initialLEO, 5)))
when(log.logEndOffset).thenReturn(initialLEO)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5602acc7985..f8d3ae3d490 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -57,6 +57,7 @@ import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPar
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, ProducerStateManager, ProducerStateManagerConfig}
@@ -2059,7 +2060,7 @@ class ReplicaManagerTest {
override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
assertEquals(leaderEpoch, leaderEpochFromLeader)
localLogOffset.map { logOffset =>
- Some(OffsetAndEpoch(logOffset, leaderEpochFromLeader))
+ Some(new OffsetAndEpoch(logOffset, leaderEpochFromLeader))
}.getOrElse(super.endOffsetForEpoch(leaderEpoch))
}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index e5460d09d9c..ad5a7cee637 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -53,7 +54,7 @@ class OffsetsForLeaderEpochTest {
@Test
def shouldGetEpochsFromReplica(): Unit = {
//Given
- val offsetAndEpoch = OffsetAndEpoch(42L, 5)
+ val offsetAndEpoch = new OffsetAndEpoch(42L, 5)
val epochRequested: Integer = 5
val request = Seq(newOffsetForLeaderTopic(tp, RecordBatch.NO_PARTITION_LEADER_EPOCH, epochRequested))
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 04008b1d0c5..e37a197655c 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -30,7 +30,6 @@ import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
-import kafka.server.OffsetAndEpoch;
import kafka.server.OffsetTruncationState;
import kafka.server.QuotaFactory;
import kafka.server.RemoteLeaderEndPoint;
@@ -67,6 +66,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
@@ -99,7 +99,6 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import scala.Option;
-import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Map;
@@ -318,8 +317,8 @@ public class ReplicaFetcherThreadBenchmark {
config::interBrokerProtocolVersion
) {
@Override
- public Tuple2<Object, Object> fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
- return Tuple2.apply(0, 0);
+ public OffsetAndEpoch fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
+ return new OffsetAndEpoch(0L, 0);
}
@Override
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java b/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java
new file mode 100644
index 00000000000..a5953ae70bc
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+public class OffsetAndEpoch {
+ private final long offset;
+ private final int leaderEpoch;
+
+ public OffsetAndEpoch(long offset, int leaderEpoch) {
+ this.offset = offset;
+ this.leaderEpoch = leaderEpoch;
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public int leaderEpoch() {
+ return leaderEpoch;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ OffsetAndEpoch that = (OffsetAndEpoch) o;
+ return offset == that.offset && leaderEpoch == that.leaderEpoch;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = leaderEpoch;
+ result = 31 * result + Long.hashCode(offset);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "(offset=" + offset + ", leaderEpoch=" + leaderEpoch + ")";
+ }
+}