You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2018/03/24 01:19:05 UTC

[kafka] branch trunk updated: KAFKA-6612: Added logic to prevent increasing partition counts during topic deletion

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 685fd03  KAFKA-6612: Added logic to prevent increasing partition counts during topic deletion
685fd03 is described below

commit 685fd03dda02957a91a50fa16409400b1ea08f81
Author: Lucas Wang <lu...@linkedin.com>
AuthorDate: Fri Mar 23 18:18:16 2018 -0700

    KAFKA-6612: Added logic to prevent increasing partition counts during topic deletion
    
    This patch adds logic to the handling of the PartitionModifications event so that, if the partition count is increased while a topic deletion is still in progress, the controller restores the data at the path /brokers/topics/<topic> to remove the added partitions.
    
    Testing done:
    Added a new test method (DeleteTopicTest.testIncreasePartitiovnCountDuringDeleteTopic) to cover the bug
    
    Author: Lucas Wang <lu...@linkedin.com>
    
    Reviewers: Jiangjie (Becket) Qin <be...@gmail.com>
    
    Closes #4666 from gitlw/prevent_increasing_partition_count_during_topic_deletion
---
 .../scala/kafka/controller/KafkaController.scala   | 21 +++++-
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   | 83 +++++++++++++++++++++-
 2 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ed2fb90..2cb3f7c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1302,14 +1302,31 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   case class PartitionModifications(topic: String) extends ControllerEvent {
     override def state: ControllerState = ControllerState.TopicChange
 
+    /**
+     * Rewrites the replica assignment of `topic` in ZooKeeper so that it only keeps partitions that already
+     * have a child znode under the topic's partitions path, dropping partitions that were added while the
+     * topic is being deleted.
+     *
+     * @param topic the topic whose assignment is restored
+     * @param newPartitionReplicaAssignment the assignment currently read from ZooKeeper, which may include
+     *                                      the newly added partitions
+     */
+    def restorePartitionReplicaAssignment(topic: String, newPartitionReplicaAssignment: immutable.Map[TopicPartition, Seq[Int]]): Unit = {
+      info(s"Restoring the partition replica assignment for topic $topic")
+
+      // A Set gives constant-time membership checks instead of scanning the child list once per partition.
+      val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic)).toSet
+      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment.filter { case (tp, _) =>
+        existingPartitions.contains(tp.partition.toString)
+      }
+
+      if (existingPartitionReplicaAssignment.isEmpty)
+        // Writing an empty map would leave the topic znode with no partitions at all; keep the data as is.
+        warn(s"Not restoring the partition replica assignment for topic $topic since no partition znodes exist under ${TopicPartitionsZNode.path(topic)}")
+      else
+        zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment)
+    }
+
     override def process(): Unit = {
       if (!isActive) return
       val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
       val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
         !controllerContext.partitionReplicaAssignment.contains(p._1))
       if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
-        error(s"Skipping adding partitions ${partitionsToBeAdded.map(_._1.partition).mkString(",")} for topic $topic " +
-          "since it is currently being deleted")
+        if (partitionsToBeAdded.nonEmpty) {
+          warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
+            .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+
+          restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
+        } else {
+          // This can happen if existing partition replica assignment are restored to prevent increasing partition count during topic deletion
+          info("Ignoring partition change during topic deletion as no new partitions are added")
+        }
       else {
         if (partitionsToBeAdded.nonEmpty) {
           info(s"New partitions to be added $partitionsToBeAdded")
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 897cc59..ef455d4 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -24,7 +24,8 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import java.util.Properties
 
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaDeletionSuccessful}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 
@@ -145,6 +146,86 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
   }
 
+  /**
+   * Looks up the broker currently registered as controller in ZooKeeper.
+   *
+   * Fails the test if no controller id is registered or if the id does not belong to any of `servers`.
+   *
+   * @return the controller's server together with its broker id
+   */
+  private def getController(): (KafkaServer, Int) = {
+    val controllerId = zkClient.getControllerId.getOrElse(fail("Controller doesn't exist"))
+    val controller = servers.find(_.config.brokerId == controllerId)
+      .getOrElse(fail(s"Controller $controllerId is not one of the test servers"))
+    (controller, controllerId)
+  }
+
+  /**
+   * Waits until `getController()` succeeds, i.e. a controller id is registered in ZooKeeper and it
+   * belongs to one of the test servers.
+   */
+  private def ensureControllerExists(): Unit = {
+    // Try only captures non-fatal throwables, so errors such as OutOfMemoryError still propagate
+    // instead of being treated as "controller not there yet".
+    TestUtils.waitUntilTrue(() => scala.util.Try(getController()).isSuccess, "Controller should eventually exist")
+  }
+
+  /**
+   * Expands a partition -> replica-ids assignment for `topic` into the set of individual
+   * (partition, replica) pairs.
+   */
+  private def getAllReplicasFromAssignment(topic: String, assignment: Map[Int, Seq[Int]]): Set[PartitionAndReplica] = {
+    val pairs = for {
+      (partitionId, replicaIds) <- assignment.toSeq
+      replicaId <- replicaIds
+    } yield new PartitionAndReplica(new TopicPartition(topic, partitionId), replicaId)
+    pairs.toSet
+  }
+
+  /**
+   * Increases the partition count of a topic while its deletion is stalled (one replica broker is down),
+   * then fails over the controller and restarts the stopped brokers, and verifies that the topic is
+   * eventually deleted.
+   */
+  @Test
+  def testIncreasePartitiovnCountDuringDeleteTopic() {
+    val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
+    val topic = "test"
+    val topicPartition = new TopicPartition(topic, 0)
+    val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+    // create brokers
+    val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    this.servers = allServers
+    // the brokers hosting replicas of partition 0 (named so it does not shadow the `servers` field)
+    val replicaServers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
+    // create the topic
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    // wait until the replica log is created on every replica broker
+    TestUtils.waitUntilTrue(() => replicaServers.forall(_.getLogManager().getLog(topicPartition).isDefined),
+      "Replicas for topic test not created.")
+    // shut down a follower so that the following topic deletion cannot complete
+    val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
+    assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+    val follower = replicaServers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+    follower.shutdown()
+    // start topic deletion
+    adminZkClient.deleteTopic(topic)
+
+    // make sure deletion has been attempted for all of the topic's replicas
+    ensureControllerExists()
+    val (controller, controllerId) = getController()
+    val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment)
+    TestUtils.waitUntilTrue(() => {
+      val replicasInDeletionSuccessful = controller.kafkaController.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
+      val offlineReplicas = controller.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
+      allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas)
+    }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
+
+    // increase the partition count for the topic while its deletion is still pending
+    val topicCommandOptions = new TopicCommand.TopicCommandOptions(Array("--zookeeper", zkConnect, "--alter", "--topic", topic, "--partitions", "2"))
+    TopicCommand.alterTopic(zkClient, topicCommandOptions)
+
+    // trigger a controller switch now
+    controller.shutdown()
+
+    // wait until a different broker is registered as controller; reading the id directly tolerates the
+    // election window in which no controller is registered, instead of throwing inside the wait condition
+    TestUtils.waitUntilTrue(() => zkClient.getControllerId.exists(_ != controllerId),
+      "The new controller should not have the failed controller id")
+
+    // bring back the failed brokers
+    follower.startup()
+    controller.startup()
+    TestUtils.verifyTopicDeletion(zkClient, topic, 2, replicaServers)
+  }
+
+
   @Test
   def testDeleteTopicDuringAddPartition() {
     val topic = "test"

-- 
To stop receiving notification emails like this one, please contact
jqin@apache.org.