You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/19 02:35:31 UTC

[GitHub] [kafka] jeffkbkim opened a new pull request, #12877: KAFKA-14372: choose replicas only from isr for preferred read replica

jeffkbkim opened a new pull request, #12877:
URL: https://github.com/apache/kafka/pull/12877

   The default replica selector chooses a replica on whether the broker.rack matches the client.rack in the fetch request and whether the offset exists in the follower. If the follower is not in the ISR, we know it's lagging behind which will also lag the consumer behind. there are two cases:
   1. the follower recovers and joins the isr. the consumer will no longer fall behind
   2. the follower continues to lag behind. after 5 minutes, the consumer will refresh its preferred read replica and the leader will return the same lagging follower since the offset the consumer fetched up to is capped by the follower's HWM. this can go on indefinitely
   
   If the replica selector chooses a broker in the ISR then we can ensure that at least every 5 minutes the consumer will consume from an up-to-date replica. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12877: KAFKA-14372: choose replicas only from isr for preferred read replica

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12877:
URL: https://github.com/apache/kafka/pull/12877#discussion_r1031546311


##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()

Review Comment:
   ah ok. i did not see this because i thought that the purpose of the patch was to verify the ISR behavior. i guess that this extra test does not hurt so we can keep it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12877: KAFKA-14372: choose replicas only from isr for preferred read replica

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12877:
URL: https://github.com/apache/kafka/pull/12877#discussion_r1029900199


##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()

Review Comment:
   added a check to confirm that the follower is no longer reachable.
   
   forgot to comment but the test validates that the leader chooses a broker from the live set of brokers. this passes with the previous implementation as well. 
   
   this doesn't test follower falling out of ISR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12877: KAFKA-14372: choose replicas only from isr for preferred read replica

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12877:
URL: https://github.com/apache/kafka/pull/12877#discussion_r1028216090


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.
+            if (replicaState.logEndOffset >= fetchOffset &&
+              replicaState.logStartOffset <= fetchOffset &&
+              partition.inSyncReplicaIds.contains(replica.brokerId)

Review Comment:
   nit: Could we align these two lines with `replicaState` on the previous line?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.
+            if (replicaState.logEndOffset >= fetchOffset &&
+              replicaState.logStartOffset <= fetchOffset &&
+              partition.inSyncReplicaIds.contains(replica.brokerId)
+            ) {

Review Comment:
   nit: We usually put the closing parenthesis of `if` statement on the previous line.



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response)
+  }
+
+  private def validateFetchResponse(response: FetchResponse, preferredReadReplica: Int = -1): Unit = {

Review Comment:
   nit: `validatePreferredReadReplica`?



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()

Review Comment:
   Is it guaranteed that the follower is removed from the ISR after this line or do we have a race condition with the below checks? 



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4174,11 +4244,14 @@ class ReplicaManagerTest {
 class MockReplicaSelector extends ReplicaSelector {
 
   private val selectionCount = new AtomicLong()
+  private var partitionViewArgument: Option[PartitionView] = None
 
   def getSelectionCount: Long = selectionCount.get
+  def getPartitionViewArgument(): Option[PartitionView] = partitionViewArgument

Review Comment:
   nit: We usually don't prefix getters with `get`. Could we rename these?



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1279,6 +1280,75 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName))
+
+    try {
+      val leaderBrokerId = 0
+      val followerBrokerId = 1
+      val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a")
+      val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b")
+      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+      val topicId = Uuid.randomUuid()
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
+        tp0,
+        new ListenerName("default")
+      )).thenReturn(Map(
+        leaderBrokerId -> leaderNode,
+        followerBrokerId -> followerNode
+      ).toMap)
+
+      // Make this replica the leader and remove follower from ISR.
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+        ApiKeys.LEADER_AND_ISR.latestVersion,
+        0,
+        0,
+        brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(leaderBrokerId)
+          .setLeaderEpoch(1)
+          .setIsr(Seq[Integer](leaderBrokerId).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(leaderNode, followerNode).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => ())
+
+      val metadata = new DefaultClientMetadata("rack-b", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchPartitionAsConsumer(
+        replicaManager,
+        tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
+        clientMetadata = Some(metadata)
+      )
+
+      // Fetch from leader succeeds
+      assertTrue(consumerResult.hasFired)
+
+      // PartitionView passed to ReplicaSelector should not contain the follower as it's not in the ISR
+      val expectedReplicaViews = Set(new DefaultReplicaView(leaderNode, 0, 0))
+      val partitionView = replicaManager.replicaSelectorOpt.get
+        .asInstanceOf[MockReplicaSelector].getPartitionViewArgument()
+
+      assertTrue(partitionView.isDefined)
+      assertEquals(expectedReplicaViews.asJava, partitionView.get.replicas())

Review Comment:
   nit: You can remove `()` after `replicas`.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.
+            if (replicaState.logEndOffset >= fetchOffset &&

Review Comment:
   Note that this is implicit if the replica is in the ISR but it does not cost much. Should we put the ISR check first? It seems to me that this is the most important one.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.

Review Comment:
   nit: Should we explain the impact if a follower lagging indefinitely on consumers fetching from it?



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response)

Review Comment:
   nit: I would rather put `preferredReadReplica = -1` here. It makes the test more readable.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1279,6 +1280,75 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName))
+
+    try {
+      val leaderBrokerId = 0
+      val followerBrokerId = 1
+      val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a")
+      val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b")
+      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+      val topicId = Uuid.randomUuid()
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
+        tp0,
+        new ListenerName("default")
+      )).thenReturn(Map(
+        leaderBrokerId -> leaderNode,
+        followerBrokerId -> followerNode
+      ).toMap)
+
+      // Make this replica the leader and remove follower from ISR.
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+        ApiKeys.LEADER_AND_ISR.latestVersion,
+        0,
+        0,
+        brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(leaderBrokerId)
+          .setLeaderEpoch(1)
+          .setIsr(Seq[Integer](leaderBrokerId).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(leaderNode, followerNode).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => ())
+
+      val metadata = new DefaultClientMetadata("rack-b", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchPartitionAsConsumer(
+        replicaManager,
+        tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
+        clientMetadata = Some(metadata)
+      )
+
+      // Fetch from leader succeeds
+      assertTrue(consumerResult.hasFired)

Review Comment:
   Should we assert the returned preferred replica?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac merged pull request #12877: KAFKA-14372: choose replicas only from isr for preferred read replica

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12877:
URL: https://github.com/apache/kafka/pull/12877


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12877: KAFKA-14372: choose replicas only from isr for preferred read replica

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12877:
URL: https://github.com/apache/kafka/pull/12877#discussion_r1029884348


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1279,6 +1280,75 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName))
+
+    try {
+      val leaderBrokerId = 0
+      val followerBrokerId = 1
+      val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a")
+      val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b")
+      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+      val topicId = Uuid.randomUuid()
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
+        tp0,
+        new ListenerName("default")
+      )).thenReturn(Map(
+        leaderBrokerId -> leaderNode,
+        followerBrokerId -> followerNode
+      ).toMap)
+
+      // Make this replica the leader and remove follower from ISR.
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+        ApiKeys.LEADER_AND_ISR.latestVersion,
+        0,
+        0,
+        brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(leaderBrokerId)
+          .setLeaderEpoch(1)
+          .setIsr(Seq[Integer](leaderBrokerId).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(leaderNode, followerNode).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => ())
+
+      val metadata = new DefaultClientMetadata("rack-b", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchPartitionAsConsumer(
+        replicaManager,
+        tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
+        clientMetadata = Some(metadata)
+      )
+
+      // Fetch from leader succeeds
+      assertTrue(consumerResult.hasFired)

Review Comment:
   this test uses the MockReplicaSelector which always returns the leader. hence why the test validates the arguments passed to the selector



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12877: KAFKA-14372: choose replicas only from isr for preferred read replica

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12877:
URL: https://github.com/apache/kafka/pull/12877#discussion_r1029885511


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4174,11 +4244,14 @@ class ReplicaManagerTest {
 class MockReplicaSelector extends ReplicaSelector {
 
   private val selectionCount = new AtomicLong()
+  private var partitionViewArgument: Option[PartitionView] = None
 
   def getSelectionCount: Long = selectionCount.get
+  def getPartitionViewArgument(): Option[PartitionView] = partitionViewArgument

Review Comment:
   i followed the convention used with `getSelectionCount` due to
   ```
   partitionViewArgument is already defined in the scope
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12877: KAFKA-14372: choose replicas only from isr for preferred read replica

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12877:
URL: https://github.com/apache/kafka/pull/12877#discussion_r1029885511


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4174,11 +4244,14 @@ class ReplicaManagerTest {
 class MockReplicaSelector extends ReplicaSelector {
 
   private val selectionCount = new AtomicLong()
+  private var partitionViewArgument: Option[PartitionView] = None
 
   def getSelectionCount: Long = selectionCount.get
+  def getPartitionViewArgument(): Option[PartitionView] = partitionViewArgument

Review Comment:
   i followed the convention used with `getSelectionCount` due to
   ```
   partitionViewArgument is already defined in the scope
   ```
   i have removed the parentheses



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12877: KAFKA-14372: choose replicas only from isr for preferred read replica

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12877:
URL: https://github.com/apache/kafka/pull/12877#issuecomment-1326505121

   Merged to trunk and 3.3


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org