You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/21 16:11:42 UTC

[GitHub] [kafka] dajac commented on a diff in pull request #12877: KAFKA-14372: choose replicas only from isr for preferred read replica

dajac commented on code in PR #12877:
URL: https://github.com/apache/kafka/pull/12877#discussion_r1028216090


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.
+            if (replicaState.logEndOffset >= fetchOffset &&
+              replicaState.logStartOffset <= fetchOffset &&
+              partition.inSyncReplicaIds.contains(replica.brokerId)

Review Comment:
   nit: Could we align these two lines with `replicaState` on the previous line?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.
+            if (replicaState.logEndOffset >= fetchOffset &&
+              replicaState.logStartOffset <= fetchOffset &&
+              partition.inSyncReplicaIds.contains(replica.brokerId)
+            ) {

Review Comment:
   nit: We usually put the closing parenthesis of the `if` statement on the previous line.



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response)
+  }
+
+  private def validateFetchResponse(response: FetchResponse, preferredReadReplica: Int = -1): Unit = {

Review Comment:
   nit: `validatePreferredReadReplica`?



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()

Review Comment:
   Is it guaranteed that the follower is removed from the ISR after this line, or do we have a race condition with the checks below?



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4174,11 +4244,14 @@ class ReplicaManagerTest {
 class MockReplicaSelector extends ReplicaSelector {
 
   private val selectionCount = new AtomicLong()
+  private var partitionViewArgument: Option[PartitionView] = None
 
   def getSelectionCount: Long = selectionCount.get
+  def getPartitionViewArgument(): Option[PartitionView] = partitionViewArgument

Review Comment:
   nit: We usually don't prefix getters with `get`. Could we rename these?



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1279,6 +1280,75 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName))
+
+    try {
+      val leaderBrokerId = 0
+      val followerBrokerId = 1
+      val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a")
+      val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b")
+      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+      val topicId = Uuid.randomUuid()
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
+        tp0,
+        new ListenerName("default")
+      )).thenReturn(Map(
+        leaderBrokerId -> leaderNode,
+        followerBrokerId -> followerNode
+      ).toMap)
+
+      // Make this replica the leader and remove follower from ISR.
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+        ApiKeys.LEADER_AND_ISR.latestVersion,
+        0,
+        0,
+        brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(leaderBrokerId)
+          .setLeaderEpoch(1)
+          .setIsr(Seq[Integer](leaderBrokerId).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(leaderNode, followerNode).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => ())
+
+      val metadata = new DefaultClientMetadata("rack-b", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchPartitionAsConsumer(
+        replicaManager,
+        tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
+        clientMetadata = Some(metadata)
+      )
+
+      // Fetch from leader succeeds
+      assertTrue(consumerResult.hasFired)
+
+      // PartitionView passed to ReplicaSelector should not contain the follower as it's not in the ISR
+      val expectedReplicaViews = Set(new DefaultReplicaView(leaderNode, 0, 0))
+      val partitionView = replicaManager.replicaSelectorOpt.get
+        .asInstanceOf[MockReplicaSelector].getPartitionViewArgument()
+
+      assertTrue(partitionView.isDefined)
+      assertEquals(expectedReplicaViews.asJava, partitionView.get.replicas())

Review Comment:
   nit: You can remove `()` after `replicas`.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.
+            if (replicaState.logEndOffset >= fetchOffset &&

Review Comment:
   Note that this is implicit if the replica is in the ISR but it does not cost much. Should we put the ISR check first? It seems to me that this is the most important one.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.

Review Comment:
   nit: Should we explain the impact that an indefinitely lagging follower has on consumers fetching from it?



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response)

Review Comment:
   nit: I would rather put `preferredReadReplica = -1` here. It makes the test more readable.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1279,6 +1280,75 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName))
+
+    try {
+      val leaderBrokerId = 0
+      val followerBrokerId = 1
+      val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a")
+      val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b")
+      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+      val topicId = Uuid.randomUuid()
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
+        tp0,
+        new ListenerName("default")
+      )).thenReturn(Map(
+        leaderBrokerId -> leaderNode,
+        followerBrokerId -> followerNode
+      ).toMap)
+
+      // Make this replica the leader and remove follower from ISR.
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+        ApiKeys.LEADER_AND_ISR.latestVersion,
+        0,
+        0,
+        brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(leaderBrokerId)
+          .setLeaderEpoch(1)
+          .setIsr(Seq[Integer](leaderBrokerId).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(leaderNode, followerNode).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => ())
+
+      val metadata = new DefaultClientMetadata("rack-b", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchPartitionAsConsumer(
+        replicaManager,
+        tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
+        clientMetadata = Some(metadata)
+      )
+
+      // Fetch from leader succeeds
+      assertTrue(consumerResult.hasFired)

Review Comment:
   Should we assert the returned preferred replica?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org