You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2018/03/24 01:19:05 UTC
[kafka] branch trunk updated: KAFKA-6612: Added logic to prevent
increasing partition counts during topic deletion
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 685fd03 KAFKA-6612: Added logic to prevent increasing partition counts during topic deletion
685fd03 is described below
commit 685fd03dda02957a91a50fa16409400b1ea08f81
Author: Lucas Wang <lu...@linkedin.com>
AuthorDate: Fri Mar 23 18:18:16 2018 -0700
KAFKA-6612: Added logic to prevent increasing partition counts during topic deletion
This patch adds logic in handling the PartitionModifications event, so that if the partition count is increased when a topic deletion is still in progress, the controller will restore the data of the path /brokers/topics/"topic" to remove the added partitions.
Testing done:
Added a new test method to cover the bug
Author: Lucas Wang <lu...@linkedin.com>
Reviewers: Jiangjie (Becket) Qin <be...@gmail.com>
Closes #4666 from gitlw/prevent_increasing_partition_count_during_topic_deletion
---
.../scala/kafka/controller/KafkaController.scala | 21 +++++-
.../scala/unit/kafka/admin/DeleteTopicTest.scala | 83 +++++++++++++++++++++-
2 files changed, 101 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ed2fb90..2cb3f7c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1302,14 +1302,31 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
case class PartitionModifications(topic: String) extends ControllerEvent {
override def state: ControllerState = ControllerState.TopicChange
+ /**
+  * Rewrites the assignment stored for `topic` so that it only contains partitions whose id is
+  * among the children currently listed under `TopicPartitionsZNode.path(topic)`; every other
+  * entry of `newPartitionReplicaAssignment` is dropped before the assignment is written back.
+  *
+  * @param topic the topic whose assignment is rewritten
+  * @param newPartitionReplicaAssignment the assignment to filter, keyed by topic partition
+  */
+ def restorePartitionReplicaAssignment(topic: String, newPartitionReplicaAssignment : immutable.Map[TopicPartition, Seq[Int]]): Unit = {
+   info("Restoring the partition replica assignment for topic %s".format(topic))
+
+   // Materialize the child names as a set once, so that each membership test in the filter below
+   // is a hash lookup rather than a linear scan of the children list.
+   val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic)).toSet
+   val existingPartitionReplicaAssignment = newPartitionReplicaAssignment.filter { case (topicPartition, _) =>
+     existingPartitions.contains(topicPartition.partition.toString)
+   }
+
+   zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment)
+ }
+
override def process(): Unit = {
if (!isActive) return
val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
!controllerContext.partitionReplicaAssignment.contains(p._1))
if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
- error(s"Skipping adding partitions ${partitionsToBeAdded.map(_._1.partition).mkString(",")} for topic $topic " +
- "since it is currently being deleted")
+ if (partitionsToBeAdded.nonEmpty) {
+ warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
+ .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+
+ restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
+ } else {
+ // This can happen if the existing partition replica assignments are restored to prevent increasing the partition count during topic deletion
+ info("Ignoring partition change during topic deletion as no new partitions are added")
+ }
else {
if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded")
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 897cc59..ef455d4 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -24,7 +24,8 @@ import org.junit.Assert._
import org.junit.{After, Test}
import java.util.Properties
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaDeletionSuccessful}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
@@ -145,6 +146,86 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
}
+ /**
+  * Looks up the current controller.
+  *
+  * Fails the test when no controller id is registered, or when none of `servers` has that
+  * broker id.
+  *
+  * @return the controller's server paired with its broker id
+  */
+ private def getController() : (KafkaServer, Int) = {
+   val controllerId = zkClient.getControllerId.getOrElse(fail("Controller doesn't exist"))
+   // Fail with an explicit message instead of an anonymous NoSuchElementException from Option.get.
+   val controller = servers.find(_.config.brokerId == controllerId)
+     .getOrElse(fail(s"Controller $controllerId is not one of the servers started by this test"))
+   (controller, controllerId)
+ }
+
+ /**
+  * Polls via `TestUtils.waitUntilTrue` until `getController()` completes without throwing.
+  */
+ private def ensureControllerExists() = {
+   // Try only captures non-fatal throwables, so fatal errors and interrupts raised while polling
+   // propagate instead of being silently treated as "no controller yet".
+   TestUtils.waitUntilTrue(() => scala.util.Try(getController()).isSuccess,
+     "Controller should eventually exist")
+ }
+
+ /**
+  * Expands a partition -> replica-list assignment into one `PartitionAndReplica` per
+  * (partition, replica) pair of the given topic.
+  *
+  * @param topic the topic the partitions belong to
+  * @param assignment replica ids keyed by partition id
+  * @return the set of all partition/replica pairs described by `assignment`
+  */
+ private def getAllReplicasFromAssignment(topic : String, assignment : Map[Int, Seq[Int]]) : Set[PartitionAndReplica] = {
+   val allReplicas = for {
+     (partitionId, replicaIds) <- assignment
+     replicaId <- replicaIds
+   } yield new PartitionAndReplica(new TopicPartition(topic, partitionId), replicaId)
+   allReplicas.toSet
+ }
+
+ /**
+  * Scenario: a topic deletion is started while one follower is shut down, the topic's partition
+  * count is then raised from 1 to 2, and the controller is bounced. After the stopped brokers
+  * are started again, the topic must still end up deleted.
+  *
+  * NOTE(review): the method name misspells "Partition" as "Partitiovn"; it is left unchanged here
+  * so that the test's identifier stays the same.
+  */
+ @Test
+ def testIncreasePartitiovnCountDuringDeleteTopic() {
+   val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
+   val topic = "test"
+   val topicPartition = new TopicPartition(topic, 0)
+   val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+   brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+   // create brokers
+   val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+   this.servers = allServers
+   // only the brokers that host a replica of partition 0 (shadows this.servers inside this test)
+   val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
+   // create the topic
+   adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+   // wait until the replica log is created on every broker
+   TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
+     "Replicas for topic test not created.")
+   // shut down a follower to make sure the following topic deletion will be suspended
+   val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
+   assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+   val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+   follower.shutdown()
+   // start topic deletion
+   adminZkClient.deleteTopic(topic)
+
+   // make sure deletion of all of the topic's replicas has been attempted
+   ensureControllerExists()
+   val (controller, controllerId) = getController()
+   val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment)
+   TestUtils.waitUntilTrue(() => {
+     val replicasInDeletionSuccessful = controller.kafkaController.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
+     val offlineReplicas = controller.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
+     allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas)
+   }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
+
+   // increase the partition count for the topic
+   val topicCommandOptions = new TopicCommand.TopicCommandOptions(Array("--zookeeper", zkConnect, "--alter", "--topic", topic, "--partitions", "2"))
+   TopicCommand.alterTopic(zkClient, topicCommandOptions)
+
+   // trigger a controller switch now
+   val previousControllerId = controllerId
+
+   controller.shutdown()
+
+   ensureControllerExists()
+   // wait for a different broker to take over as controller; polling the optional controller id
+   // directly means a momentarily missing controller is retried instead of aborting the test
+   TestUtils.waitUntilTrue(() => zkClient.getControllerId.exists(_ != previousControllerId),
+     "The new controller should not have the failed controller id")
+
+   // bring back the failed brokers
+   follower.startup()
+   controller.startup()
+   TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+ }
+
+
@Test
def testDeleteTopicDuringAddPartition() {
val topic = "test"
--
To stop receiving notification emails like this one, please contact
jqin@apache.org.