Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/11 23:56:31 UTC

[kafka] branch 3.3 updated: KAFKA-14154; Return NOT_CONTROLLER from AlterPartition if leader is ahead of controller (#12506)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 36ff5d5c98f KAFKA-14154; Return NOT_CONTROLLER from AlterPartition if leader is ahead of controller (#12506)
36ff5d5c98f is described below

commit 36ff5d5c98f2649ec2cc5ab302375cd41298a727
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Aug 11 16:43:12 2022 -0700

    KAFKA-14154; Return NOT_CONTROLLER from AlterPartition if leader is ahead of controller (#12506)
    
    It is possible for the leader to send an `AlterPartition` request to a zombie controller which includes either a partition or leader epoch which is larger than what is found in the controller context. Prior to https://github.com/apache/kafka/pull/12032, the controller handled this in the following way:
    
    1. If the `LeaderAndIsr` state exactly matches the current state on the controller excluding the partition epoch, then the `AlterPartition` request is considered successful and no error is returned. The risk with this handling is that this may cause the leader to incorrectly assume that the state had been successfully updated. Since the controller's state is stale, there is no way to know what the latest ISR state is.
    2. Otherwise, the controller will attempt to update the state in ZooKeeper with the leader/partition epochs from the `AlterPartition` request. This operation would fail if the controller's epoch was no longer current in ZooKeeper, and the result would be a `NOT_CONTROLLER` error.
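
The two legacy steps above can be sketched roughly as follows. This is a simplified model, not the controller's actual code: `IsrState`, `Result`, `handle`, and the `controllerEpochIsCurrent` flag are hypothetical names introduced for illustration, and the conditional ZooKeeper write is reduced to a boolean.

```scala
object LegacyAlterPartition {
  // Hypothetical, simplified stand-in for the controller's LeaderAndIsr state.
  final case class IsrState(leader: Int, leaderEpoch: Int, isr: List[Int], partitionEpoch: Int)

  sealed trait Result
  case object AcceptedWithoutWrite extends Result // step 1: treated as already applied
  case object WrittenToZooKeeper extends Result   // step 2: write succeeded
  case object NotController extends Result        // step 2: controller epoch was stale

  // Assumption: `controllerEpochIsCurrent` models whether the conditional
  // ZooKeeper write would pass its controller epoch check.
  def handle(requested: IsrState, cached: IsrState, controllerEpochIsCurrent: Boolean): Result = {
    // Step 1: compare everything except the partition epoch.
    val matchesIgnoringPartitionEpoch =
      requested.copy(partitionEpoch = cached.partitionEpoch) == cached
    if (matchesIgnoringPartitionEpoch) AcceptedWithoutWrite
    // Step 2: otherwise attempt the write, which only succeeds on a live controller.
    else if (controllerEpochIsCurrent) WrittenToZooKeeper
    else NotController
  }
}
```

In this model a zombie controller whose cached state happens to match still returns `AcceptedWithoutWrite`, even when the request carries a higher partition epoch. That is the risk described in step 1.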
    
    Following https://github.com/apache/kafka/pull/12032, the controller's validation is stricter. If the partition epoch is larger than expected, then the controller will return `INVALID_UPDATE_VERSION` without attempting the operation. Similarly, if the leader epoch is larger than expected, the controller will return `FENCED_LEADER_EPOCH`. The problem with this new handling is that the leader treats the errors from the controller as authoritative. For example, if it sees the `FENCED_LEA [...]
    
    In this patch, we want to fix the issues with this handling, but we don't want to restore the buggy idempotent check. The approach is straightforward. If the controller sees a partition/leader epoch which is larger than what it has in the controller context, then it assumes that it has become a zombie and returns `NOT_CONTROLLER` to the leader. This will cause the leader to attempt to reset the controller from its local metadata cache and retry the `AlterPartition` request.
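
As a minimal sketch of the check order this patch introduces, the following models only the two epoch comparisons that appear in the diff. `Epochs`, `Outcome`, and `validate` are hypothetical names, not Kafka classes, and the later checks in the controller are collapsed into a single `ContinueValidation` outcome.

```scala
object AlterPartitionCheck {
  // Hypothetical pair of epochs taken from a request or from the controller context.
  final case class Epochs(leaderEpoch: Int, partitionEpoch: Int)

  sealed trait Outcome
  case object NotController extends Outcome
  case object FencedLeaderEpoch extends Outcome
  case object ContinueValidation extends Outcome // remaining checks are not modeled here

  def validate(requested: Epochs, current: Epochs): Outcome =
    if (requested.partitionEpoch > current.partitionEpoch ||
        requested.leaderEpoch > current.leaderEpoch) {
      // The leader is ahead of this controller, so this controller is likely a zombie.
      NotController
    } else if (requested.leaderEpoch != current.leaderEpoch) {
      // Only reachable when the requested leader epoch is smaller than the current one.
      FencedLeaderEpoch
    } else {
      ContinueValidation
    }
}
```

Because the "ahead" comparison runs first, `FencedLeaderEpoch` is only returned for a stale leader epoch; an epoch from the future always maps to `NotController`.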
    
    Reviewers: David Jacot <dj...@confluent.io>, José Armando García Sancio <js...@users.noreply.github.com>
---
 .../scala/kafka/controller/KafkaController.scala   |  9 +++++++-
 .../controller/ControllerIntegrationTest.scala     | 27 ++++++++++++++++++----
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0154d9cbe54..999bcb818e9 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2336,7 +2336,14 @@ class KafkaController(val config: KafkaConfig,
       controllerContext.partitionLeadershipInfo(tp) match {
         case Some(leaderIsrAndControllerEpoch) =>
           val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-          if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
+          if (newLeaderAndIsr.partitionEpoch > currentLeaderAndIsr.partitionEpoch
+              || newLeaderAndIsr.leaderEpoch > currentLeaderAndIsr.leaderEpoch) {
+            // If the partition leader has a higher partition/leader epoch, then it is likely
+            // that this node is no longer the active controller. We return NOT_CONTROLLER in
+            // this case to give the leader an opportunity to find the new controller.
+            partitionResponses(tp) = Left(Errors.NOT_CONTROLLER)
+            None
+          } else if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
             partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
             None
           } else if (newLeaderAndIsr.equalsAllowStalePartitionEpoch(currentLeaderAndIsr)) {
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 0c8d000656a..532ff1a946e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -1184,7 +1184,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     )
 
     assertAlterPartition(
-      partitionError = Errors.INVALID_UPDATE_VERSION,
+      partitionError = Errors.NOT_CONTROLLER,
       partitionEpoch = partitionEpoch + 1
     )
 
@@ -1194,7 +1194,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     )
 
     assertAlterPartition(
-      partitionError = Errors.FENCED_LEADER_EPOCH,
+      partitionError = Errors.NOT_CONTROLLER,
       leaderEpoch = leaderEpoch + 1
     )
 
@@ -1218,6 +1218,12 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       partitionEpoch = partitionEpoch - 1
     )
 
+    assertAlterPartition(
+      partitionError = Errors.NOT_CONTROLLER,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
+      partitionEpoch = partitionEpoch + 1
+    )
+
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
       leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
@@ -1225,7 +1231,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     )
 
     assertAlterPartition(
-      partitionError = Errors.FENCED_LEADER_EPOCH,
+      partitionError = Errors.NOT_CONTROLLER,
       leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
       leaderEpoch = leaderEpoch + 1
     )
@@ -1324,13 +1330,18 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       partitionEpoch = partitionEpoch - 1
     )
 
+    assertAlterPartition(
+      partitionError = Errors.NOT_CONTROLLER,
+      partitionEpoch = partitionEpoch + 1
+    )
+
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
       leaderEpoch = leaderEpoch - 1
     )
 
     assertAlterPartition(
-      partitionError = Errors.FENCED_LEADER_EPOCH,
+      partitionError = Errors.NOT_CONTROLLER,
       leaderEpoch = leaderEpoch + 1
     )
 
@@ -1348,6 +1359,12 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
     )
 
+    assertAlterPartition(
+      partitionError = Errors.NOT_CONTROLLER,
+      partitionEpoch = partitionEpoch + 1,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
+    )
+
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
       leaderEpoch = leaderEpoch - 1,
@@ -1355,7 +1372,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     )
 
     assertAlterPartition(
-      partitionError = Errors.FENCED_LEADER_EPOCH,
+      partitionError = Errors.NOT_CONTROLLER,
       leaderEpoch = leaderEpoch + 1,
       leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
     )