You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/19 13:11:21 UTC

[GitHub] [kafka] songnon opened a new pull request, #12543: KAFKA-10149: Allow auto preferred leader election for partitions are not being reassigned

songnon opened a new pull request, #12543:
URL: https://github.com/apache/kafka/pull/12543

   Auto preferred leader election should be allowed for partitions which don't have any
   ongoing reassignments. Otherwise, it might result in unbalanced leaders if there are
   any long-running reassignments. For partitions being reassigned, leader election will
   be triggered at the end of partition reassignment procedure automatically if the
   preferred leader is not in the TRS. We should avoid interrupting them.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election for partitions are not being reassigned

Posted by GitBox <gi...@apache.org>.
songnon commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r950692749


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,

Review Comment:
   Yes, it is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election for partitions are not being reassigned

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r950574990


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {

Review Comment:
   I think it would still work fine, but did we consider a situation where the preferred leader election occurs right before or at the same time as the reassignment completes? 
   For example, we have a reassignment going, a broker is restarted, and we may or may not have preferred leader election before it completes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12543:
URL: https://github.com/apache/kafka/pull/12543#issuecomment-1277960339

   > I believe it's valid to perform preferred leader election for topics even if they're currently undergoing reassignment
   
   Got it. I think the PR description here confused me then.
   > Auto preferred leader election should be allowed for partitions which don't have any ongoing reassignments.
   
   Perhaps we can update this to make it clearer that we can also do the preferred leadership election on partitions undergoing reassignment to update the preferred leader sooner. I also now see the steps being referenced are in KafkaController. :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r959610302


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,58 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithOtherReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> Seq(otherBrokerId))
+    zkClient.createPartitionReassignment(reassignment)
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,
+      "failed to get expected partition state after partition reassignment")
+  }
+
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithSamePartitionBeingReassigned(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassignment = Map(tp -> Seq(otherBrokerId, leaderBrokerId))
+    zkClient.createPartitionReassignment(reassignment)
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 3,
+      "failed to get expected partition state after partition reassignment")
+    waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,
+      "failed to get expected partition state after auto preferred replica leader election")

Review Comment:
   Is this guaranteed to test the code path that we're interested in? Is it possible that the reassignment completes before the next automatic leader election is triggered, in which case we don't actually verify that the election has taken place while the reassignment is active?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r959610302


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,58 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithOtherReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> Seq(otherBrokerId))
+    zkClient.createPartitionReassignment(reassignment)
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,
+      "failed to get expected partition state after partition reassignment")
+  }
+
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithSamePartitionBeingReassigned(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassignment = Map(tp -> Seq(otherBrokerId, leaderBrokerId))
+    zkClient.createPartitionReassignment(reassignment)
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 3,
+      "failed to get expected partition state after partition reassignment")
+    waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,
+      "failed to get expected partition state after auto preferred replica leader election")

Review Comment:
   Is it possible that the reassignment completes before the next automatic leader election is triggered, in which case we don't actually verify that the election has taken place while the reassignment is active?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election for partitions are not being reassigned

Posted by GitBox <gi...@apache.org>.
songnon commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r953835358


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,

Review Comment:
   I guess the scenario for partition `reassigningTp` in this test case is a very typical scenario. We have a similar test `testPartitionReassignmentResumesAfterReplicaComesOnline` in this file. I just use it to simulate an ongoing reassignment. I can add some comments if it's necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
songnon commented on PR #12543:
URL: https://github.com/apache/kafka/pull/12543#issuecomment-1232861959

   > Should we also add a test case for when the leader election takes place on the same topic as the one undergoing reassignment?
   
   @C0urante Added a new test case. Could you have a look? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election for partitions are not being reassigned

Posted by GitBox <gi...@apache.org>.
songnon commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r950920059


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {

Review Comment:
   During a partition's reassignment, if the current leader is not in TRS or isn't alive, a leader election will be triggered at the time when the TRS = ISR. At that time, the partition is still in `controllerContext.partitionsBeingReassigned`, so `checkAndTriggerAutoLeaderRebalance` will skip that particular partition. 
   When the reassignment is done (partition being removed from `controllerContext.partitionsBeingReassigned`),  `checkAndTriggerAutoLeaderRebalance` will treat it as a normal partition, preferred leader election may be triggered if it's allowed to.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
songnon commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r961649493


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })

Review Comment:
   Enhanced the test case a bit. Now it's required to construct an entire `ReplicaAssignment`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election for partitions are not being reassigned

Posted by GitBox <gi...@apache.org>.
songnon commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r953835358


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,

Review Comment:
   I guess the scenario for partition `reassigningTp` in the test case is a very typical scenario. We have a similar test `testPartitionReassignmentResumesAfterReplicaComesOnline` in this file. I just use it to simulate an ongoing reassignment. I can add some comments if it's necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election for partitions are not being reassigned

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r952935878


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,

Review Comment:
   Got it -- wasn't sure if it was worth adding a partition state for epoch + 3 or leaving a comment about what we expect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
songnon commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r961648621


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,58 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithOtherReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> Seq(otherBrokerId))
+    zkClient.createPartitionReassignment(reassignment)
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,
+      "failed to get expected partition state after partition reassignment")
+  }
+
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithSamePartitionBeingReassigned(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassignment = Map(tp -> Seq(otherBrokerId, leaderBrokerId))
+    zkClient.createPartitionReassignment(reassignment)
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 3,
+      "failed to get expected partition state after partition reassignment")
+    waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,
+      "failed to get expected partition state after auto preferred replica leader election")

Review Comment:
   hmm, that's a good point. Updated the test case to make sure election is taken place while the reassignment is active.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante merged pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12543:
URL: https://github.com/apache/kafka/pull/12543


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #12543:
URL: https://github.com/apache/kafka/pull/12543#issuecomment-1295433463

   @junrao Was there an underlying reason not to allow this previously?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] junrao commented on pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
junrao commented on PR #12543:
URL: https://github.com/apache/kafka/pull/12543#issuecomment-1295446037

   Previously, the controller logic wasn't single threaded. So, this check is probably just to prevent concurrent actions. Now that all logic in the controller runs in a single thread, this constraint could be revisited.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r958619212


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })

Review Comment:
   Can this be simplified? Don't think we need to construct an entire `ReplicaAssignment` here:
   ```suggestion
       val reassignment = Map(reassigningTp -> Seq(otherBrokerId))
       zkClient.createPartitionReassignment(reassignment)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
songnon commented on PR #12543:
URL: https://github.com/apache/kafka/pull/12543#issuecomment-1231238770

   Update the code and commit message. It looks like partitions being reassigned can also benefit from auto preferred leader election


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12543:
URL: https://github.com/apache/kafka/pull/12543#issuecomment-1273679645

   @C0urante thanks for reminding me about this. I think the last thing I want to confirm is that in the case where we have a reassignment in progress -- we don't do the preferred leader election. I see we added the test case, but its a bit hard to parse. 
   
   I'm also a little confused by shutting down the broker that is not part of the assignment in the beginning. Some comments there could really help me understand the scenario better.
   
   Looks like the producer ID test didn't fail again, so it might be flaky. I will keep an eye on it. Looks like the builds are also red now, but maybe that's an issue on master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election for partitions are not being reassigned

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r950576126


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,

Review Comment:
   Is this leader epoch +4 because the startup and the reassignment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
songnon commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r959153232


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })

Review Comment:
   You are right. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
songnon commented on PR #12543:
URL: https://github.com/apache/kafka/pull/12543#issuecomment-1278458717

   > Perhaps we can update this to make it clearer that we can also do the preferred leadership election on partitions undergoing reassignment to update the preferred leader sooner.
   
   Fixed. I forgot to update the PR description after making some code change and updating the commit text.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] songnon commented on pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
songnon commented on PR #12543:
URL: https://github.com/apache/kafka/pull/12543#issuecomment-1278487313

   > I'm also a little confused by shutting down the broker that is not part of the assignment in the beginning. Some comments there could really help me understand the scenario better.
   
   @jolshan @C0urante I have added some comments in the test code. Hope it helps. Let me know if there are any questions. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12543:
URL: https://github.com/apache/kafka/pull/12543#issuecomment-1275066936

   Thanks @jolshan. I believe it's valid to perform preferred leader election for topics even if they're currently undergoing reassignment, for reasons [discussed in an earlier PR](https://github.com/apache/kafka/pull/9302#discussion_r491094781) for this ticket by @hachikuji:
   > During a reassignment, the adding replicas are always listed first which means the preferred leader is among the target replicas. My take is that we want to move the leadership onto the new preferred leader as soon as possible since the whole point of the reassignment is to take load off the removing replicas.
   
   I've double-checked the code base and this still appears to be the case; let me know if I'm missing something, though.
   
   Definitely agree RE commenting the test code; @songnon if you have time would you mind adding some details? Explanation of why we're shutting down and restarting which brokers when (e.g., to create a stuck reassignment, or to create a pending preferred leader election) should be plenty.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org