You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/06 04:58:26 UTC

[GitHub] [kafka] splett2 commented on a diff in pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

splett2 commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r939483859


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @Test
+  def testShutdownBrokerNotAddedToIsr(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
+    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), newLeaderAndIsr.isr)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setNewIsr(fullIsr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+      future.complete
+    ))

Review Comment:
   do we want a test for compatibility for older versions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org