You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/03/29 10:52:28 UTC
[kafka] branch 3.2 updated: KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new f3eab7b KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)
f3eab7b is described below
commit f3eab7b827b96fc69b4679b0e11656e00af776b7
Author: bozhao12 <10...@users.noreply.github.com>
AuthorDate: Tue Mar 29 16:13:05 2022 +0800
KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)
When a replica selector is configured, the partition leader computes a preferred read replica for any fetch from the consumers. When the preferred read replica is not the leader, the leader returns the preferred read replica with `FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)` to the `ReplicaManager`. This causes the fetch to go into the fetch purgatory because the exit conditions are not met. It turns out that the delayed fetch is not completed until [...]
This patch fixes the issue by completing the fetch request immediately when a preferred read replica is defined.
Reviewers: David Jacot <dj...@confluent.io>
---
.../main/scala/kafka/server/ReplicaManager.scala | 8 ++-
.../unit/kafka/server/ReplicaManagerTest.scala | 76 ++++++++++++++++++++--
2 files changed, 75 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 42124aa..4b77a4a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1028,15 +1028,17 @@ class ReplicaManager(val config: KafkaConfig,
var bytesReadable: Long = 0
var errorReadingData = false
var hasDivergingEpoch = false
+ var hasPreferredReadReplica = false
val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
logReadResults.foreach { case (topicIdPartition, logReadResult) =>
brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
-
if (logReadResult.error != Errors.NONE)
errorReadingData = true
if (logReadResult.divergingEpoch.nonEmpty)
hasDivergingEpoch = true
+ if (logReadResult.preferredReadReplica.nonEmpty)
+ hasPreferredReadReplica = true
bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
logReadResultMap.put(topicIdPartition, logReadResult)
}
@@ -1046,7 +1048,9 @@ class ReplicaManager(val config: KafkaConfig,
// 3) has enough data to respond
// 4) some error happens while reading data
// 5) we found a diverging epoch
- if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
+ // 6) has a preferred read replica
+ if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData ||
+ hasDivergingEpoch || hasPreferredReadReplica) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
val isReassignmentFetch = isFromFollower && isAddingReplica(tp.topicPartition, replicaId)
tp -> result.toFetchPartitionData(isReassignmentFetch)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index dd644c8..a17c70b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1265,7 +1265,7 @@ class ReplicaManagerTest {
initializeLogAndTopicId(replicaManager, tp0, topicId)
- // Make this replica the follower
+ // Make this replica the leader
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
@@ -1281,14 +1281,14 @@ class ReplicaManagerTest {
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
- val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+ val metadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
val consumerResult = fetchAsConsumer(replicaManager, tidp0,
new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
clientMetadata = Some(metadata))
- // Fetch from follower succeeds
+ // Fetch from leader succeeds
assertTrue(consumerResult.isFired)
// Returns a preferred replica (should just be the leader, which is None)
@@ -1301,6 +1301,66 @@ class ReplicaManagerTest {
}
@Test
+ def testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined(): Unit = {
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+ propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector"))
+
+ try {
+ val leaderBrokerId = 0
+ val followerBrokerId = 1
+ val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+ val topicId = Uuid.randomUuid()
+ val tp0 = new TopicPartition(topic, 0)
+ val tidp0 = new TopicIdPartition(topicId, tp0)
+
+ initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+ when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
+ tp0,
+ new ListenerName("default")
+ )).thenReturn(Map(
+ leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"),
+ followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")
+ ).toMap)
+
+ // Make this replica the leader
+ val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(topic)
+ .setPartitionIndex(0)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(1)
+ .setIsr(brokerList)
+ .setZkVersion(0)
+ .setReplicas(brokerList)
+ .setIsNew(false)).asJava,
+ Collections.singletonMap(topic, topicId),
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+ replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())
+ // Prevent the replica selector from ignoring the follower replica when it does not have the data that needs to be fetched
+ replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(followerBrokerId, new LogOffsetMetadata(0), 0, 0, 0)
+
+ val metadata = new DefaultClientMetadata("rack-b", "client-id",
+ InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default")
+
+ // If a preferred read replica is selected, the fetch response returns immediately, even if min bytes and timeout conditions are not met.
+ val consumerResult = fetchAsConsumer(replicaManager, tidp0,
+ new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
+ minBytes = 1, clientMetadata = Some(metadata), timeout = 5000)
+
+ // Fetch from leader succeeds
+ assertTrue(consumerResult.isFired)
+
+ // No delayed fetch was inserted
+ assertEquals(0, replicaManager.delayedFetchPurgatory.watched)
+
+ // Returns a preferred replica
+ assertTrue(consumerResult.assertFired.preferredReadReplica.isDefined)
+ } finally replicaManager.shutdown(checkpointHW = false)
+ }
+
+ @Test
def testFollowerFetchWithDefaultSelectorNoForcedHwPropagation(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
@@ -2023,8 +2083,9 @@ class ReplicaManagerTest {
partitionData: PartitionData,
minBytes: Int = 0,
isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED,
- clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = {
- fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel, clientMetadata)
+ clientMetadata: Option[ClientMetadata] = None,
+ timeout: Long = 1000): CallbackResult[FetchPartitionData] = {
+ fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel, clientMetadata, timeout)
}
private def fetchAsFollower(replicaManager: ReplicaManager,
@@ -2042,7 +2103,8 @@ class ReplicaManagerTest {
partitionData: PartitionData,
minBytes: Int,
isolationLevel: IsolationLevel,
- clientMetadata: Option[ClientMetadata]): CallbackResult[FetchPartitionData] = {
+ clientMetadata: Option[ClientMetadata],
+ timeout: Long = 1000): CallbackResult[FetchPartitionData] = {
val result = new CallbackResult[FetchPartitionData]()
def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
assertEquals(1, responseStatus.size)
@@ -2052,7 +2114,7 @@ class ReplicaManagerTest {
}
replicaManager.fetchMessages(
- timeout = 1000,
+ timeout = timeout,
replicaId = replicaId,
fetchMinBytes = minBytes,
fetchMaxBytes = Int.MaxValue,