Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/05 22:15:51 UTC

[GitHub] [kafka] jolshan opened a new pull request, #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

jolshan opened a new pull request, #12487:
URL: https://github.com/apache/kafka/pull/12487

   As part of [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft) we prevent brokers that are shutting down from being added to the ISR, which also makes them ineligible to become leader.
   
   We want to do the same in 3.3 for ZK to protect against edge cases and not have to do a version bump in future versions.
   
   See this PR to see the equivalent change for KRaft mode: https://github.com/apache/kafka/pull/12181/commits/b6cb29503e025989308d1e0d9ce30efa74ca7974
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12487:
URL: https://github.com/apache/kafka/pull/12487#issuecomment-1207172718

   The title of the PR is misleading here. There is no « fenced » state in ZK, and the patch only prevents offline replicas from joining the ISR, not the shutting-down ones. Is that intentional?




[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r939695273


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @Test
+  def testShutdownBrokerNotAddedToIsr(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
+    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), newLeaderAndIsr.isr)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setNewIsr(fullIsr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+      future.complete
+    ))

Review Comment:
   Ah -- so looking for the other error message instead? 





[GitHub] [kafka] hachikuji commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941604866


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,16 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR.
       case kRaftMetadataCache: KRaftMetadataCache =>
         !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
           !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
 
+      // In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,'

Review Comment:
   nit: unintentional apostrophe at the end?





[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r939695243


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @Test
+  def testShutdownBrokerNotAddedToIsr(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
+    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), newLeaderAndIsr.isr)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setNewIsr(fullIsr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+      future.complete
+    ))
+
+    // We expect an ineligble replica error response for the partition.
+    val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+      .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(Errors.INELIGIBLE_REPLICA.code())
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
+    assertEquals(List(controllerId), newLeaderAndIsr.isr)

Review Comment:
   Ah -- that's a good point 😅





[GitHub] [kafka] splett2 commented on a diff in pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
splett2 commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r939484120


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2363,7 +2363,23 @@ class KafkaController(val config: KafkaConfig,
             )
             None
           } else {
-            Some(tp -> newLeaderAndIsr)
+            // Pull out replicas being added to ISR and verify they are all online
+            // If a replica is not online, reject the update as specified in KIP-841.
+            val ineligibleReplicas = newLeaderAndIsr.isr.filter(!controllerContext.liveBrokerIds.contains(_))

Review Comment:
   maybe we can use `--` here - it may be a bit more concise.
   On the line below we can also check `ineligibleReplicas.nonEmpty` instead of doing a size comparison
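
To illustrate the suggestion, here is a minimal, self-contained sketch comparing the two forms. The object and method names are hypothetical, and plain `List[Int]` / `Set[Int]` values stand in for `newLeaderAndIsr.isr` and `controllerContext.liveBrokerIds`. Note that `--` is defined on `Set`, so the sketch converts the ISR list first; whether the PR does exactly this is not shown in the hunk above.

```scala
object IneligibleReplicasSketch {
  // Form used in the diff above: keep every proposed ISR member that is not live.
  def viaFilter(proposedIsr: List[Int], liveBrokerIds: Set[Int]): List[Int] =
    proposedIsr.filter(!liveBrokerIds.contains(_))

  // Suggested form: set difference between the proposed ISR and the live brokers.
  def viaSetDifference(proposedIsr: List[Int], liveBrokerIds: Set[Int]): Set[Int] =
    proposedIsr.toSet -- liveBrokerIds

  // The follow-up check can then use nonEmpty instead of comparing sizes.
  def shouldReject(proposedIsr: List[Int], liveBrokerIds: Set[Int]): Boolean =
    viaSetDifference(proposedIsr, liveBrokerIds).nonEmpty
}
```

Both forms select the same broker ids; the set-difference version drops ordering and duplicates, which should not matter when the result is only tested for emptiness.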
   





[GitHub] [kafka] splett2 commented on a diff in pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
splett2 commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r939869146


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @Test
+  def testShutdownBrokerNotAddedToIsr(): Unit = {

Review Comment:
   I wonder if we can just test this case in one of the variants in `testAlterPartitionErrors`





[GitHub] [kafka] splett2 commented on a diff in pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
splett2 commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r939484120


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2363,7 +2363,23 @@ class KafkaController(val config: KafkaConfig,
             )
             None
           } else {
-            Some(tp -> newLeaderAndIsr)
+            // Pull out replicas being added to ISR and verify they are all online
+            // If a replica is not online, reject the update as specified in KIP-841.
+            val ineligibleReplicas = newLeaderAndIsr.isr.filter(!controllerContext.liveBrokerIds.contains(_))

Review Comment:
   maybe we can use `--` here - it may be a bit more concise.





[GitHub] [kafka] jolshan commented on pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12487:
URL: https://github.com/apache/kafka/pull/12487#issuecomment-1209962842

   Failed tests passed locally:
   
   [Build / JDK 8 and Scala 2.12 / kafka.admin.DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsNonExistingGroup()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/kafka.admin/DeleteOffsetsConsumerGroupCommandIntegrationTest/Build___JDK_8_and_Scala_2_12___testDeleteOffsetsNonExistingGroup__/)
   [Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/kafka.server/KRaftClusterTest/Build___JDK_8_and_Scala_2_12___testCreateClusterAndPerformReassignment__/)
   [Build / JDK 11 and Scala 2.13 / kafka.log.LogCleanerIntegrationTest.testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/kafka.log/LogCleanerIntegrationTest/Build___JDK_11_and_Scala_2_13___testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics__/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/org.apache.kafka.connect.integration/ConnectorRestartApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testMultiWorkerRestartOnlyConnector/)




[GitHub] [kafka] junrao commented on a diff in pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r939547038


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down.

Review Comment:
   In ControllerChannelManager.sendUpdateMetadataRequests(), it seems that we include shutting-down brokers in liveBrokers. So, we won't know whether a remote broker is shutting down.
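
The consequence for the leader-side check can be sketched with a simplified model. Everything below is hypothetical (`MetadataCacheModel`, `KRaftCacheModel`, `ZkCacheModel` are illustrative stand-ins, not Kafka classes); the only assumption taken from this thread is that the ZK-mode cache sees shutting-down brokers as alive.

```scala
object LeaderSideEligibilitySketch {
  // Hypothetical, simplified stand-ins for the two metadata cache flavours.
  sealed trait MetadataCacheModel

  // KRaft model: the cache knows which brokers are fenced or in controlled shutdown.
  final case class KRaftCacheModel(fenced: Set[Int], shuttingDown: Set[Int])
    extends MetadataCacheModel

  // ZK model: the cache only knows which brokers were reported alive, and
  // (per the comment above) that set still contains shutting-down brokers.
  final case class ZkCacheModel(aliveBrokers: Set[Int]) extends MetadataCacheModel

  def isReplicaIsrEligible(cache: MetadataCacheModel, replicaId: Int): Boolean =
    cache match {
      case KRaftCacheModel(fenced, shuttingDown) =>
        !fenced.contains(replicaId) && !shuttingDown.contains(replicaId)
      case ZkCacheModel(aliveBrokers) =>
        aliveBrokers.contains(replicaId)
    }
}
```

In this model a broker in controlled shutdown is filtered out by the KRaft branch but still passes the ZK branch, because the ZK cache has no way to tell it apart from a healthy broker.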



##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @Test
+  def testShutdownBrokerNotAddedToIsr(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
+    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), newLeaderAndIsr.isr)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setNewIsr(fullIsr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+      future.complete
+    ))
+
+    // We expect an ineligble replica error response for the partition.
+    val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+      .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(Errors.INELIGIBLE_REPLICA.code())
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
+    assertEquals(List(controllerId), newLeaderAndIsr.isr)

Review Comment:
   Should we read leaderAndIsr from controller again?
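
The concern is that `newLeaderAndIsr` was captured before the request was sent, so asserting on it afterwards cannot detect a change. A minimal sketch of the pitfall follows; `FakeController` is a hypothetical stand-in for the real controller state, not Kafka code.

```scala
object StaleSnapshotSketch {
  // Hypothetical stand-in for the controller's view of one partition's ISR.
  final class FakeController(private var isr: List[Int]) {
    def currentIsr: List[Int] = isr

    // Simulates a (buggy) controller that accepts any proposed ISR.
    def alterIsr(newIsr: List[Int]): Unit = {
      isr = newIsr
    }
  }

  def main(args: Array[String]): Unit = {
    val controller = new FakeController(List(0))
    val snapshot = controller.currentIsr // captured before the request

    controller.alterIsr(List(0, 1))

    // Passes no matter what the controller did: the snapshot never changes.
    assert(snapshot == List(0))
    // Only a fresh read exposes the unwanted ISR expansion.
    assert(controller.currentIsr == List(0, 1))
  }
}
```

Re-reading the state after the response arrives is what turns the final assertion into a real check.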



##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @Test
+  def testShutdownBrokerNotAddedToIsr(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))

Review Comment:
   Could we just get the leaderAndIsr from the controller? It's cheaper than reading from ZK.





[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941543760


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down.

Review Comment:
   Ok, makes sense. Should I change the comment to reflect that shutting-down brokers will not be blocked here, but will be blocked on the controller side?
   
   I think for at least this PR (which we want to get into 3.3) we should hold off on protocol changes.





[GitHub] [kafka] dajac commented on pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12487:
URL: https://github.com/apache/kafka/pull/12487#issuecomment-1210342696

   Merged to trunk and 3.3.




[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941598423


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down.

Review Comment:
   Ah, my comment is slightly different in the latest commit. Let me know if I should change it.





[GitHub] [kafka] dajac merged pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12487:
URL: https://github.com/apache/kafka/pull/12487




[GitHub] [kafka] jolshan commented on pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12487:
URL: https://github.com/apache/kafka/pull/12487#issuecomment-1208314532

   @dajac I can remove the reference to fenced. As for shutting-down brokers -- they are prevented in the Kafka controller code (otherwise I'd use liveOrShuttingDownBrokerIds), but not in the metadata cache code, as I understand it.
   
   I can try to modify the metadata cache if possible.
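
The distinction between the two controller-side sets can be sketched as follows. This is a simplified, hypothetical model (`BrokerView` and its fields are illustrative names); it assumes only what the comment above implies, namely that `liveBrokerIds` excludes shutting-down brokers while `liveOrShuttingDownBrokerIds` includes them.

```scala
object ControllerBrokerSetsSketch {
  // Hypothetical model of the controller's broker bookkeeping.
  final case class BrokerView(registered: Set[Int], shuttingDown: Set[Int]) {
    // Every registered broker, including those in controlled shutdown.
    def liveOrShuttingDownBrokerIds: Set[Int] = registered

    // Registered brokers that are not in controlled shutdown.
    def liveBrokerIds: Set[Int] = registered -- shuttingDown
  }
}
```

With `BrokerView(Set(0, 1), Set(1))`, broker 1 appears in `liveOrShuttingDownBrokerIds` but not in `liveBrokerIds`, so a check against `liveBrokerIds` rejects it as an ISR candidate.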




[GitHub] [kafka] dajac commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941077938


##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1375,8 +1377,11 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionListener.failures.get, 1)
   }
 
-  @Test
-  def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIsrNotExpandedIfReplicaIsFenced(quorum: String): Unit = {

Review Comment:
   nit: Should we update the test name as well?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down.

Review Comment:
   This basically means that the leader will keep retrying to add the shutting-down broker back to the ISR until that broker is removed from the metadata cache. It is worth noting that, during this time, other replicas cannot be added back to the ISR either, because the controller rejects any ISR expansion containing at least one ineligible replica. This is why we added the in-controlled-shutdown state in KRaft: it allows the leader to filter such replicas out right away.
   
   This may be acceptable here. Otherwise, we would have to propagate the shutting-down brokers via the UpdateMetadataRequest. What do others think?
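
The all-or-nothing rejection described above can be sketched as a small model. The types and the `validate` function are hypothetical, not the controller's actual code; the sketch only assumes that a proposed ISR is rejected as a whole when any member is not in the controller's live set.

```scala
object IsrExpansionSketch {
  sealed trait Outcome
  case object IneligibleReplica extends Outcome
  final case class Accepted(isr: List[Int]) extends Outcome

  // Rejects the whole proposal if at least one member is not live.
  def validate(proposedIsr: List[Int], liveBrokerIds: Set[Int]): Outcome = {
    val ineligible = proposedIsr.toSet -- liveBrokerIds
    if (ineligible.nonEmpty) IneligibleReplica else Accepted(proposedIsr)
  }

  def main(args: Array[String]): Unit = {
    // Broker 1 is shutting down, so the controller no longer counts it as live.
    val live = Set(0, 2)

    // The leader still sees broker 1 as alive and proposes it together with broker 2.
    println(validate(List(0, 1, 2), live))

    // Once the leader stops proposing broker 1, broker 2 can rejoin.
    println(validate(List(0, 2), live))
  }
}
```

In this model the first proposal is rejected even though broker 2 is eligible, which is the blocking effect the comment describes; the second proposal succeeds only after the shutting-down broker is left out.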
   



##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))
+  def testShutdownBrokerNotAddedToIsr(alterPartitionVersion: Short): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = controller.controllerContext.partitionsLeadershipInfo
+    val leaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), leaderAndIsr.isr)
+
+    val requestTopic = new AlterPartitionRequestData.TopicData()
+      .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+        .setPartitionIndex(tp.partition)
+        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+        .setNewIsr(fullIsr.map(Int.box).asJava)
+        .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
+    if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else requestTopic.setTopicName(tp.topic)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(requestTopic).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionVersion,
+      future.complete
+    ))

Review Comment:
   nit: This piece of code is now used in multiple places. I wonder if it is worth pulling it out into a helper method. What do you think?



##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))
+  def testShutdownBrokerNotAddedToIsr(alterPartitionVersion: Short): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = controller.controllerContext.partitionsLeadershipInfo
+    val leaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), leaderAndIsr.isr)
+
+    val requestTopic = new AlterPartitionRequestData.TopicData()
+      .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+        .setPartitionIndex(tp.partition)
+        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+        .setNewIsr(fullIsr.map(Int.box).asJava)
+        .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
+    if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else requestTopic.setTopicName(tp.topic)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(requestTopic).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionVersion,
+      future.complete
+    ))
+
+    val error = if (alterPartitionVersion > 1) Errors.INELIGIBLE_REPLICA else Errors.OPERATION_NOT_ATTEMPTED
+    val responseTopic = new AlterPartitionResponseData.TopicData()

Review Comment:
   nit: Should we use `expectedError` and `expectedResponseTopic`?



##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2363,7 +2363,23 @@ class KafkaController(val config: KafkaConfig,
             )
             None
           } else {
-            Some(tp -> newLeaderAndIsr)
+            // Pull out replicas being added to ISR and verify they are all online

Review Comment:
   nit: Add a `.` at the end of this sentence.
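
As a rough sketch of the controller-side check that the new comment describes, assuming the controller knows the current ISR and the set of live broker IDs: the names `validateNewIsr` and `Result` are hypothetical, and the two error objects merely stand in for `Errors.INELIGIBLE_REPLICA` and `Errors.OPERATION_NOT_ATTEMPTED`. The version-to-error mapping follows the expectation in the test diff.

```scala
object IsrOnlineCheckSketch {
  sealed trait Result
  case object Accepted extends Result
  case object IneligibleReplica extends Result     // stands in for Errors.INELIGIBLE_REPLICA
  case object OperationNotAttempted extends Result // stands in for Errors.OPERATION_NOT_ATTEMPTED

  // Pull out the replicas being added to the ISR and verify they are all online.
  def validateNewIsr(currentIsr: Set[Int], newIsr: Set[Int], liveBrokers: Set[Int], version: Short): Result = {
    val added = newIsr.diff(currentIsr)   // replicas the leader wants to add
    val offline = added.diff(liveBrokers) // added replicas that are not live
    if (offline.isEmpty) Accepted
    else if (version > 1) IneligibleReplica // newer request versions get the specific error
    else OperationNotAttempted              // older versions get the generic error
  }
}
```

Under this sketch, shrinking the ISR or re-proposing replicas that are already in it is never rejected, since only newly added replicas are checked.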



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down.
       case kRaftMetadataCache: KRaftMetadataCache =>
         !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
           !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
 
+      case zkMetadataCache: ZkMetadataCache =>

Review Comment:
   nit: Could we add the part of the comment related to ZK above this line?
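
To make the shape of the match concrete, here is a self-contained sketch under simplifying assumptions. The caches are modeled as plain case classes holding sets of broker IDs, not the real `KRaftMetadataCache` and `ZkMetadataCache`, and the fall-through case returning `true` is an assumption, since the hunk above does not show it.

```scala
object IsrEligibilitySketch {
  // Simplified stand-ins for the metadata cache implementations (assumption).
  sealed trait MetadataCache
  final case class KRaftCache(fenced: Set[Int], shuttingDown: Set[Int]) extends MetadataCache
  final case class ZkCache(aliveBrokers: Set[Int]) extends MetadataCache
  case object OtherCache extends MetadataCache

  def isReplicaIsrEligible(cache: MetadataCache, followerReplicaId: Int): Boolean = cache match {
    // In KRaft mode, only replicas which are neither fenced nor in controlled shutdown
    // are allowed to join the ISR.
    case KRaftCache(fenced, shuttingDown) =>
      !fenced.contains(followerReplicaId) && !shuttingDown.contains(followerReplicaId)

    // In ZK mode, this sketch only ensures that the broker is alive.
    case ZkCache(aliveBrokers) =>
      aliveBrokers.contains(followerReplicaId)

    // Any other cache: no restriction (assumed default).
    case OtherCache => true
  }
}
```

Placing each comment directly above its own case, as suggested, keeps the KRaft and ZK rules readable independently.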



##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))

Review Comment:
   nit: You could use `@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)`.





[GitHub] [kafka] splett2 commented on a diff in pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
splett2 commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r939483859


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @Test
+  def testShutdownBrokerNotAddedToIsr(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
+    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), newLeaderAndIsr.isr)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setNewIsr(fullIsr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+      future.complete
+    ))

Review Comment:
   Do we want a test for compatibility with older versions?





[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r939694944


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down.

Review Comment:
   Yeah -- this was something I was looking into. I'm not sure if there is a way to also exclude shutting down brokers here. Is that also included in the metadata? I can take a look as well.





[GitHub] [kafka] splett2 commented on a diff in pull request #12487: KAFKA-14140: Ensure a fenced or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
splett2 commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r940244720


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down.

Review Comment:
   I don't think it's possible with the ZK controller on the broker side, but having the controller-side check is probably sufficient.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down.

Review Comment:
   I don't think it's possible with the ZK controller on the leader side, but having the controller-side check is probably sufficient.





[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941537699


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))

Review Comment:
   Thanks! I was trying to remember how to do this.





[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r940500710


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @Test
+  def testShutdownBrokerNotAddedToIsr(): Unit = {

Review Comment:
   Perhaps? Do you think the current way isn't readable?


