Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/09 19:04:13 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #12271: KAFKA-13972; Ensure that replicas are stopped after cancelled reassignment

hachikuji commented on code in PR #12271:
URL: https://github.com/apache/kafka/pull/12271#discussion_r893863016


##########
core/src/main/scala/kafka/controller/ReplicaStateMachine.scala:
##########
@@ -315,35 +329,43 @@ class ZkReplicaStateMachine(config: KafkaConfig,
   }
 
   /**
-   * Try to remove a replica from the isr of multiple partitions.
-   * Removing a replica from isr updates partition state in zookeeper.
+   * Fence an existing replica by bumping the leader epoch. The bumped leader epoch
+   * ensures 1) that the follower can no longer fetch with the old epoch, and 2)
+   * that it will accept a `StopReplica` request with the bumped epoch.
    *
-   * @param replicaId The replica being removed from isr of multiple partitions
-   * @param partitions The partitions from which we're trying to remove the replica from isr
+   * - If the replica is the current leader, then the leader will be changed to
+   *   [[LeaderAndIsr.NoLeader]], and it will remain in the ISR.
+   * - If the replica is not the current leader and it is in the ISR, then it
+   *   will be removed from the ISR.
+   * - Otherwise, the epoch will be bumped and the leader and ISR will be unchanged.
+   *
+   * Fencing a replica updates partition state in zookeeper.
+   *
+   * @param replicaId The replica being fenced from multiple partitions
+   * @param partitions The partitions from which we're trying to fence the replica
    * @return A tuple of two elements:
    *         1. The updated Right[LeaderIsrAndControllerEpochs] of all partitions for which we successfully
-   *         removed the replica from isr. Or Left[Exception] corresponding to failed removals that should
-   *         not be retried
+   *         fenced the replica. Or Left[Exception] for failures which should not be retried
    *         2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
    *         the partition leader updated partition state while the controller attempted to update partition state.
    */
-  private def doRemoveReplicasFromIsr(
+  private def tryFenceReplicas(
     replicaId: Int,
     partitions: Seq[TopicPartition]
   ): (Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]], Seq[TopicPartition]) = {
     val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk) = getTopicPartitionStatesFromZk(partitions)
-    val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, result) =>
-      result.map { leaderAndIsr =>
-        leaderAndIsr.isr.contains(replicaId)
-      }.getOrElse(false)
-    }
-
-    val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrsWithReplica.flatMap {
+    val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrs.flatMap {
       case (partition, result) =>
         result.toOption.map { leaderAndIsr =>
-          val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
-          val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
-          partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
+          if (leaderAndIsr.isr.contains(replicaId)) {
+            val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
+            val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
+            partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
+          } else {
+            // Even if the replica is not in the ISR, we must bump the epoch to ensure the replica
+            // is fenced from replication and the `StopReplica` can be sent with a bumped epoch.
+            partition -> leaderAndIsr.newEpoch

Review Comment:
   @junrao Thanks for the comments. I think you are right about this issue affecting `ControlledShutdown` as well. I have been trying to understand that path a little better since it looks like it is not doing quite what we expect today. Also agree about the epoch bump being too low-level. We might need the fencing of replicas to happen at the partition level instead of the replica level. Let me take a look at this.
   
   Interestingly, I don't think the issue affects KRaft. We bump the leader epoch in KRaft any time a replica is removed from the ISR or the replica set.
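   
   For illustration only, here is a minimal, self-contained Scala sketch of the adjustment performed in `tryFenceReplicas` above. `SimpleLeaderAndIsr`, `NoLeader`, and `fence` are hypothetical stand-ins rather than Kafka's actual `LeaderAndIsr` API; the point is just to show that every case bumps the leader epoch, which is what fences the replica.
   
   ```scala
   // Hypothetical model, not kafka.api.LeaderAndIsr.
   object FencingSketch {
     val NoLeader: Int = -1
   
     case class SimpleLeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int]) {
       def fence(replicaId: Int): SimpleLeaderAndIsr = {
         if (isr.contains(replicaId)) {
           // The fenced replica loses leadership (if it held it) and is dropped
           // from the ISR unless it is the sole ISR member.
           val newLeader = if (replicaId == leader) NoLeader else leader
           val newIsr = if (isr.size == 1) isr else isr.filterNot(_ == replicaId)
           copy(leader = newLeader, leaderEpoch = leaderEpoch + 1, isr = newIsr)
         } else {
           // The replica is already outside the ISR: only the epoch is bumped, so
           // fetches with the old epoch are rejected and a StopReplica request
           // carrying the new epoch is accepted.
           copy(leaderEpoch = leaderEpoch + 1)
         }
       }
     }
   
     def main(args: Array[String]): Unit = {
       val state = SimpleLeaderAndIsr(leader = 1, leaderEpoch = 5, isr = List(1, 2, 3))
       println(state.fence(1)) // leader -> NoLeader, epoch 6, isr List(2, 3)
       println(state.fence(2)) // leader stays 1, epoch 6, isr List(1, 3)
       println(state.fence(4)) // leader stays 1, epoch 6, isr unchanged
     }
   }
   ```
   
   Per the Scaladoc in the diff, the adjusted state is then written back to ZooKeeper, and the bumped epoch is what lets the replica accept the subsequent `StopReplica` request.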


