You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/04/05 16:56:32 UTC

[kafka] branch trunk updated: KAFKA-13778: Fetch from follower should never run the preferred read replica selection (#11965)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4218fc61fe KAFKA-13778: Fetch from follower should never run the preferred read replica selection (#11965)
4218fc61fe is described below

commit 4218fc61fedb02b78d35c88e56ab253baaf09f39
Author: bozhao12 <10...@users.noreply.github.com>
AuthorDate: Wed Apr 6 00:56:23 2022 +0800

    KAFKA-13778: Fetch from follower should never run the preferred read replica selection (#11965)
    
    The current preferred read replica selection logic relies on `partition.leaderReplicaIdOpt` to determine if the selection must be run. The issue is that `partition.leaderReplicaIdOpt` is defined for both the leader and the followers thus the logic is ran all the time. The impact is not too bad as the leader is selected most of the time when the logic is ran by the follower and the leader is filtered out. However there are cases where the selection on a follower could redirect the cons [...]
    
    This patch ensures that the preferred read replica selection is only ran by the leader.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  5 ++
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 64 +++++++++++++++++++++-
 3 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e692db83ea..8a6df9ff7f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -256,6 +256,7 @@ class Partition(val topicPartition: TopicPartition,
   // start offset for 'leaderEpoch' above (leader epoch of the current leader for this partition),
   // defined when this broker is leader for partition
   @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
+  // Replica ID of the leader, defined when this broker is leader or follower for the partition.
   @volatile var leaderReplicaIdOpt: Option[Int] = None
   @volatile private[cluster] var partitionState: PartitionState = CommittedPartitionState(Set.empty, LeaderRecoveryState.RECOVERED)
   @volatile var assignmentState: AssignmentState = SimpleAssignmentState(Seq.empty)
@@ -433,6 +434,10 @@ class Partition(val topicPartition: TopicPartition,
    */
   def isLeader: Boolean = leaderReplicaIdOpt.contains(localBrokerId)
 
+  def leaderIdIfLocal: Option[Int] = {
+    leaderReplicaIdOpt.filter(_ == localBrokerId)
+  }
+
   private def localLogWithEpochOrException(currentLeaderEpoch: Optional[Integer],
                                            requireLeader: Boolean): UnifiedLog = {
     getLocalLog(currentLeaderEpoch, requireLeader) match {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4b77a4a56c..eff4824c47 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1233,7 +1233,7 @@ class ReplicaManager(val config: KafkaConfig,
                                replicaId: Int,
                                fetchOffset: Long,
                                currentTimeMs: Long): Option[Int] = {
-    partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
+    partition.leaderIdIfLocal.flatMap { leaderReplicaId =>
       // Don't look up preferred for follower fetches via normal replication
       if (Request.isValidBrokerId(replicaId))
         None
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a17c70b721..3656a91af5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.net.InetAddress
 import java.nio.file.Files
 import java.util
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, Properties}
@@ -44,7 +44,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.replica.ClientMetadata
+import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -1300,6 +1300,54 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName))
+    try {
+      val leaderBrokerId = 0
+      val followerBrokerId = 1
+      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+      val topicId = Uuid.randomUuid()
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+      // Make this replica the follower
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(1)
+          .setLeaderEpoch(1)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())
+
+      val metadata = new DefaultClientMetadata("rack-a", "client-id",
+        InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchAsConsumer(replicaManager, tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000,
+          Optional.empty()), clientMetadata = Some(metadata))
+
+      // Fetch from follower succeeds
+      assertTrue(consumerResult.isFired)
+
+      // Expect not run the preferred read replica selection
+      assertEquals(0, replicaManager.replicaSelectorOpt.get.asInstanceOf[MockReplicaSelector].getSelectionCount)
+
+      // Only leader will compute preferred replica
+      assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty)
+
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
   @Test
   def testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
@@ -3719,3 +3767,15 @@ class ReplicaManagerTest {
     }
   }
 }
+
+class MockReplicaSelector extends ReplicaSelector {
+
+  private val selectionCount = new AtomicLong()
+
+  def getSelectionCount: Long = selectionCount.get
+
+  override def select(topicPartition: TopicPartition, clientMetadata: ClientMetadata, partitionView: PartitionView): Optional[ReplicaView] = {
+    selectionCount.incrementAndGet()
+    Optional.of(partitionView.leader)
+  }
+}