Posted to dev@kafka.apache.org by GitBox <gi...@apache.org> on 2022/04/04 07:46:23 UTC

[GitHub] [kafka] dajac commented on a diff in pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection

dajac commented on code in PR #11965:
URL: https://github.com/apache/kafka/pull/11965#discussion_r841436792


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -256,6 +256,7 @@ class Partition(val topicPartition: TopicPartition,
   // start offset for 'leaderEpoch' above (leader epoch of the current leader for this partition),
   // defined when this broker is leader for partition
   @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
+  // Point to itself if is leader, otherwise point to the latest leader replica

Review Comment:
   nit: I would phrase it as follows: `Replica ID of the leader, defined when this broker is leader or follower for the partition.`



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1234,8 +1234,9 @@ class ReplicaManager(val config: KafkaConfig,
                                fetchOffset: Long,
                                currentTimeMs: Long): Option[Int] = {
     partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
-      // Don't look up preferred for follower fetches via normal replication
-      if (Request.isValidBrokerId(replicaId))
+      // Don't look up preferred for follower fetches via normal replication and
+      // don't look up preferred read replica while fetch from follower replica
+      if (Request.isValidBrokerId(replicaId) || !partition.isLeader)

Review Comment:
   I wonder if we should add the following method to `Partition` and use it instead of using `leaderReplicaIdOpt`. 
   
   ```scala
   def leaderIdIfLocal: Option[Int] = {
     leaderReplicaIdOpt.filter(_ == localBrokerId)
   }
   ```
   
   We don't use any lock on this path, so relying solely on `leaderReplicaIdOpt` to make the decision seems slightly better than calling `partition.isLeader`. What do you think?
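   
   To illustrate, here is a minimal, self-contained sketch of how the call site could look with such a method. `PartitionSketch` and the `select` function parameter are hypothetical stand-ins for `Partition` and the `ReplicaSelector` call, and `replicaId >= 0` is assumed to mirror `Request.isValidBrokerId`; this is not the actual `ReplicaManager` code.
   
   ```scala
   // Hypothetical, simplified stand-in for kafka.cluster.Partition.
   final class PartitionSketch(val localBrokerId: Int) {
     // Updated when leadership changes; read here without taking a lock.
     @volatile var leaderReplicaIdOpt: Option[Int] = None
   
     // Some(leaderId) only when this broker is the leader; one volatile read decides.
     def leaderIdIfLocal: Option[Int] =
       leaderReplicaIdOpt.filter(_ == localBrokerId)
   }
   
   object PartitionSketch {
     // `select` is an assumed placeholder for the replica selector lookup.
     def findPreferredReadReplica(partition: PartitionSketch,
                                  replicaId: Int,
                                  select: Int => Option[Int]): Option[Int] =
       partition.leaderIdIfLocal.flatMap { leaderReplicaId =>
         // Follower fetches via normal replication never consult the selector.
         if (replicaId >= 0) None
         else select(leaderReplicaId)
       }
   }
   ```
   
   With this shape, a broker that is not the leader gets `None` from `leaderIdIfLocal`, so the selector is never reached and no separate `isLeader` check is needed.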



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3719,3 +3767,19 @@ class ReplicaManagerTest {
     }
   }
 }
+
+class MockReplicaSelector extends ReplicaSelector {
+
+  private val triggerSelectionCount = new AtomicLong()
+
+  def getTriggerSelectionCount: Long = triggerSelectionCount.get
+
+  /**
+   * Select the preferred replica a client should use for fetching. If no replica is available, this will return an
+   * empty optional.
+   */

Review Comment:
   nit: I think that we can remove this default comment.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1300,6 +1300,54 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, "kafka.server.MockReplicaSelector"))

Review Comment:
   nit: We could use `classOf[MockReplicaSelector].getName`, I think.
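   
   As a rough sketch of the idea, the class name can be derived from the type so that a rename or package move cannot leave a stale string behind. `MockReplicaSelectorSketch` and `ClassNameSketch` are hypothetical names used only for illustration, and the literal key `replica.selector.class` is assumed to be the value behind `KafkaConfig.ReplicaSelectorClassProp`.
   
   ```scala
   import java.util.Properties
   
   // Hypothetical stand-in for the test's MockReplicaSelector.
   class MockReplicaSelectorSketch
   
   object ClassNameSketch {
     def replicaSelectorProps(): Properties = {
       val props = new Properties()
       // Derive the fully qualified name from the class instead of hard-coding it.
       props.put("replica.selector.class", classOf[MockReplicaSelectorSketch].getName)
       props
     }
   }
   ```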



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3719,3 +3767,19 @@ class ReplicaManagerTest {
     }
   }
 }
+
+class MockReplicaSelector extends ReplicaSelector {
+
+  private val triggerSelectionCount = new AtomicLong()
+
+  def getTriggerSelectionCount: Long = triggerSelectionCount.get

Review Comment:
   nit: The `trigger` in the name bugs me. We could perhaps just use `selectionCount`?
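   
   For instance, the renamed counter could look roughly like the sketch below. `SelectorSketch` is a hypothetical, simplified trait standing in for `ReplicaSelector` (the real interface has a different `select` signature), so this only illustrates the naming.
   
   ```scala
   import java.util.concurrent.atomic.AtomicLong
   
   // Hypothetical, simplified stand-in for ReplicaSelector.
   trait SelectorSketch {
     def select(replicaIds: Seq[Int]): Option[Int]
   }
   
   class CountingSelectorSketch extends SelectorSketch {
     private val selectionCount = new AtomicLong()
   
     // Number of times select has been invoked so far.
     def getSelectionCount: Long = selectionCount.get
   
     override def select(replicaIds: Seq[Int]): Option[Int] = {
       selectionCount.incrementAndGet()
       replicaIds.headOption
     }
   }
   ```
   
   A test can then assert that `getSelectionCount` stays at 0 when the fetch is served by a follower.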


