You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/30 17:10:02 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

C0urante commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r958619212


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })

Review Comment:
   Can this be simplified? Don't think we need to construct an entire `ReplicaAssignment` here:
   ```suggestion
       val reassignment = Map(reassigningTp -> Seq(otherBrokerId))
       zkClient.createPartitionReassignment(reassignment)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org