You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/04/25 16:37:19 UTC
[kafka] branch trunk updated: MINOR: Change `AlterPartition` validation order in `KafkaController` (#12032)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25ee7f147c MINOR: Change `AlterPartition` validation order in `KafkaController` (#12032)
25ee7f147c is described below
commit 25ee7f147c8b798a881538f59ba7b8544fbb2b1a
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Apr 25 09:37:03 2022 -0700
MINOR: Change `AlterPartition` validation order in `KafkaController` (#12032)
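Currently we validate the leader recovery state before checking the leader epoch in `KafkaController`. It is more intuitive to validate the leader epoch first, since the leader might be working with stale state; this is also the order the KRaft controller uses. This patch changes the validation order accordingly and adds a couple of additional validations to make the behavior consistent (for example, a request whose leader epoch is newer than the controller's is now also rejected with `FENCED_LEADER_EPOCH`).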
Reviewers: José Armando García Sancio <js...@users.noreply.github.com>
---
.../scala/kafka/controller/KafkaController.scala | 19 +-
.../controller/ControllerIntegrationTest.scala | 325 +++++++++++++++++----
2 files changed, 285 insertions(+), 59 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 61776b351d..bf83ab9d6d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2347,7 +2347,17 @@ class KafkaController(val config: KafkaConfig,
controllerContext.partitionLeadershipInfo(tp) match {
case Some(leaderIsrAndControllerEpoch) =>
val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
- if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
+ if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
+ partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
+ None
+ } else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) {
+ partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
+ None
+ } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
+ // If a partition is already in the desired state, just return it
+ partitionResponses(tp) = Right(currentLeaderAndIsr)
+ None
+ } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
info(
s"Rejecting AlterPartition from node $brokerId for $tp because leader is recovering and ISR is greater than 1: " +
@@ -2363,13 +2373,6 @@ class KafkaController(val config: KafkaConfig,
s"RECOVERED to RECOVERING: $newLeaderAndIsr"
)
None
- } else if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
- partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
- None
- } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
- // If a partition is already in the desired state, just return it
- partitionResponses(tp) = Right(currentLeaderAndIsr)
- None
} else {
Some(tp -> newLeaderAndIsr)
}
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 46391922b5..aa631c95e4 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -847,7 +847,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
}
@Test
- def testIdempotentAlterIsr(): Unit = {
+ def testIdempotentAlterPartition(): Unit = {
servers = makeServers(2)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBroker = servers.find(_.config.brokerId != controllerId).get
@@ -881,7 +881,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
}
@Test
- def testAlterIsrErrors(): Unit = {
+ def testAlterPartitionErrors(): Unit = {
servers = makeServers(2)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val tp = new TopicPartition("t", 0)
@@ -890,65 +890,288 @@ class ControllerIntegrationTest extends QuorumTestHarness {
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
val controller = getController().kafkaController
- var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1,
- Map(tp -> LeaderAndIsr(controllerId, replicas)))
- var capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
-
- future = captureAlterIsrError(99, controller.brokerEpoch,
- Map(tp -> LeaderAndIsr(controllerId, replicas)))
- capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
-
- val unknownTopicPartition = new TopicPartition("unknown", 99)
- future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
- Map(unknownTopicPartition -> LeaderAndIsr(controllerId, replicas)), unknownTopicPartition)
- capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError)
-
- future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
- Map(tp -> LeaderAndIsr(controllerId, 1, replicas, LeaderRecoveryState.RECOVERED, 99)), tp)
- capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError)
-
- future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
- Map(tp -> LeaderAndIsr(controllerId, 1, replicas, LeaderRecoveryState.RECOVERING, 1)), tp)
- capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.INVALID_REQUEST, capturedError)
-
- future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
- Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId), LeaderRecoveryState.RECOVERING, 1)), tp)
- capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.INVALID_REQUEST, capturedError)
+ val partitionState = controller.controllerContext.partitionLeadershipInfo(tp).get
+ val leaderId = partitionState.leaderAndIsr.leader
+ val leaderBrokerEpoch = servers(leaderId).kafkaController.brokerEpoch
+ val leaderEpoch = partitionState.leaderAndIsr.leaderEpoch
+ val partitionEpoch = partitionState.leaderAndIsr.partitionEpoch
+
+ def assertAlterPartition(
+ topLevelError: Errors = Errors.NONE,
+ partitionError: Errors = Errors.NONE,
+ topicPartition: TopicPartition = tp,
+ leaderId: Int = leaderId,
+ brokerEpoch: Long = leaderBrokerEpoch,
+ leaderEpoch: Int = leaderEpoch,
+ partitionEpoch: Int = partitionEpoch,
+ isr: Set[Int] = replicas.toSet,
+ leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED
+ ): Unit = {
+ assertAlterPartitionError(
+ topicPartition = topicPartition,
+ leaderId = leaderId,
+ brokerEpoch = brokerEpoch,
+ leaderEpoch = leaderEpoch,
+ partitionEpoch = partitionEpoch,
+ isr = isr,
+ leaderRecoveryState = leaderRecoveryState,
+ topLevelError = topLevelError,
+ partitionError = partitionError
+ )
+ }
+
+ assertAlterPartition(
+ topLevelError = Errors.STALE_BROKER_EPOCH,
+ brokerEpoch = leaderBrokerEpoch - 1
+ )
+
+ assertAlterPartition(
+ topLevelError = Errors.STALE_BROKER_EPOCH,
+ leaderId = 99,
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ topicPartition = new TopicPartition("unknown", 0)
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ topicPartition = new TopicPartition(tp.topic, 1)
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_UPDATE_VERSION,
+ partitionEpoch = partitionEpoch - 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderEpoch = leaderEpoch - 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderEpoch = leaderEpoch + 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_REQUEST,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_REQUEST,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+ isr = Set(controllerId)
+ )
+
+ // Version/epoch errors take precedence over other validations since
+ // the leader may be working with outdated state.
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_UPDATE_VERSION,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+ partitionEpoch = partitionEpoch - 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+ leaderEpoch = leaderEpoch - 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+ leaderEpoch = leaderEpoch + 1
+ )
}
- def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = {
- val future = new CompletableFuture[Errors]()
+ @Test
+ def testAlterPartitionErrorsAfterUncleanElection(): Unit = {
+ // - Start 3 brokers with unclean election enabled
+ // - Create a topic with two non-controller replicas: A and B
+ // - Shutdown A to bring ISR to [B]
+ // - Shutdown B to make partition offline
+ // - Restart A to force unclean election with ISR [A]
+ // - Verify AlterPartition handling in this state
+
+ servers = makeServers(numConfigs = 3, uncleanLeaderElectionEnable = true)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val controller = getController().kafkaController
- val callback: AlterPartitionCallback = {
- case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
- future.completeExceptionally(new AssertionError(s"Should have seen top-level error"))
- case Right(error: Errors) =>
- future.complete(error)
+
+ val tp = new TopicPartition("t", 0)
+ val replicas = servers.map(_.config.nodeId).filter(_ != controllerId).take(2).toList
+ val assignment = Map(tp.partition -> replicas)
+
+ val replica1 :: replica2 :: Nil = replicas
+
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+ servers(replica1).shutdown()
+ servers(replica1).awaitShutdown()
+
+ val partitionStateAfterFirstShutdown = controller.controllerContext.partitionLeadershipInfo(tp).get
+ assertEquals(replica2, partitionStateAfterFirstShutdown.leaderAndIsr.leader)
+ assertEquals(Set(replica2), partitionStateAfterFirstShutdown.leaderAndIsr.isr.toSet)
+
+ servers(replica2).shutdown()
+ servers(replica2).awaitShutdown()
+
+ val partitionStateAfterSecondShutdown = controller.controllerContext.partitionLeadershipInfo(tp).get
+ assertEquals(-1, partitionStateAfterSecondShutdown.leaderAndIsr.leader)
+ assertEquals(Set(replica2), partitionStateAfterSecondShutdown.leaderAndIsr.isr.toSet)
+
+ servers(replica1).startup()
+ TestUtils.waitUntilLeaderIsKnown(servers, tp)
+
+ val partitionStateAfterRestart = controller.controllerContext.partitionLeadershipInfo(tp).get
+ assertEquals(replica1, partitionStateAfterRestart.leaderAndIsr.leader)
+ assertEquals(Set(replica1), partitionStateAfterRestart.leaderAndIsr.isr.toSet)
+ assertEquals(LeaderRecoveryState.RECOVERING, partitionStateAfterRestart.leaderAndIsr.leaderRecoveryState)
+
+ val leaderId = replica1
+ val leaderBrokerEpoch = servers(replica1).kafkaController.brokerEpoch
+ val leaderEpoch = partitionStateAfterRestart.leaderAndIsr.leaderEpoch
+ val partitionEpoch = partitionStateAfterRestart.leaderAndIsr.partitionEpoch
+
+ def assertAlterPartition(
+ topLevelError: Errors = Errors.NONE,
+ partitionError: Errors = Errors.NONE,
+ leaderId: Int = leaderId,
+ brokerEpoch: Long = leaderBrokerEpoch,
+ leaderEpoch: Int = leaderEpoch,
+ partitionEpoch: Int = partitionEpoch,
+ leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED
+ ): Unit = {
+ assertAlterPartitionError(
+ topicPartition = tp,
+ leaderId = leaderId,
+ brokerEpoch = brokerEpoch,
+ leaderEpoch = leaderEpoch,
+ partitionEpoch = partitionEpoch,
+ isr = replicas.toSet,
+ leaderRecoveryState = leaderRecoveryState,
+ topLevelError = topLevelError,
+ partitionError = partitionError
+ )
}
- controller.eventManager.put(AlterPartitionReceived(brokerId, brokerEpoch, isrsToAlter, callback))
- future
+
+ assertAlterPartition(
+ topLevelError = Errors.STALE_BROKER_EPOCH,
+ brokerEpoch = leaderBrokerEpoch - 1
+ )
+
+ assertAlterPartition(
+ topLevelError = Errors.STALE_BROKER_EPOCH,
+ leaderId = 99
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_UPDATE_VERSION,
+ partitionEpoch = partitionEpoch - 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderEpoch = leaderEpoch - 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderEpoch = leaderEpoch + 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_REQUEST,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING
+ )
+
+ // Version/epoch errors take precedence over other validations since
+ // the leader may be working with outdated state.
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_UPDATE_VERSION,
+ partitionEpoch = partitionEpoch - 1,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderEpoch = leaderEpoch - 1,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderEpoch = leaderEpoch + 1,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING
+ )
}
- def captureAlterIsrPartitionError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], tp: TopicPartition): CompletableFuture[Errors] = {
- val future = new CompletableFuture[Errors]()
- val controller = getController().kafkaController
+ def assertAlterPartitionError(
+ topicPartition: TopicPartition,
+ leaderId: Int,
+ brokerEpoch: Long,
+ leaderEpoch: Int,
+ partitionEpoch: Int,
+ isr: Set[Int],
+ leaderRecoveryState: LeaderRecoveryState,
+ topLevelError: Errors,
+ partitionError: Errors,
+ ): Unit = {
+ val leaderAndIsr = LeaderAndIsr(
+ leader = leaderId,
+ leaderEpoch = leaderEpoch,
+ isr = isr.toList,
+ partitionEpoch = partitionEpoch,
+ leaderRecoveryState = leaderRecoveryState
+ )
+
+ val future = captureAlterPartitionError(
+ brokerId = leaderId,
+ brokerEpoch = brokerEpoch,
+ topicPartition = topicPartition,
+ leaderAndIsr = leaderAndIsr
+ )
+
+ val errors = future.get(10, TimeUnit.SECONDS)
+ assertEquals(topLevelError, errors.topLevelError)
+
+ if (topLevelError == Errors.NONE) {
+ assertEquals(Some(partitionError), errors.partitionError)
+ }
+ }
+
+ private case class AlterPartitionError(topLevelError: Errors, partitionError: Option[Errors])
+
+ private def captureAlterPartitionError(
+ brokerId: Int,
+ brokerEpoch: Long,
+ topicPartition: TopicPartition,
+ leaderAndIsr: LeaderAndIsr
+ ): CompletableFuture[AlterPartitionError] = {
+ val future = new CompletableFuture[AlterPartitionError]()
val callback: AlterPartitionCallback = {
case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
- partitionResults.get(tp) match {
- case Some(Left(error: Errors)) => future.complete(error)
- case Some(Right(_: LeaderAndIsr)) => future.completeExceptionally(new AssertionError(s"Should have seen an error for $tp in result"))
- case None => future.completeExceptionally(new AssertionError(s"Should have seen $tp in result"))
+ partitionResults.get(topicPartition) match {
+ case Some(Left(error: Errors)) =>
+ future.complete(AlterPartitionError(topLevelError = Errors.NONE, partitionError = Some(error)))
+
+ case Some(Right(_: LeaderAndIsr)) =>
+ future.complete(AlterPartitionError(topLevelError = Errors.NONE, partitionError = Some(Errors.NONE)))
+
+ case None =>
+ future.completeExceptionally(new AssertionError(s"Should have seen $topicPartition in result"))
}
- case Right(_: Errors) =>
- future.completeExceptionally(new AssertionError(s"Should not seen top-level error"))
+
+ case Right(error: Errors) =>
+ future.complete(AlterPartitionError(topLevelError = error, partitionError = None))
}
- controller.eventManager.put(AlterPartitionReceived(brokerId, brokerEpoch, isrsToAlter, callback))
+
+ val partitionsToAlter = Map(topicPartition -> leaderAndIsr)
+ val controller = getController().kafkaController
+ controller.eventManager.put(AlterPartitionReceived(brokerId, brokerEpoch, partitionsToAlter, callback))
future
}