You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/04/25 16:37:19 UTC

[kafka] branch trunk updated: MINOR: Change `AlterPartition` validation order in `KafkaController` (#12032)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 25ee7f147c MINOR: Change `AlterPartition` validation order in `KafkaController` (#12032)
25ee7f147c is described below

commit 25ee7f147c8b798a881538f59ba7b8544fbb2b1a
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Apr 25 09:37:03 2022 -0700

    MINOR: Change `AlterPartition` validation order in `KafkaController` (#12032)
    
    Currently we validate the recovery state before checking the leader epoch in `KafkaController`. It seems more intuitive to validate the leader epoch first, since the leader might be working with stale state; this is also the order used in KRaft. This patch fixes this and adds a couple of additional validations to make the behavior consistent with KRaft.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>
---
 .../scala/kafka/controller/KafkaController.scala   |  19 +-
 .../controller/ControllerIntegrationTest.scala     | 325 +++++++++++++++++----
 2 files changed, 285 insertions(+), 59 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 61776b351d..bf83ab9d6d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2347,7 +2347,17 @@ class KafkaController(val config: KafkaConfig,
           controllerContext.partitionLeadershipInfo(tp) match {
             case Some(leaderIsrAndControllerEpoch) =>
               val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-              if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
+              if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
+                partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
+                None
+              } else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) {
+                partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
+                None
+              } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
+                // If a partition is already in the desired state, just return it
+                partitionResponses(tp) = Right(currentLeaderAndIsr)
+                None
+              } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
                 partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
                 info(
                   s"Rejecting AlterPartition from node $brokerId for $tp because leader is recovering and ISR is greater than 1: " +
@@ -2363,13 +2373,6 @@ class KafkaController(val config: KafkaConfig,
                   s"RECOVERED to RECOVERING: $newLeaderAndIsr"
                 )
                 None
-              } else if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
-                partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
-                None
-              } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
-                // If a partition is already in the desired state, just return it
-                partitionResponses(tp) = Right(currentLeaderAndIsr)
-                None
               } else {
                 Some(tp -> newLeaderAndIsr)
               }
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 46391922b5..aa631c95e4 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -847,7 +847,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
   }
 
   @Test
-  def testIdempotentAlterIsr(): Unit = {
+  def testIdempotentAlterPartition(): Unit = {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBroker = servers.find(_.config.brokerId != controllerId).get
@@ -881,7 +881,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
   }
 
   @Test
-  def testAlterIsrErrors(): Unit = {
+  def testAlterPartitionErrors(): Unit = {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val tp = new TopicPartition("t", 0)
@@ -890,65 +890,288 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     val controller = getController().kafkaController
-    var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1,
-      Map(tp -> LeaderAndIsr(controllerId, replicas)))
-    var capturedError = future.get(5, TimeUnit.SECONDS)
-    assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
-
-    future = captureAlterIsrError(99, controller.brokerEpoch,
-      Map(tp -> LeaderAndIsr(controllerId, replicas)))
-    capturedError = future.get(5, TimeUnit.SECONDS)
-    assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
-
-    val unknownTopicPartition = new TopicPartition("unknown", 99)
-    future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
-      Map(unknownTopicPartition -> LeaderAndIsr(controllerId, replicas)), unknownTopicPartition)
-    capturedError = future.get(5, TimeUnit.SECONDS)
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError)
-
-    future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
-      Map(tp -> LeaderAndIsr(controllerId, 1, replicas, LeaderRecoveryState.RECOVERED, 99)), tp)
-    capturedError = future.get(5, TimeUnit.SECONDS)
-    assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError)
-
-    future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
-      Map(tp -> LeaderAndIsr(controllerId, 1, replicas, LeaderRecoveryState.RECOVERING, 1)), tp)
-    capturedError = future.get(5, TimeUnit.SECONDS)
-    assertEquals(Errors.INVALID_REQUEST, capturedError)
-
-    future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
-      Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId), LeaderRecoveryState.RECOVERING, 1)), tp)
-    capturedError = future.get(5, TimeUnit.SECONDS)
-    assertEquals(Errors.INVALID_REQUEST, capturedError)
+    val partitionState = controller.controllerContext.partitionLeadershipInfo(tp).get
+    val leaderId = partitionState.leaderAndIsr.leader
+    val leaderBrokerEpoch = servers(leaderId).kafkaController.brokerEpoch
+    val leaderEpoch = partitionState.leaderAndIsr.leaderEpoch
+    val partitionEpoch = partitionState.leaderAndIsr.partitionEpoch
+
+    def assertAlterPartition(
+      topLevelError: Errors = Errors.NONE,
+      partitionError: Errors = Errors.NONE,
+      topicPartition: TopicPartition = tp,
+      leaderId: Int = leaderId,
+      brokerEpoch: Long = leaderBrokerEpoch,
+      leaderEpoch: Int = leaderEpoch,
+      partitionEpoch: Int = partitionEpoch,
+      isr: Set[Int] = replicas.toSet,
+      leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED
+    ): Unit = {
+      assertAlterPartitionError(
+        topicPartition = topicPartition,
+        leaderId = leaderId,
+        brokerEpoch = brokerEpoch,
+        leaderEpoch = leaderEpoch,
+        partitionEpoch = partitionEpoch,
+        isr = isr,
+        leaderRecoveryState = leaderRecoveryState,
+        topLevelError = topLevelError,
+        partitionError = partitionError
+      )
+    }
+
+    assertAlterPartition(
+      topLevelError = Errors.STALE_BROKER_EPOCH,
+      brokerEpoch = leaderBrokerEpoch - 1
+    )
+
+    assertAlterPartition(
+      topLevelError = Errors.STALE_BROKER_EPOCH,
+      leaderId = 99,
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION,
+      topicPartition = new TopicPartition("unknown", 0)
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION,
+      topicPartition = new TopicPartition(tp.topic, 1)
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.INVALID_UPDATE_VERSION,
+      partitionEpoch = partitionEpoch - 1
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.FENCED_LEADER_EPOCH,
+      leaderEpoch = leaderEpoch - 1
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.FENCED_LEADER_EPOCH,
+      leaderEpoch = leaderEpoch + 1
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.INVALID_REQUEST,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.INVALID_REQUEST,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+      isr = Set(controllerId)
+    )
+
+    // Version/epoch errors take precedence over other validations since
+    // the leader may be working with outdated state.
+
+    assertAlterPartition(
+      partitionError = Errors.INVALID_UPDATE_VERSION,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+      partitionEpoch = partitionEpoch - 1
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.FENCED_LEADER_EPOCH,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+      leaderEpoch = leaderEpoch - 1
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.FENCED_LEADER_EPOCH,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+      leaderEpoch = leaderEpoch + 1
+    )
   }
 
-  def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = {
-    val future = new CompletableFuture[Errors]()
+  @Test
+  def testAlterPartitionErrorsAfterUncleanElection(): Unit = {
+    // - Start 3 brokers with unclean election enabled
+    // - Create a topic with two non-controller replicas: A and B
+    // - Shutdown A to bring ISR to [B]
+    // - Shutdown B to make partition offline
+    // - Restart A to force unclean election with ISR [A]
+    // - Verify AlterPartition handling in this state
+
+    servers = makeServers(numConfigs = 3, uncleanLeaderElectionEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val controller = getController().kafkaController
-    val callback: AlterPartitionCallback = {
-      case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
-        future.completeExceptionally(new AssertionError(s"Should have seen top-level error"))
-      case Right(error: Errors) =>
-        future.complete(error)
+
+    val tp = new TopicPartition("t", 0)
+    val replicas = servers.map(_.config.nodeId).filter(_ != controllerId).take(2).toList
+    val assignment = Map(tp.partition -> replicas)
+
+    val replica1 :: replica2 :: Nil = replicas
+
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    servers(replica1).shutdown()
+    servers(replica1).awaitShutdown()
+
+    val partitionStateAfterFirstShutdown = controller.controllerContext.partitionLeadershipInfo(tp).get
+    assertEquals(replica2, partitionStateAfterFirstShutdown.leaderAndIsr.leader)
+    assertEquals(Set(replica2), partitionStateAfterFirstShutdown.leaderAndIsr.isr.toSet)
+
+    servers(replica2).shutdown()
+    servers(replica2).awaitShutdown()
+
+    val partitionStateAfterSecondShutdown = controller.controllerContext.partitionLeadershipInfo(tp).get
+    assertEquals(-1, partitionStateAfterSecondShutdown.leaderAndIsr.leader)
+    assertEquals(Set(replica2), partitionStateAfterSecondShutdown.leaderAndIsr.isr.toSet)
+
+    servers(replica1).startup()
+    TestUtils.waitUntilLeaderIsKnown(servers, tp)
+
+    val partitionStateAfterRestart = controller.controllerContext.partitionLeadershipInfo(tp).get
+    assertEquals(replica1, partitionStateAfterRestart.leaderAndIsr.leader)
+    assertEquals(Set(replica1), partitionStateAfterRestart.leaderAndIsr.isr.toSet)
+    assertEquals(LeaderRecoveryState.RECOVERING, partitionStateAfterRestart.leaderAndIsr.leaderRecoveryState)
+
+    val leaderId = replica1
+    val leaderBrokerEpoch = servers(replica1).kafkaController.brokerEpoch
+    val leaderEpoch = partitionStateAfterRestart.leaderAndIsr.leaderEpoch
+    val partitionEpoch = partitionStateAfterRestart.leaderAndIsr.partitionEpoch
+
+    def assertAlterPartition(
+      topLevelError: Errors = Errors.NONE,
+      partitionError: Errors = Errors.NONE,
+      leaderId: Int = leaderId,
+      brokerEpoch: Long = leaderBrokerEpoch,
+      leaderEpoch: Int = leaderEpoch,
+      partitionEpoch: Int = partitionEpoch,
+      leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED
+    ): Unit = {
+      assertAlterPartitionError(
+        topicPartition = tp,
+        leaderId = leaderId,
+        brokerEpoch = brokerEpoch,
+        leaderEpoch = leaderEpoch,
+        partitionEpoch = partitionEpoch,
+        isr = replicas.toSet,
+        leaderRecoveryState = leaderRecoveryState,
+        topLevelError = topLevelError,
+        partitionError = partitionError
+      )
     }
-    controller.eventManager.put(AlterPartitionReceived(brokerId, brokerEpoch, isrsToAlter, callback))
-    future
+
+    assertAlterPartition(
+      topLevelError = Errors.STALE_BROKER_EPOCH,
+      brokerEpoch = leaderBrokerEpoch - 1
+    )
+
+    assertAlterPartition(
+      topLevelError = Errors.STALE_BROKER_EPOCH,
+      leaderId = 99
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.INVALID_UPDATE_VERSION,
+      partitionEpoch = partitionEpoch - 1
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.FENCED_LEADER_EPOCH,
+      leaderEpoch = leaderEpoch - 1
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.FENCED_LEADER_EPOCH,
+      leaderEpoch = leaderEpoch + 1
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.INVALID_REQUEST,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING
+    )
+
+    // Version/epoch errors take precedence over other validations since
+    // the leader may be working with outdated state.
+
+    assertAlterPartition(
+      partitionError = Errors.INVALID_UPDATE_VERSION,
+      partitionEpoch = partitionEpoch - 1,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.FENCED_LEADER_EPOCH,
+      leaderEpoch = leaderEpoch - 1,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING
+    )
+
+    assertAlterPartition(
+      partitionError = Errors.FENCED_LEADER_EPOCH,
+      leaderEpoch = leaderEpoch + 1,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING
+    )
   }
 
-  def captureAlterIsrPartitionError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], tp: TopicPartition): CompletableFuture[Errors] = {
-    val future = new CompletableFuture[Errors]()
-    val controller = getController().kafkaController
+  def assertAlterPartitionError(
+    topicPartition: TopicPartition,
+    leaderId: Int,
+    brokerEpoch: Long,
+    leaderEpoch: Int,
+    partitionEpoch: Int,
+    isr: Set[Int],
+    leaderRecoveryState: LeaderRecoveryState,
+    topLevelError: Errors,
+    partitionError: Errors,
+  ): Unit = {
+    val leaderAndIsr = LeaderAndIsr(
+      leader = leaderId,
+      leaderEpoch = leaderEpoch,
+      isr = isr.toList,
+      partitionEpoch = partitionEpoch,
+      leaderRecoveryState = leaderRecoveryState
+    )
+
+    val future = captureAlterPartitionError(
+      brokerId = leaderId,
+      brokerEpoch = brokerEpoch,
+      topicPartition = topicPartition,
+      leaderAndIsr = leaderAndIsr
+    )
+
+    val errors = future.get(10, TimeUnit.SECONDS)
+    assertEquals(topLevelError, errors.topLevelError)
+
+    if (topLevelError == Errors.NONE) {
+      assertEquals(Some(partitionError), errors.partitionError)
+    }
+  }
+
+  private case class AlterPartitionError(topLevelError: Errors, partitionError: Option[Errors])
+
+  private def captureAlterPartitionError(
+    brokerId: Int,
+    brokerEpoch: Long,
+    topicPartition: TopicPartition,
+    leaderAndIsr: LeaderAndIsr
+  ): CompletableFuture[AlterPartitionError] = {
+    val future = new CompletableFuture[AlterPartitionError]()
     val callback: AlterPartitionCallback = {
       case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
-        partitionResults.get(tp) match {
-          case Some(Left(error: Errors)) => future.complete(error)
-          case Some(Right(_: LeaderAndIsr)) => future.completeExceptionally(new AssertionError(s"Should have seen an error for $tp in result"))
-          case None => future.completeExceptionally(new AssertionError(s"Should have seen $tp in result"))
+        partitionResults.get(topicPartition) match {
+          case Some(Left(error: Errors)) =>
+            future.complete(AlterPartitionError(topLevelError = Errors.NONE, partitionError = Some(error)))
+
+          case Some(Right(_: LeaderAndIsr)) =>
+            future.complete(AlterPartitionError(topLevelError = Errors.NONE, partitionError = Some(Errors.NONE)))
+
+          case None =>
+            future.completeExceptionally(new AssertionError(s"Should have seen $topicPartition in result"))
         }
-      case Right(_: Errors) =>
-        future.completeExceptionally(new AssertionError(s"Should not seen top-level error"))
+
+      case Right(error: Errors) =>
+        future.complete(AlterPartitionError(topLevelError = error, partitionError = None))
     }
-    controller.eventManager.put(AlterPartitionReceived(brokerId, brokerEpoch, isrsToAlter, callback))
+
+    val partitionsToAlter = Map(topicPartition -> leaderAndIsr)
+    val controller = getController().kafkaController
+    controller.eventManager.put(AlterPartitionReceived(brokerId, brokerEpoch, partitionsToAlter, callback))
     future
   }