Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/09 15:55:49 UTC

[kafka] branch trunk updated: KAFKA-14144; Compare AlterPartition LeaderAndIsr before fencing partition epoch (#12489)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 62c2880a584 KAFKA-14144; Compare AlterPartition LeaderAndIsr before fencing partition epoch (#12489)
62c2880a584 is described below

commit 62c2880a584dee52af52d74d7f0ec44475528c72
Author: David Mao <47...@users.noreply.github.com>
AuthorDate: Tue Aug 9 08:55:38 2022 -0700

    KAFKA-14144; Compare AlterPartition LeaderAndIsr before fencing partition epoch (#12489)
    
    This PR fixes an AlterPartition regression introduced in https://github.com/apache/kafka/pull/12032
    
    When an AlterPartition request succeeds, the partition epoch is bumped. In ZK controller mode the sender also relies on the AlterPartition response to learn the new partition epoch.
    If the request times out before the response reaches the sender, the sender is left with a partition epoch that is stale relative to the ZK controller state, and its subsequent AlterPartition attempts are fenced. The sender does not learn the updated partition epoch until it receives a LeaderAndIsr request for a controller-initiated ISR change.
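    
    As a concrete illustration of that failure mode, consider this hedged
    sketch (the case class, handleAlterPartition, and the epoch values are
    illustrative stand-ins, not controller code):
    
        case class State(leader: Int, leaderEpoch: Int, isr: List[Int], partitionEpoch: Int)
    
        // Pre-fix ordering: fence on the partition epoch before checking
        // whether the request already matches the committed state.
        def handleAlterPartition(req: State, committed: State): Either[String, State] =
          if (req.partitionEpoch < committed.partitionEpoch) Left("INVALID_UPDATE_VERSION")
          else Right(committed.copy(partitionEpoch = committed.partitionEpoch + 1))
    
        val committed = State(leader = 1, leaderEpoch = 5, isr = List(1), partitionEpoch = 8)
        // The sender's change was applied and bumped the epoch from 7 to 8,
        // but the response was lost, so it retries with the epoch it last saw:
        val retry = committed.copy(partitionEpoch = 7)
        assert(handleAlterPartition(retry, committed) == Left("INVALID_UPDATE_VERSION"))
        // ...and every further retry fails the same way until a LeaderAndIsr
        // request delivers partition epoch 8.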
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/api/LeaderAndIsr.scala   |  5 +-
 .../scala/kafka/controller/KafkaController.scala   | 11 +--
 .../controller/ControllerIntegrationTest.scala     | 82 +++++++++++++---------
 3 files changed, 58 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index dd1d381a144..da68cdb479c 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -62,7 +62,7 @@ case class LeaderAndIsr(
     if (leader == LeaderAndIsr.NoLeader) None else Some(leader)
   }
 
-  def equalsIgnorePartitionEpoch(other: LeaderAndIsr): Boolean = {
+  def equalsAllowStalePartitionEpoch(other: LeaderAndIsr): Boolean = {
     if (this == other) {
       true
     } else if (other == null) {
@@ -71,7 +71,8 @@ case class LeaderAndIsr(
       leader == other.leader &&
         leaderEpoch == other.leaderEpoch &&
         isr.equals(other.isr) &&
-        leaderRecoveryState == other.leaderRecoveryState
+        leaderRecoveryState == other.leaderRecoveryState &&
+        partitionEpoch <= other.partitionEpoch
     }
   }
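
The direction of the new <= matters: the caller's possibly-stale epoch sits on
the left, the controller's committed epoch on the right. A hedged,
self-contained sketch of that asymmetry (this simplified case class is a
stand-in for kafka.api.LeaderAndIsr and omits leaderRecoveryState):

    case class LeaderAndIsrSketch(leader: Int, leaderEpoch: Int, isr: List[Int], partitionEpoch: Int) {
      // Mirrors the renamed check: identical target state, but the caller's
      // partition epoch may lag behind the committed epoch.
      def equalsAllowStalePartitionEpoch(other: LeaderAndIsrSketch): Boolean =
        other != null &&
          leader == other.leader &&
          leaderEpoch == other.leaderEpoch &&
          isr == other.isr &&
          partitionEpoch <= other.partitionEpoch
    }

    val committed = LeaderAndIsrSketch(leader = 1, leaderEpoch = 5, isr = List(1), partitionEpoch = 8)
    val retried   = committed.copy(partitionEpoch = 7) // epoch from before the lost response

    assert(retried.equalsAllowStalePartitionEpoch(committed))  // a stale retry counts as a duplicate
    assert(!committed.equalsAllowStalePartitionEpoch(retried)) // but not in the other direction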
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 8d16eb7e1da..d179bcd6ca4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2339,14 +2339,15 @@ class KafkaController(val config: KafkaConfig,
           if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
             partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
             None
-          } else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) {
-            partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
-            None
-          } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
+          } else if (newLeaderAndIsr.equalsAllowStalePartitionEpoch(currentLeaderAndIsr)) {
             // If a partition is already in the desired state, just return it
+            // This check must be done before fencing based on the partition epoch to maintain idempotency
             partitionResponses(tp) = Right(currentLeaderAndIsr)
             None
-          } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
+          } else if (newLeaderAndIsr.partitionEpoch != currentLeaderAndIsr.partitionEpoch) {
+            partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
+            None
+          } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
             partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
             info(
               s"Rejecting AlterPartition from node $brokerId for $tp because leader is recovering and ISR is greater than 1: " +
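
Restated as a decision table (a sketch under simplifying assumptions:
PartitionView stands in for the controller's LeaderAndIsr values, and
leaderRecoveryState is omitted), the reordered checks now read:

    final case class PartitionView(leader: Int, leaderEpoch: Int, isr: List[Int], partitionEpoch: Int)

    def validate(req: PartitionView, cur: PartitionView): String =
      if (req.leaderEpoch != cur.leaderEpoch)
        "FENCED_LEADER_EPOCH"
      // Idempotency first: a request that already describes the committed
      // state succeeds even if its partition epoch is stale.
      else if (req.leader == cur.leader && req.isr == cur.isr &&
               req.partitionEpoch <= cur.partitionEpoch)
        "OK: return the committed state and its epoch"
      // Fencing second, now with != rather than <: once the request differs
      // from the committed state, a partition epoch from the future is as
      // invalid as a stale one.
      else if (req.partitionEpoch != cur.partitionEpoch)
        "INVALID_UPDATE_VERSION"
      else
        "proceed with the ISR update"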
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 57cbeafd4d0..76188894371 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -1002,47 +1002,58 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 
     val controller = getController().kafkaController
     val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
-    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val oldLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val newIsr = List(oldLeaderAndIsr.leader)
+    val newPartitionEpoch = oldLeaderAndIsr.partitionEpoch + 1
     val topicId = controller.controllerContext.topicIds(tp.topic)
     val brokerId = otherBroker.config.brokerId
     val brokerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(otherBroker.config.brokerId)
 
-    // When re-sending the current ISR, we should not get and error or any ISR changes
-    val alterPartitionRequest = new AlterPartitionRequestData()
-      .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpoch)
-      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
-        .setTopicId(topicId)
-        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(tp.partition)
-          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
-          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
-          .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
-          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+    def sendAndVerifyAlterPartitionResponse(requestPartitionEpoch: Int): Unit = {
+      val alterPartitionRequest = new AlterPartitionRequestData()
+        .setBrokerId(brokerId)
+        .setBrokerEpoch(brokerEpoch)
+        .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+          .setTopicId(topicId)
+          .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
+            .setPartitionEpoch(requestPartitionEpoch)
+            .setNewIsr(newIsr.map(Int.box).asJava)
+            .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
+          ).asJava)
         ).asJava)
-      ).asJava)
 
-    val future = new CompletableFuture[AlterPartitionResponseData]()
-    controller.eventManager.put(AlterPartitionReceived(
-      alterPartitionRequest,
-      AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
-      future.complete
-    ))
+      val future = new CompletableFuture[AlterPartitionResponseData]()
+      controller.eventManager.put(AlterPartitionReceived(
+        alterPartitionRequest,
+        AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+        future.complete
+      ))
 
-    val expectedAlterPartitionResponse = new AlterPartitionResponseData()
-      .setTopics(Seq(new AlterPartitionResponseData.TopicData()
-        .setTopicId(topicId)
-        .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
-          .setPartitionIndex(tp.partition)
-          .setLeaderId(brokerId)
-          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
-          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
-          .setIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
-          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+      // When re-sending an ISR update, we should not get an error or any ISR changes
+      val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+        .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+          .setTopicId(topicId)
+          .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setLeaderId(brokerId)
+            .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
+            .setPartitionEpoch(newPartitionEpoch)
+            .setIsr(newIsr.map(Int.box).asJava)
+            .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
+          ).asJava)
         ).asJava)
-      ).asJava)
+      assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
+    }
 
-    assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
+    // Send a request and expect the partition epoch to be incremented
+    sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch)
+
+    // Re-send the same request with a stale partition epoch and then with the current
+    // epoch; expect both to succeed while the partition epoch stays the same
+    sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch)
+    sendAndVerifyAlterPartitionResponse(newPartitionEpoch)
   }
 
   @Test
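
Taken together, the reworked test pins down an idempotency property: the first
request applies the ISR change and bumps the epoch, and retries carrying either
the stale or the current epoch receive the identical successful response. A
condensed, self-contained sketch of that property (the mutable stub below is an
illustrative assumption, not the controller):

    final case class IsrState(isr: List[Int], partitionEpoch: Int)
    var committed = IsrState(isr = List(1, 2), partitionEpoch = 7)

    // Stub controller: treat a matching request with a stale-or-equal epoch
    // as a duplicate; otherwise fence on the epoch, else apply and bump.
    def alterPartition(newIsr: List[Int], requestEpoch: Int): Either[String, IsrState] =
      if (newIsr == committed.isr && requestEpoch <= committed.partitionEpoch)
        Right(committed)
      else if (requestEpoch != committed.partitionEpoch)
        Left("INVALID_UPDATE_VERSION")
      else {
        committed = IsrState(newIsr, committed.partitionEpoch + 1)
        Right(committed)
      }

    val first = alterPartition(List(1), 7) // applies the shrink, epoch -> 8
    val stale = alterPartition(List(1), 7) // retry with the stale epoch
    val exact = alterPartition(List(1), 8) // retry with the current epoch
    assert(Seq(first, stale, exact).forall(_ == Right(IsrState(List(1), 8))))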
@@ -1100,7 +1111,6 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 
     assertAlterPartition(
       partitionError = Errors.UNKNOWN_TOPIC_ID,
-      topicPartition = tp,
       topicIdOpt = Some(Uuid.randomUuid())
     )
 
@@ -1118,9 +1128,15 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 
     assertAlterPartition(
       partitionError = Errors.INVALID_UPDATE_VERSION,
+      isr = Set(leaderId),
       partitionEpoch = partitionEpoch - 1
     )
 
+    assertAlterPartition(
+      partitionError = Errors.INVALID_UPDATE_VERSION,
+      partitionEpoch = partitionEpoch + 1
+    )
+
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
       leaderEpoch = leaderEpoch - 1