You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/25 20:29:32 UTC
[kafka] branch 3.3 updated: KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica (#12411)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new ebd63b54bd KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica (#12411)
ebd63b54bd is described below
commit ebd63b54bddae886f8125d904e1676333e8f4e58
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jul 25 13:24:40 2022 -0700
KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica (#12411)
After the fix for https://github.com/apache/kafka/pull/12150, if a follower receives a request from another replica, it will return UNKNOWN_LEADER_EPOCH even if the leader epoch matches. We need to do leader/epoch validation first, before we check whether we have a valid replica.
Reviewers: David Jacot <dj...@confluent.io>
---
core/src/main/scala/kafka/cluster/Partition.scala | 36 +++++++++++++------
.../scala/unit/kafka/cluster/PartitionTest.scala | 41 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 319025226c..538c51f903 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1205,22 +1205,32 @@ class Partition(val topicPartition: TopicPartition,
minOneMessage: Boolean,
updateFetchState: Boolean
): LogReadInfo = {
- def readFromLocalLog(): LogReadInfo = {
+ def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
readRecords(
+ log,
fetchPartitionData.lastFetchedEpoch,
fetchPartitionData.fetchOffset,
fetchPartitionData.currentLeaderEpoch,
maxBytes,
fetchParams.isolation,
- minOneMessage,
- fetchParams.fetchOnlyLeader
+ minOneMessage
)
}
if (fetchParams.isFromFollower) {
// Check that the request is from a valid replica before doing the read
- val replica = followerReplicaOrThrow(fetchParams.replicaId, fetchPartitionData)
- val logReadInfo = readFromLocalLog()
+ val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
+ val localLog = localLogWithEpochOrThrow(
+ fetchPartitionData.currentLeaderEpoch,
+ fetchParams.fetchOnlyLeader
+ )
+ val replica = followerReplicaOrThrow(
+ fetchParams.replicaId,
+ fetchPartitionData
+ )
+ val logReadInfo = readFromLocalLog(localLog)
+ (replica, logReadInfo)
+ }
if (updateFetchState && logReadInfo.divergingEpoch.isEmpty) {
updateFollowerFetchState(
@@ -1234,7 +1244,13 @@ class Partition(val topicPartition: TopicPartition,
logReadInfo
} else {
- readFromLocalLog()
+ inReadLock(leaderIsrUpdateLock) {
+ val localLog = localLogWithEpochOrThrow(
+ fetchPartitionData.currentLeaderEpoch,
+ fetchParams.fetchOnlyLeader
+ )
+ readFromLocalLog(localLog)
+ }
}
}
@@ -1270,16 +1286,14 @@ class Partition(val topicPartition: TopicPartition,
}
private def readRecords(
+ localLog: UnifiedLog,
lastFetchedEpoch: Optional[Integer],
fetchOffset: Long,
currentLeaderEpoch: Optional[Integer],
maxBytes: Int,
fetchIsolation: FetchIsolation,
- minOneMessage: Boolean,
- fetchOnlyFromLeader: Boolean
- ): LogReadInfo = inReadLock(leaderIsrUpdateLock) {
- val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader)
-
+ minOneMessage: Boolean
+ ): LogReadInfo = {
// Note we use the log end offset prior to the read. This ensures that any appends following
// the fetch do not prevent a follower from coming into sync.
val initialHighWatermark = localLog.highWatermark
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 65a6cdadf4..5038219579 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -238,6 +238,47 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(None, partition.futureLog)
}
+ @Test
+ def testReplicaFetchToFollower(): Unit = {
+ val controllerEpoch = 3
+ val followerId = brokerId + 1
+ val leaderId = brokerId + 2
+ val replicas = List[Integer](brokerId, followerId, leaderId).asJava
+ val isr = List[Integer](brokerId, followerId, leaderId).asJava
+ val leaderEpoch = 8
+ val partitionEpoch = 1
+
+ assertTrue(partition.makeFollower(new LeaderAndIsrPartitionState()
+ .setControllerEpoch(controllerEpoch)
+ .setLeader(leaderId)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(isr)
+ .setPartitionEpoch(partitionEpoch)
+ .setReplicas(replicas)
+ .setIsNew(true),
+ offsetCheckpoints, None
+ ))
+
+ def assertFetchFromReplicaFails[T <: ApiException](
+ expectedExceptionClass: Class[T],
+ leaderEpoch: Option[Int]
+ ): Unit = {
+ assertThrows(expectedExceptionClass, () => {
+ fetchFollower(
+ partition,
+ replicaId = followerId,
+ fetchOffset = 0L,
+ leaderEpoch = leaderEpoch
+ )
+ })
+ }
+
+ assertFetchFromReplicaFails(classOf[NotLeaderOrFollowerException], None)
+ assertFetchFromReplicaFails(classOf[NotLeaderOrFollowerException], Some(leaderEpoch))
+ assertFetchFromReplicaFails(classOf[UnknownLeaderEpochException], Some(leaderEpoch + 1))
+ assertFetchFromReplicaFails(classOf[FencedLeaderEpochException], Some(leaderEpoch - 1))
+ }
+
@Test
def testFetchFromUnrecognizedFollower(): Unit = {
val controllerEpoch = 3