Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/12 23:07:44 UTC

[GitHub] [kafka] hachikuji opened a new pull request, #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

hachikuji opened a new pull request, #13107:
URL: https://github.com/apache/kafka/pull/13107

   When a reassignment is cancelled, we need to delete the partition state of adding replicas. Failing to do so causes "stray" partitions which take up disk space and can cause topicId conflicts if the topic is recreated. Currently, this logic does not work because the leader epoch does not always get bumped after cancellation. Without a leader epoch bump, the replica will ignore `StopReplica` requests sent by the controller and the replica may remain online.
   
   In this patch, we fix the issue by sending the sentinel -2 as the leader epoch in the `StopReplica` request. Currently, this sentinel is used only when a topic is being deleted, which is another case where we cannot depend on a leader epoch bump. This patch expands its use to cover all replica deletions, including reassignment cancellation.
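To make the failure mode and the proposed fix concrete, here is a minimal Scala sketch. It assumes a broker that applies the pre-patch strict check (`requestLeaderEpoch > currentLeaderEpoch`) and lets the -2 sentinel bypass validation. The object and method names are hypothetical; only the -2 value mirrors `LeaderAndIsr.EpochDuringDelete`.

```scala
// Hypothetical model of the bug and the proposed fix; not Kafka code.
object CancellationEpochSketch {
  // Mirrors LeaderAndIsr.EpochDuringDelete.
  val EpochDuringDelete: Int = -2

  // Controller side (proposed fix): any StopReplica that deletes the replica carries -2.
  def stopReplicaEpoch(deletePartition: Boolean, currentLeaderEpoch: Int): Int =
    if (deletePartition) EpochDuringDelete else currentLeaderEpoch

  // Broker side (pre-patch strict check): the sentinel bypasses validation,
  // otherwise the request must carry a strictly newer epoch.
  def brokerAccepts(requestLeaderEpoch: Int, currentLeaderEpoch: Int): Boolean =
    requestLeaderEpoch == EpochDuringDelete || requestLeaderEpoch > currentLeaderEpoch

  def main(args: Array[String]): Unit = {
    val epoch = 4 // cancellation did not bump the leader epoch
    val before = brokerAccepts(epoch, epoch)
    val after = brokerAccepts(stopReplicaEpoch(deletePartition = true, epoch), epoch)
    println(s"without sentinel: $before, with sentinel: $after")
  }
}
```

Without an epoch bump, a `StopReplica` carrying the current epoch fails the strict comparison, while the sentinel is accepted regardless of the broker's epoch.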
   
   Note, this problem only affects the ZK controller. The integration tests added here nevertheless cover both metadata modes.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1068804579


##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -436,17 +436,22 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int],
                                       topicPartition: TopicPartition,
                                       deletePartition: Boolean): Unit = {
-    // A sentinel (-2) is used as an epoch if the topic is queued for deletion. It overrides
-    // any existing epoch.
-    val leaderEpoch = metadataInstance.leaderEpoch(topicPartition)
-
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = stopReplicaRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
-      val alreadyDelete = result.get(topicPartition).exists(_.deletePartition)
+      val updatedDeletePartition = deletePartition || result.get(topicPartition).exists(_.deletePartition)
+
+      // A sentinel (-2) is used as an epoch if the replica is to be deleted.
+      // It overrides any existing epoch.
+      val leaderEpoch = if (updatedDeletePartition) {

Review Comment:
   I guess my question is whether it covers cases of reassignment cancellation when the topic is not deleted. I wasn't sure whether `deletePartition` is true in that case.





[GitHub] [kafka] hachikuji commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1072693482


##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -436,17 +436,22 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int],
                                       topicPartition: TopicPartition,
                                       deletePartition: Boolean): Unit = {
-    // A sentinel (-2) is used as an epoch if the topic is queued for deletion. It overrides
-    // any existing epoch.
-    val leaderEpoch = metadataInstance.leaderEpoch(topicPartition)
-
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = stopReplicaRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
-      val alreadyDelete = result.get(topicPartition).exists(_.deletePartition)
+      val updatedDeletePartition = deletePartition || result.get(topicPartition).exists(_.deletePartition)
+
+      // A sentinel (-2) is used as an epoch if the replica is to be deleted.
+      // It overrides any existing epoch.
+      val leaderEpoch = if (updatedDeletePartition) {

Review Comment:
   Yeah, `DeletePartition` is set whenever the replica should be deleted, which would be the case after cancellation for all adding replicas. It does not necessarily imply topic deletion.
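A minimal sketch of which replicas that covers, assuming a reassignment is described by hypothetical `original` and `target` replica lists (Kafka tracks adding and removing replicas in its own structures; these names are illustrative only). Adding replicas are those in the target but not in the original, and after cancellation each of them receives a `StopReplica` with the delete flag set, even though the topic itself survives.

```scala
// Hypothetical sketch of cancellation cleanup; names are illustrative, not Kafka's API.
object CancellationSketch {
  // "Adding" replicas: in the reassignment target but not in the original assignment.
  def addingReplicas(original: Seq[Int], target: Seq[Int]): Seq[Int] =
    target.filterNot(original.contains)

  // On cancellation the partition reverts to its original replicas, so every adding
  // replica is stopped with deletePartition = true (the map value is that flag).
  def stopReplicaOnCancel(original: Seq[Int], target: Seq[Int]): Map[Int, Boolean] =
    addingReplicas(original, target).map(brokerId => brokerId -> true).toMap

  def main(args: Array[String]): Unit = {
    // Original replicas 1, 2, 3; reassignment target 3, 4, 5.
    println(stopReplicaOnCancel(Seq(1, 2, 3), Seq(3, 4, 5)))
  }
}
```

Broker 3 appears in both lists, so it keeps its replica; only brokers 4 and 5 are told to delete theirs.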





[GitHub] [kafka] jolshan merged pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
jolshan merged PR #13107:
URL: https://github.com/apache/kafka/pull/13107




[GitHub] [kafka] hachikuji commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1068783227


##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -436,17 +436,22 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int],
                                       topicPartition: TopicPartition,
                                       deletePartition: Boolean): Unit = {
-    // A sentinel (-2) is used as an epoch if the topic is queued for deletion. It overrides
-    // any existing epoch.
-    val leaderEpoch = metadataInstance.leaderEpoch(topicPartition)
-
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = stopReplicaRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
-      val alreadyDelete = result.get(topicPartition).exists(_.deletePartition)
+      val updatedDeletePartition = deletePartition || result.get(topicPartition).exists(_.deletePartition)
+
+      // A sentinel (-2) is used as an epoch if the replica is to be deleted.
+      // It overrides any existing epoch.
+      val leaderEpoch = if (updatedDeletePartition) {

Review Comment:
   What I wanted is to have -2 sent as the leader epoch whenever the delete flag was enabled in the `StopReplica` request. That seemed simpler than trying to pinpoint specific cases, such as reassignment cancellation.





[GitHub] [kafka] hachikuji commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1073904486


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig,
               // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
-                  requestLeaderEpoch > currentLeaderEpoch) {
+                  requestLeaderEpoch >= currentLeaderEpoch) {

Review Comment:
   I think the main thing is that a new reassignment will always have a leader epoch bump, so we can still be sure that the `StopReplica` cannot be reordered. It is still possible to end up with a stray replica, though, if the `StopReplica` from a cancellation is received before the initial `LeaderAndIsr` sent when the reassignment began. A better fix would be to bump the epoch after cancellation, but I didn't see a really simple way to do that.





[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1073908135


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig,
               // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
-                  requestLeaderEpoch > currentLeaderEpoch) {
+                  requestLeaderEpoch >= currentLeaderEpoch) {

Review Comment:
   Got it. Thanks for clarifying. 





[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1072895900


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig,
               // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
-                  requestLeaderEpoch > currentLeaderEpoch) {
+                  requestLeaderEpoch >= currentLeaderEpoch) {

Review Comment:
   Just curious: was the epoch check put in place because we were concerned about stale `StopReplica` requests? I'm trying to figure out why we need it and what the implications are of also accepting the current epoch.





[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1072893785


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -1971,16 +1971,22 @@ object TestUtils extends Logging {
     )
   }
 
+  def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = {
+    val description = admin.describeTopics(Set(partition.topic).asJava)
+      .allTopicNames
+      .get
+      .asScala

Review Comment:
   nit: can we put a new line here to distinguish the two a bit more?





[GitHub] [kafka] hachikuji commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1068783227


##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -436,17 +436,22 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int],
                                       topicPartition: TopicPartition,
                                       deletePartition: Boolean): Unit = {
-    // A sentinel (-2) is used as an epoch if the topic is queued for deletion. It overrides
-    // any existing epoch.
-    val leaderEpoch = metadataInstance.leaderEpoch(topicPartition)
-
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = stopReplicaRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
-      val alreadyDelete = result.get(topicPartition).exists(_.deletePartition)
+      val updatedDeletePartition = deletePartition || result.get(topicPartition).exists(_.deletePartition)
+
+      // A sentinel (-2) is used as an epoch if the replica is to be deleted.
+      // It overrides any existing epoch.
+      val leaderEpoch = if (updatedDeletePartition) {

Review Comment:
   Basically what I wanted is to have -2 sent as the leader epoch whenever the delete flag was enabled in the `StopReplica` request. That seemed simpler than trying to pinpoint specific cases, such as reassignment cancellation.





[GitHub] [kafka] hachikuji commented on pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #13107:
URL: https://github.com/apache/kafka/pull/13107#issuecomment-1385840547

   @jolshan @dajac This patch has been updated to loosen the epoch check on the broker side. The original approach seemed a little risky in the case where a reassignment is cancelled and then resubmitted: a `StopReplica` request corresponding to the cancellation might get ordered after the `LeaderAndIsr` for the resubmitted reassignment. With the loosened check, that would not be possible, since a new reassignment would have a leader epoch bump: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L747.
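The reordering argument can be sketched as follows, assuming a broker that has already applied the `LeaderAndIsr` for a resubmitted reassignment (which bumps the epoch) before a stale cancellation `StopReplica` arrives. Epoch values and names are hypothetical; `accepts` is a simplified stand-in for the `ReplicaManager` check, with `allowEqual` toggling between the old `>` and the loosened `>=` comparison.

```scala
// Hypothetical walk-through of the reordering scenario; not Kafka code.
object ReorderingSketch {
  // Mirrors LeaderAndIsr.EpochDuringDelete.
  val EpochDuringDelete: Int = -2

  // Simplified broker-side check: the sentinel always passes; otherwise compare epochs
  // with `>` (old behaviour) or `>=` (loosened behaviour).
  def accepts(requestLeaderEpoch: Int, currentLeaderEpoch: Int, allowEqual: Boolean): Boolean =
    requestLeaderEpoch == EpochDuringDelete ||
      (if (allowEqual) requestLeaderEpoch >= currentLeaderEpoch
       else requestLeaderEpoch > currentLeaderEpoch)

  def main(args: Array[String]): Unit = {
    val cancelEpoch = 5                    // cancellation leaves the epoch unchanged
    val resubmittedEpoch = cancelEpoch + 1 // resubmitted reassignment bumps the epoch

    // In order: the cancellation StopReplica reaches the broker while it is still at cancelEpoch.
    println(accepts(cancelEpoch, cancelEpoch, allowEqual = true))
    // Reordered, original approach: the -2 sentinel passes and would delete the replica
    // that the new reassignment just added.
    println(accepts(EpochDuringDelete, resubmittedEpoch, allowEqual = true))
    // Reordered, loosened check: the stale epoch is rejected.
    println(accepts(cancelEpoch, resubmittedEpoch, allowEqual = true))
  }
}
```

Sending the real epoch keeps the ordering protection that the sentinel gives up, while `>=` still lets a same-epoch cancellation through.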




[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1072900020


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig,
               // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
-                  requestLeaderEpoch > currentLeaderEpoch) {
+                  requestLeaderEpoch >= currentLeaderEpoch) {

Review Comment:
   Looks like this is part of KIP-570. I will take a look.





[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1072903070


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig,
               // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
-                  requestLeaderEpoch > currentLeaderEpoch) {
+                  requestLeaderEpoch >= currentLeaderEpoch) {

Review Comment:
   Seems like the concern there was reassignment. I think accepting a request epoch equal to the current leader epoch is OK, though: if we are a replica in the current leader epoch, the controller can't send us a `StopReplica` unless the reassignment was cancelled?





[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1068766335


##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -436,17 +436,22 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int],
                                       topicPartition: TopicPartition,
                                       deletePartition: Boolean): Unit = {
-    // A sentinel (-2) is used as an epoch if the topic is queued for deletion. It overrides
-    // any existing epoch.
-    val leaderEpoch = metadataInstance.leaderEpoch(topicPartition)
-
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = stopReplicaRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
-      val alreadyDelete = result.get(topicPartition).exists(_.deletePartition)
+      val updatedDeletePartition = deletePartition || result.get(topicPartition).exists(_.deletePartition)
+
+      // A sentinel (-2) is used as an epoch if the replica is to be deleted.
+      // It overrides any existing epoch.
+      val leaderEpoch = if (updatedDeletePartition) {

Review Comment:
   Just to clarify: this change now includes `result.get(topicPartition).exists(_.deletePartition)` as a condition for setting the epoch to -2? The rest is just cleanup/simplification.
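A simplified sketch of the batching logic in this revision of the diff, assuming partitions are plain strings and a hypothetical `StopReplicaEntry` case class stands in for Kafka's per-partition state. It shows why the existing entry is consulted: once a partition is marked for deletion in the current batch, a later non-delete request for the same partition must neither clear the flag nor replace the -2 epoch.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of addStopReplicaRequestForBrokers; not Kafka code.
object StopReplicaBatchSketch {
  // Mirrors LeaderAndIsr.EpochDuringDelete.
  val EpochDuringDelete: Int = -2

  final case class StopReplicaEntry(deletePartition: Boolean, leaderEpoch: Int)

  // brokerId -> (partition -> entry)
  val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[String, StopReplicaEntry]]

  def add(brokerIds: Seq[Int], partition: String, deletePartition: Boolean, currentLeaderEpoch: Int): Unit =
    brokerIds.filter(_ >= 0).foreach { brokerId =>
      val result = stopReplicaRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
      // OR the new flag with any delete already queued for this partition in the batch.
      val updatedDeletePartition = deletePartition || result.get(partition).exists(_.deletePartition)
      // The sentinel overrides the current epoch whenever the replica is to be deleted.
      val leaderEpoch = if (updatedDeletePartition) EpochDuringDelete else currentLeaderEpoch
      result.put(partition, StopReplicaEntry(updatedDeletePartition, leaderEpoch))
    }

  def main(args: Array[String]): Unit = {
    add(Seq(1), "foo-0", deletePartition = true, currentLeaderEpoch = 3)
    add(Seq(1), "foo-0", deletePartition = false, currentLeaderEpoch = 3)
    println(stopReplicaRequestMap(1)("foo-0"))
  }
}
```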


