Posted to commits@kafka.apache.org by jg...@apache.org on 2023/02/06 18:51:07 UTC

[kafka] branch 3.4 updated: KAFKA-13972; Ensure replica state deleted after reassignment cancellation (#13107)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 4064f4a7e43 KAFKA-13972; Ensure replica state deleted after reassignment cancellation (#13107)
4064f4a7e43 is described below

commit 4064f4a7e434b7096ee506d17413fc37258384d0
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Jan 18 10:26:48 2023 -0800

    KAFKA-13972; Ensure replica state deleted after reassignment cancellation (#13107)
    
    When a reassignment is cancelled, we need to delete the partition state of the adding replicas. Failing to do so leaves behind "stray" replicas, which take up disk space and can cause topicId conflicts if the topic is later recreated. Currently this logic does not work because the leader epoch does not always get bumped after cancellation. Without a leader epoch bump, the replica will ignore the StopReplica requests sent by the controller and may remain online.
    
    In this patch, we fix the problem by loosening the epoch check on the broker side when a StopReplica request is received: instead of ignoring a request whose epoch matches the current epoch, the broker now accepts it.
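    
    For illustration only (not part of the patch below): a minimal sketch of the relaxed check.
    The object name and sentinel values here are assumptions standing in for
    LeaderAndIsr.EpochDuringDelete and LeaderAndIsr.NoEpoch; only the final comparison
    reflects the actual change in ReplicaManager.scala.
    
        object StopReplicaEpochCheckSketch {
          val EpochDuringDelete: Int = -2 // assumed sentinel: topic is being deleted
          val NoEpoch: Int = -1           // assumed sentinel: no epoch available
    
          // A StopReplica request is accepted if it carries a delete/no-epoch sentinel
          // or an epoch at least as new as the replica's current leader epoch.
          def shouldStopReplica(requestLeaderEpoch: Int, currentLeaderEpoch: Int): Boolean =
            requestLeaderEpoch == EpochDuringDelete ||
              requestLeaderEpoch == NoEpoch ||
              requestLeaderEpoch >= currentLeaderEpoch // previously '>', which fenced equal epochs
    
          def main(args: Array[String]): Unit = {
            // A cancellation that does not bump the leader epoch sends the current epoch;
            // the old '>' check rejected it with FENCED_LEADER_EPOCH and the adding
            // replica stayed online as a stray.
            println(shouldStopReplica(requestLeaderEpoch = 5, currentLeaderEpoch = 5)) // true with '>='
          }
        }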
    
    Note that this problem only affects the ZK controller. The integration tests added here nevertheless cover both metadata modes.
    
    Reviewers:  David Jacot <dj...@confluent.io>, Justine Olshan <jo...@confluent.io>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../admin/ReassignPartitionsIntegrationTest.scala  | 95 ++++++++++++++++++----
 .../unit/kafka/server/ReplicaManagerTest.scala     |  4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 21 +++--
 4 files changed, 94 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b2a37479bae..069ef27d67b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -387,7 +387,7 @@ class ReplicaManager(val config: KafkaConfig,
               // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
-                  requestLeaderEpoch > currentLeaderEpoch) {
+                  requestLeaderEpoch >= currentLeaderEpoch) {
                 stoppedPartitions += topicPartition -> deletePartition
                 // Assume that everything will go right. It is overwritten in case of an error.
                 responseMap.put(topicPartition, Errors.NONE)
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 9b3b935f23e..d3a04da4d13 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -108,12 +108,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
       """{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}""" +
       """]}"""
 
+    val foo0 = new TopicPartition("foo", 0)
+    val bar0 = new TopicPartition("bar", 0)
+
     // Check that the assignment has not yet been started yet.
     val initialAssignment = Map(
-      new TopicPartition("foo", 0) ->
-        PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 3), true),
-      new TopicPartition("bar", 0) ->
-        PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 0), true)
+      foo0 -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 3), true),
+      bar0 -> PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 0), true)
     )
     waitForVerifyAssignment(cluster.adminClient, assignment, false,
       VerifyAssignmentResult(initialAssignment))
@@ -122,10 +123,8 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
     runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L)
     assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
     val finalAssignment = Map(
-      new TopicPartition("foo", 0) ->
-        PartitionReassignmentState(Seq(0, 1, 3), Seq(0, 1, 3), true),
-      new TopicPartition("bar", 0) ->
-        PartitionReassignmentState(Seq(3, 2, 0), Seq(3, 2, 0), true)
+      foo0 -> PartitionReassignmentState(Seq(0, 1, 3), Seq(0, 1, 3), true),
+      bar0 -> PartitionReassignmentState(Seq(3, 2, 0), Seq(3, 2, 0), true)
     )
 
     val verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, assignment, false)
@@ -137,6 +136,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
 
     assertEquals(unthrottledBrokerConfigs,
       describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
+
+    // Verify that partitions are removed from brokers no longer assigned
+    verifyReplicaDeleted(topicPartition = foo0, replicaId = 2)
+    verifyReplicaDeleted(topicPartition = bar0, replicaId = 1)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -296,10 +299,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testCancellation(quorum: String): Unit = {
+    val foo0 = new TopicPartition("foo", 0)
+    val baz1 = new TopicPartition("baz", 1)
+
     cluster = new ReassignPartitionsTestCluster()
     cluster.setup()
-    cluster.produceMessages("foo", 0, 200)
-    cluster.produceMessages("baz", 1, 200)
+    cluster.produceMessages(foo0.topic, foo0.partition, 200)
+    cluster.produceMessages(baz1.topic, baz1.partition, 200)
     val assignment = """{"version":1,"partitions":""" +
       """[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
       """{"topic":"baz","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]}""" +
@@ -314,14 +320,11 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
     // from completing before this runs.
     waitForVerifyAssignment(cluster.adminClient, assignment, true,
       VerifyAssignmentResult(Map(
-        new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
-        new TopicPartition("baz", 1) -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
+        foo0 -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
+        baz1 -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
         true, Map(), false))
     // Cancel the reassignment.
-    assertEquals((Set(
-      new TopicPartition("foo", 0),
-      new TopicPartition("baz", 1)
-    ), Set()), runCancelAssignment(cluster.adminClient, assignment, true))
+    assertEquals((Set(foo0, baz1), Set()), runCancelAssignment(cluster.adminClient, assignment, true))
     // Broker throttles are still active because we passed --preserve-throttles
     waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
     // Cancelling the reassignment again should reveal nothing to cancel.
@@ -330,6 +333,62 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
     waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
     // Verify that there are no ongoing reassignments.
     assertFalse(runVerifyAssignment(cluster.adminClient, assignment, false).partsOngoing)
+    // Verify that the partition is removed from cancelled replicas
+    verifyReplicaDeleted(topicPartition = foo0, replicaId = 3)
+    verifyReplicaDeleted(topicPartition = baz1, replicaId = 3)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCancellationWithAddingReplicaInIsr(quorum: String): Unit = {
+    val foo0 = new TopicPartition("foo", 0)
+
+    cluster = new ReassignPartitionsTestCluster()
+    cluster.setup()
+    cluster.produceMessages(foo0.topic, foo0.partition, 200)
+
+    // The reassignment will bring replicas 3 and 4 into the replica set and remove 1 and 2.
+    val assignment = """{"version":1,"partitions":""" +
+      """[{"topic":"foo","partition":0,"replicas":[0,3,4],"log_dirs":["any","any","any"]}""" +
+      """]}"""
+
+    // We will throttle replica 4 so that only replica 3 joins the ISR
+    TestUtils.setReplicationThrottleForPartitions(
+      cluster.adminClient,
+      brokerIds = Seq(4),
+      partitions = Set(foo0),
+      throttleBytes = 1
+    )
+
+    // Execute the assignment and wait for replica 3 (only) to join the ISR
+    runExecuteAssignment(
+      cluster.adminClient,
+      additional = false,
+      reassignmentJson = assignment
+    )
+    TestUtils.waitUntilTrue(
+      () => TestUtils.currentIsr(cluster.adminClient, foo0) == Set(0, 1, 2, 3),
+      msg = "Timed out while waiting for replica 3 to join the ISR"
+    )
+
+    // Now cancel the assignment and verify that the partition is removed from cancelled replicas
+    assertEquals((Set(foo0), Set()), runCancelAssignment(cluster.adminClient, assignment, preserveThrottles = true))
+    verifyReplicaDeleted(topicPartition = foo0, replicaId = 3)
+    verifyReplicaDeleted(topicPartition = foo0, replicaId = 4)
+  }
+
+  private def verifyReplicaDeleted(
+    topicPartition: TopicPartition,
+    replicaId: Int
+  ): Unit = {
+    def isReplicaStoppedAndDeleted(): Boolean = {
+      val server = cluster.servers(replicaId)
+      val partition = server.replicaManager.getPartition(topicPartition)
+      val log = server.logManager.getLog(topicPartition)
+      partition == HostedPartition.None && log.isEmpty
+    }
+    TestUtils.waitUntilTrue(isReplicaStoppedAndDeleted,
+      msg = s"Timed out waiting for replica $replicaId of $topicPartition to be deleted")
   }
 
   private def waitForLogDirThrottle(throttledBrokers: Set[Int], logDirThrottle: Long): Unit = {
@@ -541,8 +600,8 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
   private def runExecuteAssignment(adminClient: Admin,
                                    additional: Boolean,
                                    reassignmentJson: String,
-                                   interBrokerThrottle: Long,
-                                   replicaAlterLogDirsThrottle: Long) = {
+                                   interBrokerThrottle: Long = -1,
+                                   replicaAlterLogDirsThrottle: Long = -1) = {
     println(s"==> executeAssignment(adminClient, additional=${additional}, " +
       s"reassignmentJson=${reassignmentJson}, " +
       s"interBrokerThrottle=${interBrokerThrottle}, " +
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e623816c397..4cdc92881ca 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2707,7 +2707,7 @@ class ReplicaManagerTest {
 
   @Test
   def testStopReplicaWithExistingPartitionAndEqualLeaderEpoch(): Unit = {
-    testStopReplicaWithExistingPartition(1, false, false, Errors.FENCED_LEADER_EPOCH)
+    testStopReplicaWithExistingPartition(1, false, false, Errors.NONE)
   }
 
   @Test
@@ -2737,7 +2737,7 @@ class ReplicaManagerTest {
 
   @Test
   def testStopReplicaWithDeletePartitionAndExistingPartitionAndEqualLeaderEpoch(): Unit = {
-    testStopReplicaWithExistingPartition(1, true, false, Errors.FENCED_LEADER_EPOCH)
+    testStopReplicaWithExistingPartition(1, true, false, Errors.NONE)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7110237ce4c..e21214b6f7e 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1964,16 +1964,23 @@ object TestUtils extends Logging {
     )
   }
 
+  def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = {
+    val description = admin.describeTopics(Set(partition.topic).asJava)
+      .allTopicNames
+      .get
+      .asScala
+
+    description
+      .values
+      .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
+      .map(_.id)
+      .toSet
+  }
+
   def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds: Set[Int]): Unit = {
     waitUntilTrue(
       () => {
-        val description = client.describeTopics(Set(partition.topic).asJava).allTopicNames.get.asScala
-        val isr = description
-          .values
-          .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
-          .map(_.id)
-          .toSet
-
+        val isr = currentIsr(client, partition)
         brokerIds.subsetOf(isr)
       },
       s"Expected brokers $brokerIds to be in the ISR for $partition"