You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/02/12 20:02:20 UTC
[kafka] 02/02: KAFKA-9499; Improve deletion process by batching more aggressively (#8053)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 8e6d24a50fd81612b7442c8728bb29b4b6b19418
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Feb 12 20:46:54 2020 +0100
KAFKA-9499; Improve deletion process by batching more aggressively (#8053)
This PR speeds up the deletion process by doing the following:
- Batch whenever possible to minimize the number of requests sent out to other brokers;
- Refactor `onPartitionDeletion` to remove the usage of `allLiveReplicas`.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../scala/kafka/controller/ControllerContext.scala | 10 +-
.../kafka/controller/TopicDeletionManager.scala | 112 +++++++++++----------
.../kafka/controller/ControllerContextTest.scala | 15 ++-
.../controller/MockPartitionStateMachine.scala | 12 +++
.../kafka/controller/MockReplicaStateMachine.scala | 12 +++
.../controller/TopicDeletionManagerTest.scala | 28 +++++-
6 files changed, 114 insertions(+), 75 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 47cb553..f7a6cdd 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -221,7 +221,9 @@ class ControllerContext {
def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
partitionAssignments.getOrElse(topic, mutable.Map.empty).flatMap {
- case (partition, assignment) => assignment.replicas.map(r => PartitionAndReplica(new TopicPartition(topic, partition), r))
+ case (partition, assignment) => assignment.replicas.map { r =>
+ PartitionAndReplica(new TopicPartition(topic, partition), r)
+ }
}.toSet
}
@@ -231,12 +233,6 @@ class ControllerContext {
}.toSet
}
- def allLiveReplicas(): Set[PartitionAndReplica] = {
- replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica =>
- isReplicaOnline(partitionAndReplica.replica, partitionAndReplica.topicPartition)
- }
- }
-
/**
* Get all online and offline replicas.
*
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index d032b3b..64f9ff0 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -22,6 +22,7 @@ import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import scala.collection.Set
+import scala.collection.mutable
trait DeletionClient {
def deleteTopic(topic: String, epochZkVersion: Int): Unit
@@ -226,12 +227,12 @@ class TopicDeletionManager(config: KafkaConfig,
/**
* If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
* To ensure a successful retry, reset states for respective replicas from ReplicaDeletionIneligible to OfflineReplica state
- *@param topic Topic for which deletion should be retried
+ * @param topics Topics for which deletion should be retried
*/
- private def retryDeletionForIneligibleReplicas(topic: String): Unit = {
+ private def retryDeletionForIneligibleReplicas(topics: Set[String]): Unit = {
// reset replica states from ReplicaDeletionIneligible to OfflineReplica
- val failedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionIneligible)
- info(s"Retrying deletion of topic $topic since replicas ${failedReplicas.mkString(",")} were not successfully deleted")
+ val failedReplicas = topics.flatMap(controllerContext.replicasInState(_, ReplicaDeletionIneligible))
+ debug(s"Retrying deletion of topics ${topics.mkString(",")} since replicas ${failedReplicas.mkString(",")} were not successfully deleted")
replicaStateMachine.handleStateChanges(failedReplicas.toSeq, OfflineReplica)
}
@@ -256,9 +257,6 @@ class TopicDeletionManager(config: KafkaConfig,
* removed from their caches.
*/
private def onTopicDeletion(topics: Set[String]): Unit = {
- info(s"Topic deletion callback for ${topics.mkString(",")}")
- // send update metadata so that brokers stop serving data for topics to be deleted
- val partitions = topics.flatMap(controllerContext.partitionsForTopic)
val unseenTopicsForDeletion = topics -- controllerContext.topicsWithDeletionStarted
if (unseenTopicsForDeletion.nonEmpty) {
val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
@@ -269,66 +267,61 @@ class TopicDeletionManager(config: KafkaConfig,
controllerContext.beginTopicDeletion(unseenTopicsForDeletion)
}
- client.sendMetadataUpdate(partitions)
- topics.foreach { topic =>
- onPartitionDeletion(controllerContext.partitionsForTopic(topic))
- }
- }
+ // send update metadata so that brokers stop serving data for topics to be deleted
+ client.sendMetadataUpdate(topics.flatMap(controllerContext.partitionsForTopic))
- /**
- * Invoked by onPartitionDeletion. It is the 2nd step of topic deletion, the first being sending
- * UpdateMetadata requests to all brokers to start rejecting requests for deleted topics. As part of starting deletion,
- * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
- * is never retried. A topic is removed from the in progress list when
- * 1. Either the topic is successfully deleted OR
- * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state
- * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
- * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
- * the replicas a StopReplicaRequest (delete=true)
- * This method does the following things -
- * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
- * for deletion if some replicas are dead since it won't complete successfully anyway
- * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
- * @param replicasForTopicsToBeDeleted
- */
- private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]): Unit = {
- replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
- val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
- val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
- val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
- val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
- // move dead replicas directly to failed state
- replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, ReplicaDeletionIneligible)
- // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
- replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, OfflineReplica)
- debug(s"Deletion started for replicas ${replicasForDeletionRetry.mkString(",")}")
- replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted)
- if (deadReplicasForTopic.nonEmpty) {
- debug(s"Dead Replicas (${deadReplicasForTopic.mkString(",")}) found for topic $topic")
- markTopicIneligibleForDeletion(Set(topic), reason = "offline replicas")
- }
- }
+ onPartitionDeletion(topics)
}
/**
* Invoked by onTopicDeletion with the list of partitions for topics to be deleted
* It does the following -
- * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) for partitions that are being
- * deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException
+ * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
+ * for deletion if some replicas are dead since it won't complete successfully anyway
* 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas
* and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state,
* it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
* 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
* will delete all persistent data from all replicas of the respective partitions
*/
- private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicPartition]): Unit = {
- info(s"Partition deletion callback for ${partitionsToBeDeleted.mkString(",")}")
- val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
- startReplicaDeletion(replicasPerPartition)
+ private def onPartitionDeletion(topicsToBeDeleted: Set[String]): Unit = {
+ val allDeadReplicas = mutable.ListBuffer.empty[PartitionAndReplica]
+ val allReplicasForDeletionRetry = mutable.ListBuffer.empty[PartitionAndReplica]
+ val allTopicsIneligibleForDeletion = mutable.Set.empty[String]
+
+ topicsToBeDeleted.foreach { topic =>
+ val (aliveReplicas, deadReplicas) = controllerContext.replicasForTopic(topic).partition { r =>
+ controllerContext.isReplicaOnline(r.replica, r.topicPartition)
+ }
+
+ val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
+ val replicasForDeletionRetry = aliveReplicas -- successfullyDeletedReplicas
+
+ allDeadReplicas ++= deadReplicas
+ allReplicasForDeletionRetry ++= replicasForDeletionRetry
+
+ if (deadReplicas.nonEmpty) {
+ debug(s"Dead Replicas (${deadReplicas.mkString(",")}) found for topic $topic")
+ allTopicsIneligibleForDeletion += topic
+ }
+ }
+
+ // move dead replicas directly to failed state
+ replicaStateMachine.handleStateChanges(allDeadReplicas, ReplicaDeletionIneligible)
+ // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
+ replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, OfflineReplica)
+ replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, ReplicaDeletionStarted)
+
+ if (allTopicsIneligibleForDeletion.nonEmpty) {
+ markTopicIneligibleForDeletion(allTopicsIneligibleForDeletion, reason = "offline replicas")
+ }
}
private def resumeDeletions(): Unit = {
val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
+ val topicsEligibleForRetry = mutable.Set.empty[String]
+ val topicsEligibleForDeletion = mutable.Set.empty[String]
+
if (topicsQueuedForDeletion.nonEmpty)
info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")
@@ -343,16 +336,25 @@ class TopicDeletionManager(config: KafkaConfig,
// TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
// or there is at least one failed replica (which means topic deletion should be retried).
if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
- retryDeletionForIneligibleReplicas(topic)
+ topicsEligibleForRetry += topic
}
}
- // Try delete topic if it is eligible for deletion.
+ // Add topic to the eligible set if it is eligible for deletion.
if (isTopicEligibleForDeletion(topic)) {
info(s"Deletion of topic $topic (re)started")
- // topic deletion will be kicked off
- onTopicDeletion(Set(topic))
+ topicsEligibleForDeletion += topic
}
}
+
+ // topic deletion retry will be kicked off
+ if (topicsEligibleForRetry.nonEmpty) {
+ retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
+ }
+
+ // topic deletion will be kicked off
+ if (topicsEligibleForDeletion.nonEmpty) {
+ onTopicDeletion(topicsEligibleForDeletion)
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
index fd8d3e7..39023fa 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
@@ -18,6 +18,7 @@
package unit.kafka.controller
import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.PartitionAndReplica
import kafka.controller.{ControllerContext, ReplicaAssignment}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
@@ -50,14 +51,12 @@ class ControllerContextTest {
// Simple round-robin replica assignment
var leaderIndex = 0
- Seq(tp1, tp2, tp3).foreach {
- partition =>
- val replicas = brokers.indices.map { i =>
- val replica = brokers((i + leaderIndex) % brokers.size)
- replica
- }
- context.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(replicas))
- leaderIndex += 1
+ Seq(tp1, tp2, tp3).foreach { partition =>
+ val replicas = brokers.indices.map { i =>
+ brokers((i + leaderIndex) % brokers.size)
+ }
+ context.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(replicas))
+ leaderIndex += 1
}
}
diff --git a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
index 0c6c00d..b29a3d9 100644
--- a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
@@ -27,11 +27,23 @@ class MockPartitionStateMachine(controllerContext: ControllerContext,
uncleanLeaderElectionEnabled: Boolean)
extends PartitionStateMachine(controllerContext) {
+ var stateChangesByTargetState = mutable.Map.empty[PartitionState, Int].withDefaultValue(0)
+
+ def stateChangesCalls(targetState: PartitionState): Int = {
+ stateChangesByTargetState(targetState)
+ }
+
+ def clear(): Unit = {
+ stateChangesByTargetState.clear()
+ }
+
override def handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
+ stateChangesByTargetState(targetState) = stateChangesByTargetState(targetState) + 1
+
partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
if (invalidPartitions.nonEmpty) {
diff --git a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
index e5207bf..32bfc50 100644
--- a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
@@ -17,10 +17,22 @@
package kafka.controller
import scala.collection.Seq
+import scala.collection.mutable
class MockReplicaStateMachine(controllerContext: ControllerContext) extends ReplicaStateMachine(controllerContext) {
+ val stateChangesByTargetState = mutable.Map.empty[ReplicaState, Int].withDefaultValue(0)
+
+ def stateChangesCalls(targetState: ReplicaState): Int = {
+ stateChangesByTargetState(targetState)
+ }
+
+ def clear(): Unit = {
+ stateChangesByTargetState.clear()
+ }
override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
+ stateChangesByTargetState(targetState) = stateChangesByTargetState(targetState) + 1
+
replicas.foreach(replica => controllerContext.putReplicaStateIfNotExists(replica, NonExistentReplica))
val (validReplicas, invalidReplicas) = controllerContext.checkValidReplicaStateChange(replicas, targetState)
if (invalidReplicas.nonEmpty) {
diff --git a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
index 33479c1..b1b8c24 100644
--- a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
@@ -76,25 +76,43 @@ class TopicDeletionManagerTest {
val fooPartitions = controllerContext.partitionsForTopic("foo")
val fooReplicas = controllerContext.replicasForPartition(fooPartitions).toSet
+ val barPartitions = controllerContext.partitionsForTopic("bar")
+ val barReplicas = controllerContext.replicasForPartition(barPartitions).toSet
+
+ // Clean up state changes before starting the deletion
+ replicaStateMachine.clear()
+ partitionStateMachine.clear()
// Queue the topic for deletion
- deletionManager.enqueueTopicsForDeletion(Set("foo"))
+ deletionManager.enqueueTopicsForDeletion(Set("foo", "bar"))
assertEquals(fooPartitions, controllerContext.partitionsInState("foo", NonExistentPartition))
assertEquals(fooReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted))
- verify(deletionClient).sendMetadataUpdate(fooPartitions)
- assertEquals(Set("foo"), controllerContext.topicsToBeDeleted)
- assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted)
+ assertEquals(barPartitions, controllerContext.partitionsInState("bar", NonExistentPartition))
+ assertEquals(barReplicas, controllerContext.replicasInState("bar", ReplicaDeletionStarted))
+ verify(deletionClient).sendMetadataUpdate(fooPartitions ++ barPartitions)
+ assertEquals(Set("foo", "bar"), controllerContext.topicsToBeDeleted)
+ assertEquals(Set("foo", "bar"), controllerContext.topicsWithDeletionStarted)
assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
// Complete the deletion
- deletionManager.completeReplicaDeletion(fooReplicas)
+ deletionManager.completeReplicaDeletion(fooReplicas ++ barReplicas)
assertEquals(Set.empty, controllerContext.partitionsForTopic("foo"))
assertEquals(Set.empty[PartitionAndReplica], controllerContext.replicaStates.keySet.filter(_.topic == "foo"))
+ assertEquals(Set.empty, controllerContext.partitionsForTopic("bar"))
+ assertEquals(Set.empty[PartitionAndReplica], controllerContext.replicaStates.keySet.filter(_.topic == "bar"))
assertEquals(Set(), controllerContext.topicsToBeDeleted)
assertEquals(Set(), controllerContext.topicsWithDeletionStarted)
assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
+
+ assertEquals(1, partitionStateMachine.stateChangesCalls(OfflinePartition))
+ assertEquals(1, partitionStateMachine.stateChangesCalls(NonExistentPartition))
+
+ assertEquals(1, replicaStateMachine.stateChangesCalls(ReplicaDeletionIneligible))
+ assertEquals(1, replicaStateMachine.stateChangesCalls(OfflineReplica))
+ assertEquals(1, replicaStateMachine.stateChangesCalls(ReplicaDeletionStarted))
+ assertEquals(1, replicaStateMachine.stateChangesCalls(ReplicaDeletionSuccessful))
}
@Test