You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/25 20:24:55 UTC

[kafka] branch trunk updated: KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica (#12411)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a450fb70c1 KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica (#12411)
a450fb70c1 is described below

commit a450fb70c12ba66257e8f61cc4903290f1e435ea
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jul 25 13:24:40 2022 -0700

    KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica (#12411)
    
    After the fix for https://github.com/apache/kafka/pull/12150, if a follower receives a request from another replica, it will return UNKNOWN_LEADER_EPOCH even if the leader epoch matches. We need to do leader/epoch validation first, before we check whether we have a valid replica.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 36 +++++++++++++------
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 41 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 319025226c..538c51f903 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1205,22 +1205,32 @@ class Partition(val topicPartition: TopicPartition,
     minOneMessage: Boolean,
     updateFetchState: Boolean
   ): LogReadInfo = {
-    def readFromLocalLog(): LogReadInfo = {
+    def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
       readRecords(
+        log,
         fetchPartitionData.lastFetchedEpoch,
         fetchPartitionData.fetchOffset,
         fetchPartitionData.currentLeaderEpoch,
         maxBytes,
         fetchParams.isolation,
-        minOneMessage,
-        fetchParams.fetchOnlyLeader
+        minOneMessage
       )
     }
 
     if (fetchParams.isFromFollower) {
       // Check that the request is from a valid replica before doing the read
-      val replica = followerReplicaOrThrow(fetchParams.replicaId, fetchPartitionData)
-      val logReadInfo = readFromLocalLog()
+      val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
+        val localLog = localLogWithEpochOrThrow(
+          fetchPartitionData.currentLeaderEpoch,
+          fetchParams.fetchOnlyLeader
+        )
+        val replica = followerReplicaOrThrow(
+          fetchParams.replicaId,
+          fetchPartitionData
+        )
+        val logReadInfo = readFromLocalLog(localLog)
+        (replica, logReadInfo)
+      }
 
       if (updateFetchState && logReadInfo.divergingEpoch.isEmpty) {
         updateFollowerFetchState(
@@ -1234,7 +1244,13 @@ class Partition(val topicPartition: TopicPartition,
 
       logReadInfo
     } else {
-      readFromLocalLog()
+      inReadLock(leaderIsrUpdateLock) {
+        val localLog = localLogWithEpochOrThrow(
+          fetchPartitionData.currentLeaderEpoch,
+          fetchParams.fetchOnlyLeader
+        )
+        readFromLocalLog(localLog)
+      }
     }
   }
 
@@ -1270,16 +1286,14 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private def readRecords(
+    localLog: UnifiedLog,
     lastFetchedEpoch: Optional[Integer],
     fetchOffset: Long,
     currentLeaderEpoch: Optional[Integer],
     maxBytes: Int,
     fetchIsolation: FetchIsolation,
-    minOneMessage: Boolean,
-    fetchOnlyFromLeader: Boolean
-  ): LogReadInfo = inReadLock(leaderIsrUpdateLock) {
-    val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader)
-
+    minOneMessage: Boolean
+  ): LogReadInfo = {
     // Note we use the log end offset prior to the read. This ensures that any appends following
     // the fetch do not prevent a follower from coming into sync.
     val initialHighWatermark = localLog.highWatermark
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 65a6cdadf4..5038219579 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -238,6 +238,47 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(None, partition.futureLog)
   }
 
+  @Test
+  def testReplicaFetchToFollower(): Unit = {
+    val controllerEpoch = 3
+    val followerId = brokerId + 1
+    val leaderId = brokerId + 2
+    val replicas = List[Integer](brokerId, followerId, leaderId).asJava
+    val isr = List[Integer](brokerId, followerId, leaderId).asJava
+    val leaderEpoch = 8
+    val partitionEpoch = 1
+
+    assertTrue(partition.makeFollower(new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(isr)
+      .setPartitionEpoch(partitionEpoch)
+      .setReplicas(replicas)
+      .setIsNew(true),
+      offsetCheckpoints, None
+    ))
+
+    def assertFetchFromReplicaFails[T <: ApiException](
+      expectedExceptionClass: Class[T],
+      leaderEpoch: Option[Int]
+    ): Unit = {
+      assertThrows(expectedExceptionClass, () => {
+        fetchFollower(
+          partition,
+          replicaId = followerId,
+          fetchOffset = 0L,
+          leaderEpoch = leaderEpoch
+        )
+      })
+    }
+
+    assertFetchFromReplicaFails(classOf[NotLeaderOrFollowerException], None)
+    assertFetchFromReplicaFails(classOf[NotLeaderOrFollowerException], Some(leaderEpoch))
+    assertFetchFromReplicaFails(classOf[UnknownLeaderEpochException], Some(leaderEpoch + 1))
+    assertFetchFromReplicaFails(classOf[FencedLeaderEpochException], Some(leaderEpoch - 1))
+  }
+
   @Test
   def testFetchFromUnrecognizedFollower(): Unit = {
     val controllerEpoch = 3