You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/03/29 10:52:28 UTC

[kafka] branch 3.2 updated: KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new f3eab7b  KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)
f3eab7b is described below

commit f3eab7b827b96fc69b4679b0e11656e00af776b7
Author: bozhao12 <10...@users.noreply.github.com>
AuthorDate: Tue Mar 29 16:13:05 2022 +0800

    KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)
    
    When a replica selector is configured, the partition leader computes a preferred read replica for any fetch from the consumers. When the preferred read replica is not the leader, the leader returns the preferred read replica with `FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)` to the `ReplicaManager`. This causes the fetch to go into the fetch purgatory because the exit conditions are not met. It turns out that the delayed fetch is not completed until  [...]
    
    This patch fixes the issue by completing the fetch request immediately when a preferred read replica is defined.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  8 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 76 ++++++++++++++++++++--
 2 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 42124aa..4b77a4a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1028,15 +1028,17 @@ class ReplicaManager(val config: KafkaConfig,
     var bytesReadable: Long = 0
     var errorReadingData = false
     var hasDivergingEpoch = false
+    var hasPreferredReadReplica = false
     val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
     logReadResults.foreach { case (topicIdPartition, logReadResult) =>
       brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
       brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
-
       if (logReadResult.error != Errors.NONE)
         errorReadingData = true
       if (logReadResult.divergingEpoch.nonEmpty)
         hasDivergingEpoch = true
+      if (logReadResult.preferredReadReplica.nonEmpty)
+        hasPreferredReadReplica = true
       bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
       logReadResultMap.put(topicIdPartition, logReadResult)
     }
@@ -1046,7 +1048,9 @@ class ReplicaManager(val config: KafkaConfig,
     //                        3) has enough data to respond
     //                        4) some error happens while reading data
     //                        5) we found a diverging epoch
-    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
+    //                        6) has a preferred read replica
+    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData ||
+      hasDivergingEpoch || hasPreferredReadReplica) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
         val isReassignmentFetch = isFromFollower && isAddingReplica(tp.topicPartition, replicaId)
         tp -> result.toFetchPartitionData(isReassignmentFetch)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index dd644c8..a17c70b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1265,7 +1265,7 @@ class ReplicaManagerTest {
 
       initializeLogAndTopicId(replicaManager, tp0, topicId)
 
-      // Make this replica the follower
+      // Make this replica the leader
       val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
         Seq(new LeaderAndIsrPartitionState()
           .setTopicName(topic)
@@ -1281,14 +1281,14 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
-      val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+      val metadata = new DefaultClientMetadata("rack-a", "client-id",
         InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
 
       val consumerResult = fetchAsConsumer(replicaManager, tidp0,
         new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
         clientMetadata = Some(metadata))
 
-      // Fetch from follower succeeds
+      // Fetch from leader succeeds
       assertTrue(consumerResult.isFired)
 
       // Returns a preferred replica (should just be the leader, which is None)
@@ -1301,6 +1301,66 @@ class ReplicaManagerTest {
   }
 
   @Test
+  def testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector"))
+
+    try {
+      val leaderBrokerId = 0
+      val followerBrokerId = 1
+      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+      val topicId = Uuid.randomUuid()
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+      when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
+        tp0,
+        new ListenerName("default")
+      )).thenReturn(Map(
+        leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"),
+        followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")
+      ).toMap)
+
+      // Make this replica the leader
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())
+      // Prevent the replica selector from ignoring the follower replica when it does not have the data that needs to be fetched
+      replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(followerBrokerId, new LogOffsetMetadata(0), 0, 0, 0)
+
+      val metadata = new DefaultClientMetadata("rack-b", "client-id",
+        InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default")
+
+      // If a preferred read replica is selected, the fetch response returns immediately, even if min bytes and timeout conditions are not met.
+      val consumerResult = fetchAsConsumer(replicaManager, tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
+        minBytes = 1, clientMetadata = Some(metadata), timeout = 5000)
+
+      // Fetch from leader succeeds
+      assertTrue(consumerResult.isFired)
+
+      // No delayed fetch was inserted
+      assertEquals(0, replicaManager.delayedFetchPurgatory.watched)
+
+      // Returns a preferred replica
+      assertTrue(consumerResult.assertFired.preferredReadReplica.isDefined)
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
   def testFollowerFetchWithDefaultSelectorNoForcedHwPropagation(): Unit = {
     val topicPartition = 0
     val followerBrokerId = 0
@@ -2023,8 +2083,9 @@ class ReplicaManagerTest {
                               partitionData: PartitionData,
                               minBytes: Int = 0,
                               isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED,
-                              clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = {
-    fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel, clientMetadata)
+                              clientMetadata: Option[ClientMetadata] = None,
+                              timeout: Long = 1000): CallbackResult[FetchPartitionData] = {
+    fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel, clientMetadata, timeout)
   }
 
   private def fetchAsFollower(replicaManager: ReplicaManager,
@@ -2042,7 +2103,8 @@ class ReplicaManagerTest {
                             partitionData: PartitionData,
                             minBytes: Int,
                             isolationLevel: IsolationLevel,
-                            clientMetadata: Option[ClientMetadata]): CallbackResult[FetchPartitionData] = {
+                            clientMetadata: Option[ClientMetadata],
+                            timeout: Long = 1000): CallbackResult[FetchPartitionData] = {
     val result = new CallbackResult[FetchPartitionData]()
     def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
       assertEquals(1, responseStatus.size)
@@ -2052,7 +2114,7 @@ class ReplicaManagerTest {
     }
 
     replicaManager.fetchMessages(
-      timeout = 1000,
+      timeout = timeout,
       replicaId = replicaId,
       fetchMinBytes = minBytes,
       fetchMaxBytes = Int.MaxValue,