Posted to commits@kafka.apache.org by ju...@apache.org on 2020/07/14 00:49:50 UTC

[kafka] branch trunk updated: KAFKA-10002; Improve performance of StopReplicaRequest with a large number of partitions to be deleted (#8672)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 43d43e6  KAFKA-10002; Improve performance of StopReplicaRequest with a large number of partitions to be deleted (#8672)
43d43e6 is described below

commit 43d43e6c7bbfbc87d0288f7b934d5b6e0ebf1913
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Jul 14 02:49:13 2020 +0200

    KAFKA-10002; Improve performance of StopReplicaRequest with a large number of partitions to be deleted (#8672)
    
    Update checkpoint files once for all deleted partitions instead of updating them for each deleted partition. With this, a StopReplicaRequest with 2000 partitions to be deleted takes ~2 seconds instead of the previous ~40 seconds.
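
    A minimal sketch of that batching idea, in Scala; deleteAndCheckpoint, logDirOf,
    queueForDeletion and writeCheckpoints are illustrative names, not the actual
    LogManager API:

        import java.io.File
        import org.apache.kafka.common.TopicPartition

        // Sketch: remember each affected log directory while the partitions are queued
        // for deletion, then update the checkpoint files once per directory at the end.
        def deleteAndCheckpoint(partitionsToDelete: Set[TopicPartition],
                                logDirOf: TopicPartition => File,
                                queueForDeletion: TopicPartition => Unit,
                                writeCheckpoints: File => Unit): Unit = {
          val affectedDirs = scala.collection.mutable.Set.empty[File]
          partitionsToDelete.foreach { tp =>
            affectedDirs += logDirOf(tp)   // record the directory hosting this partition
            queueForDeletion(tp)           // rename/enqueue the log, no checkpointing yet
          }
          affectedDirs.foreach(writeCheckpoints)  // one checkpoint update per directory
        }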
    
    Refactor the checkpointing methods so that logsByDir is no longer recomputed on every call; the computed map is now reused as much as possible.
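
    A minimal sketch of that reuse pattern, with checkpointAllDirs, computeLogsByDir
    and checkpointDir standing in as illustrative names for the LogManager internals:

        import java.io.File
        import org.apache.kafka.common.TopicPartition

        // Sketch: compute the directory-to-logs map once per pass and reuse it for every
        // live log directory instead of rebuilding it inside each checkpointing call.
        def checkpointAllDirs[Log](liveLogDirs: Seq[File],
                                   computeLogsByDir: () => Map[String, Map[TopicPartition, Log]],
                                   checkpointDir: (File, Map[TopicPartition, Log]) => Unit): Unit = {
          val logsByDirCached = computeLogsByDir()  // computed once
          liveLogDirs.foreach { logDir =>
            val logsToCheckpoint = logsByDirCached.getOrElse(logDir.getAbsolutePath,
              Map.empty[TopicPartition, Log])
            checkpointDir(logDir, logsToCheckpoint) // reused for every directory
          }
        }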
    
    Refactor the exception handling. Some checkpointing methods were catching IOException even though the underlying write path already catches it and throws KafkaStorageException instead.
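
    A minimal sketch of the resulting shape, assuming the lower-level write already
    wraps IOException into KafkaStorageException as described; writeCheckpointSafely
    is an illustrative helper, not a Kafka method:

        import java.io.File
        import org.apache.kafka.common.errors.KafkaStorageException

        // Sketch: the caller only handles KafkaStorageException because the write
        // path has already converted any IOException into that exception type.
        def writeCheckpointSafely(logDir: File)(write: => Unit): Unit = {
          try {
            write
          } catch {
            case e: KafkaStorageException =>
              // LogManager would log via error(...); println keeps the sketch self-contained.
              println(s"Disk error while writing checkpoint in directory $logDir: ${e.getMessage}")
          }
        }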
    
    Reduce the logging in the log cleaner manager. It no longer logs when a partition is deleted, as that information is not useful.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   7 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     |   2 +-
 .../main/scala/kafka/log/LogCleanerManager.scala   |   7 +-
 core/src/main/scala/kafka/log/LogManager.scala     | 228 ++++++++++++++-------
 .../main/scala/kafka/server/ReplicaManager.scala   |  82 ++++----
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   3 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  42 +++-
 7 files changed, 247 insertions(+), 124 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index eb03a57..f9a9de2 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -460,6 +460,10 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  /**
+   * Delete the partition. Note that deleting the partition does not delete the underlying logs.
+   * The logs are deleted by the ReplicaManager after it has deleted the partition.
+   */
   def delete(): Unit = {
     // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
     inWriteLock(leaderIsrUpdateLock) {
@@ -471,9 +475,6 @@ class Partition(val topicPartition: TopicPartition,
       leaderReplicaIdOpt = None
       leaderEpochStartOffsetOpt = None
       Partition.removeMetrics(topicPartition)
-      logManager.asyncDelete(topicPartition)
-      if (logManager.getLog(topicPartition, isFuture = true).isDefined)
-        logManager.asyncDelete(topicPartition, isFuture = true)
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c83cc6f..c089626 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -894,7 +894,7 @@ private[log] class Cleaner(val id: Int,
     // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var full = false
-    for ( (segment, nextSegmentStartOffset) <- dirty.zip(nextSegmentStartOffsets) if !full) {
+    for ((segment, nextSegmentStartOffset) <- dirty.zip(nextSegmentStartOffsets) if !full) {
       checkDone(log.topicPartition)
 
       full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, nextSegmentStartOffset, log.config.maxMessageSize,
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 473f1fb..056c597 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -258,7 +258,6 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       abortAndPauseCleaning(topicPartition)
       resumeCleaning(Seq(topicPartition))
     }
-    info(s"The cleaning for partition $topicPartition is aborted")
   }
 
   /**
@@ -285,11 +284,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         case Some(s) =>
           throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
       }
-
       while(!isCleaningInStatePaused(topicPartition))
         pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
     }
-    info(s"The cleaning for partition $topicPartition is aborted and paused")
   }
 
   /**
@@ -357,12 +354,12 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
-          val existing = checkpoint.read().filter { case (k, _) => logs.keys.contains(k) } ++ update
+          val existing = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) } ++ update
           checkpoint.write(existing)
         } catch {
           case e: KafkaStorageException =>
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 0e137a0..e2966b0 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -464,9 +464,9 @@ class LogManager(logDirs: Seq[File],
       val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
       threadPools.append(pool)
 
-      val logsInDir = localLogsByDir.getOrElse(dir.toString, Map()).values
+      val logs = logsInDir(localLogsByDir, dir).values
 
-      val jobsForDir = logsInDir.map { log =>
+      val jobsForDir = logs.map { log =>
         val runnable: Runnable = () => {
           // flush the log to ensure latest possible recovery point
           log.flush()
@@ -482,12 +482,14 @@ class LogManager(logDirs: Seq[File],
       for ((dir, dirJobs) <- jobs) {
         dirJobs.foreach(_.get)
 
+        val logs = logsInDir(localLogsByDir, dir)
+
         // update the last flush point
         debug(s"Updating recovery points at $dir")
-        checkpointRecoveryOffsetsAndCleanSnapshot(dir, localLogsByDir.getOrElse(dir.toString, Map()).values.toSeq)
+        checkpointRecoveryOffsetsAndCleanSnapshotsInDir(dir, logs, logs.values.toSeq)
 
         debug(s"Updating log start offsets at $dir")
-        checkpointLogStartOffsetsInDir(dir)
+        checkpointLogStartOffsetsInDir(dir, logs)
 
         // mark that the shutdown was clean by creating marker file
         debug(s"Writing clean shutdown marker at $dir")
@@ -523,26 +525,24 @@ class LogManager(logDirs: Seq[File],
       }
       // If the log does not exist, skip it
       if (log != null) {
-        //May need to abort and pause the cleaning of the log, and resume after truncation is done.
-        val needToStopCleaner = cleaner != null && truncateOffset < log.activeSegment.baseOffset
+        // May need to abort and pause the cleaning of the log, and resume after truncation is done.
+        val needToStopCleaner = truncateOffset < log.activeSegment.baseOffset
         if (needToStopCleaner && !isFuture)
-          cleaner.abortAndPauseCleaning(topicPartition)
+          abortAndPauseCleaning(topicPartition)
         try {
           if (log.truncateTo(truncateOffset))
             affectedLogs += log
           if (needToStopCleaner && !isFuture)
-            cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset)
+            maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset(log, topicPartition)
         } finally {
-          if (needToStopCleaner && !isFuture) {
-            cleaner.resumeCleaning(Seq(topicPartition))
-            info(s"Cleaning for partition $topicPartition is resumed")
-          }
+          if (needToStopCleaner && !isFuture)
+            resumeCleaning(topicPartition)
         }
       }
     }
 
     for ((dir, logs) <- affectedLogs.groupBy(_.parentDirFile)) {
-      checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs)
+      checkpointRecoveryOffsetsAndCleanSnapshotsInDir(dir, logs)
     }
   }
 
@@ -562,21 +562,18 @@ class LogManager(logDirs: Seq[File],
     }
     // If the log does not exist, skip it
     if (log != null) {
-        //Abort and pause the cleaning of the log, and resume after truncation is done.
-      if (cleaner != null && !isFuture)
-        cleaner.abortAndPauseCleaning(topicPartition)
+      // Abort and pause the cleaning of the log, and resume after truncation is done.
+      if (!isFuture)
+        abortAndPauseCleaning(topicPartition)
       try {
         log.truncateFullyAndStartAt(newOffset)
-        if (cleaner != null && !isFuture) {
-          cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset)
-        }
+        if (!isFuture)
+          maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset(log, topicPartition)
       } finally {
-        if (cleaner != null && !isFuture) {
-          cleaner.resumeCleaning(Seq(topicPartition))
-          info(s"Compaction for partition $topicPartition is resumed")
-        }
+        if (!isFuture)
+          resumeCleaning(topicPartition)
       }
-      checkpointRecoveryOffsetsAndCleanSnapshot(log.parentDirFile, Seq(log))
+      checkpointRecoveryOffsetsAndCleanSnapshotsInDir(log.parentDirFile, Seq(log))
     }
   }
 
@@ -585,10 +582,10 @@ class LogManager(logDirs: Seq[File],
    * to avoid recovering the whole log on startup.
    */
   def checkpointLogRecoveryOffsets(): Unit = {
-    logsByDir.foreach { case (dir, partitionToLogMap) =>
-      liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f =>
-        checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq)
-      }
+    val logsByDirCached = logsByDir
+    liveLogDirs.foreach { logDir =>
+      val logsToCheckpoint = logsInDir(logsByDirCached, logDir)
+      checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, logsToCheckpoint.values.toSeq)
     }
   }
 
@@ -597,53 +594,66 @@ class LogManager(logDirs: Seq[File],
    * to avoid exposing data that have been deleted by DeleteRecordsRequest
    */
   def checkpointLogStartOffsets(): Unit = {
-    liveLogDirs.foreach(checkpointLogStartOffsetsInDir)
+    val logsByDirCached = logsByDir
+    liveLogDirs.foreach { logDir =>
+      checkpointLogStartOffsetsInDir(logDir, logsInDir(logsByDirCached, logDir))
+    }
   }
 
   /**
-    * Write the recovery checkpoint file for all logs in provided directory and clean older snapshots for provided logs.
-    *
-    * @param dir the directory in which logs are checkpointed
-    * @param logsToCleanSnapshot logs whose snapshots need to be cleaned
-    */
+   * Checkpoint recovery offsets for all the logs in logDir and clean the snapshots of all the
+   * provided logs.
+   *
+   * @param logDir the directory in which the logs to be checkpointed are
+   * @param logsToCleanSnapshot the logs whose snapshots will be cleaned
+   */
   // Only for testing
-  private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
+  private[log] def checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
+    checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsInDir(logDir), logsToCleanSnapshot)
+  }
+
+  /**
+   * Checkpoint recovery offsets for all the provided logs and clean the snapshots of all the
+   * provided logs.
+   *
+   * @param logDir the directory in which the logs are
+   * @param logsToCheckpoint the logs to be checkpointed
+   * @param logsToCleanSnapshot the logs whose snapshots will be cleaned
+   */
+  private def checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir: File, logsToCheckpoint: Map[TopicPartition, Log],
+                                                              logsToCleanSnapshot: Seq[Log]): Unit = {
     try {
-      checkpointLogRecoveryOffsetsInDir(dir)
+      recoveryPointCheckpoints.get(logDir).foreach { checkpoint =>
+        val recoveryOffsets = logsToCheckpoint.map { case (tp, log) => tp -> log.recoveryPoint }
+        checkpoint.write(recoveryOffsets)
+      }
       logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
     } catch {
+      case e: KafkaStorageException =>
+        error(s"Disk error while writing recovery offsets checkpoint in directory $logDir: ${e.getMessage}")
       case e: IOException =>
-        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
-          s"file in directory $dir", e)
-    }
-  }
-
-  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
-    for {
-      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
-      checkpoint <- recoveryPointCheckpoints.get(dir)
-    } {
-      checkpoint.write(partitionToLog.map { case (tp, log) => tp -> log.recoveryPoint })
+        logDirFailureChannel.maybeAddOfflineLogDir(logDir.getAbsolutePath,
+          s"Disk error while writing recovery offsets checkpoint in directory $logDir: ${e.getMessage}", e)
     }
   }
 
   /**
-   * Checkpoint log start offset for all logs in provided directory.
+   * Checkpoint log start offsets for all the provided logs in the provided directory.
+   *
+   * @param logDir the directory in which logs are checkpointed
+   * @param logsToCheckpoint the logs to be checkpointed
    */
-  private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
-    for {
-      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
-      checkpoint <- logStartOffsetCheckpoints.get(dir)
-    } {
-      try {
-        val logStartOffsets = partitionToLog.collect {
-          case (k, log) if log.logStartOffset > log.logSegments.head.baseOffset => k -> log.logStartOffset
+  private def checkpointLogStartOffsetsInDir(logDir: File, logsToCheckpoint: Map[TopicPartition, Log]): Unit = {
+    try {
+      logStartOffsetCheckpoints.get(logDir).foreach { checkpoint =>
+        val logStartOffsets = logsToCheckpoint.collect {
+          case (tp, log) if log.logStartOffset > log.logSegments.head.baseOffset => tp -> log.logStartOffset
         }
         checkpoint.write(logStartOffsets)
-      } catch {
-        case e: IOException =>
-          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e)
       }
+    } catch {
+      case e: KafkaStorageException =>
+        error(s"Disk error while writing log start offsets checkpoint in directory $logDir: ${e.getMessage}")
     }
   }
 
@@ -655,11 +665,35 @@ class LogManager(logDirs: Seq[File],
       preferredLogDirs.put(topicPartition, logDir)
   }
 
+  /**
+   * Abort and pause cleaning of the provided partition and log a message about it.
+   */
   def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
-    if (cleaner != null)
+    if (cleaner != null) {
       cleaner.abortAndPauseCleaning(topicPartition)
+      info(s"The cleaning for partition $topicPartition is aborted and paused")
+    }
   }
 
+  /**
+   * Resume cleaning of the provided partition and log a message about it.
+   */
+  private def resumeCleaning(topicPartition: TopicPartition): Unit = {
+    if (cleaner != null) {
+      cleaner.resumeCleaning(Seq(topicPartition))
+      info(s"Cleaning for partition $topicPartition is resumed")
+    }
+  }
+
+  /**
+   * Truncate the cleaner's checkpoint to the base offset of the active segment of
+   * the provided log.
+   */
+  private def maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset(log: Log, topicPartition: TopicPartition): Unit = {
+    if (cleaner != null) {
+      cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset)
+    }
+  }
 
   /**
    * Get the log if it exists, otherwise return None
@@ -886,8 +920,7 @@ class LogManager(logDirs: Seq[File],
       currentLogs.put(topicPartition, destLog)
       if (cleaner != null) {
         cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        cleaner.resumeCleaning(Seq(topicPartition))
-        info(s"Compaction for partition $topicPartition is resumed")
+        resumeCleaning(topicPartition)
       }
 
       try {
@@ -895,8 +928,10 @@ class LogManager(logDirs: Seq[File],
         // Now that replica in source log directory has been successfully renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be deleted.
         sourceLog.close()
-        checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty)
-        checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
+        val logDir = sourceLog.parentDirFile
+        val logsToCheckpoint = logsInDir(logDir)
+        checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
+        checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
         addLogToBeDeleted(sourceLog)
       } catch {
         case e: KafkaStorageException =>
@@ -917,9 +952,12 @@ class LogManager(logDirs: Seq[File],
     *
     * @param topicPartition TopicPartition that needs to be deleted
     * @param isFuture True iff the future log of the specified partition should be deleted
+    * @param checkpoint True if checkpoints must be written
     * @return the removed log
     */
-  def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = {
+  def asyncDelete(topicPartition: TopicPartition,
+                  isFuture: Boolean = false,
+                  checkpoint: Boolean = true): Log = {
     val removedLog: Log = logCreationOrDeletionLock synchronized {
       if (isFuture)
         futureLogs.remove(topicPartition)
@@ -927,14 +965,19 @@ class LogManager(logDirs: Seq[File],
         currentLogs.remove(topicPartition)
     }
     if (removedLog != null) {
-      //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
+      // We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
       if (cleaner != null && !isFuture) {
         cleaner.abortCleaning(topicPartition)
-        cleaner.updateCheckpoints(removedLog.parentDirFile)
+        if (checkpoint)
+          cleaner.updateCheckpoints(removedLog.parentDirFile)
       }
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty)
-      checkpointLogStartOffsetsInDir(removedLog.parentDirFile)
+      if (checkpoint) {
+        val logDir = removedLog.parentDirFile
+        val logsToCheckpoint = logsInDir(logDir)
+        checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
+        checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
+      }
       addLogToBeDeleted(removedLog)
       info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
     } else if (offlineLogDirs.nonEmpty) {
@@ -944,6 +987,42 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
+   * Rename the directories of the given topic-partitions and add them to the queue for
+   * deletion. Checkpoints are updated once all the directories have been renamed.
+   *
+   * @param topicPartitions The set of topic-partitions to delete asynchronously
+   * @param errorHandler The error handler that will be called when an exception for a particular
+   *                     topic-partition is raised
+   */
+  def asyncDelete(topicPartitions: Set[TopicPartition],
+                  errorHandler: (TopicPartition, Throwable) => Unit): Unit = {
+    val logDirs = mutable.Set.empty[File]
+
+    topicPartitions.foreach { topicPartition =>
+      try {
+        getLog(topicPartition).foreach { log =>
+          logDirs += log.parentDirFile
+          asyncDelete(topicPartition, checkpoint = false)
+        }
+        getLog(topicPartition, isFuture = true).foreach { log =>
+          logDirs += log.parentDirFile
+          asyncDelete(topicPartition, isFuture = true, checkpoint = false)
+        }
+      } catch {
+        case e: Throwable => errorHandler(topicPartition, e)
+      }
+    }
+
+    val logsByDirCached = logsByDir
+    logDirs.foreach { logDir =>
+      if (cleaner != null) cleaner.updateCheckpoints(logDir)
+      val logsToCheckpoint = logsInDir(logsByDirCached, logDir)
+      checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
+      checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
+    }
+  }
+
+  /**
    * Provides the full ordered list of suggested directories for the next partition.
    * Currently this is done by calculating the number of partitions in each directory and then sorting the
    * data directories by fewest partitions.
@@ -1022,7 +1101,7 @@ class LogManager(logDirs: Seq[File],
   /**
    * Map of log dir to logs by topic and partitions in that dir
    */
-  def logsByDir: Map[String, Map[TopicPartition, Log]] = {
+  private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
     // This code is called often by checkpoint processes and is written in a way that reduces
     // allocations and CPU with many topic partitions.
     // When changing this code please measure the changes with org.apache.kafka.jmh.server.CheckpointBench
@@ -1035,6 +1114,15 @@ class LogManager(logDirs: Seq[File],
     byDir
   }
 
+  private def logsInDir(dir: File): Map[TopicPartition, Log] = {
+    logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+  }
+
+  private def logsInDir(cachedLogsByDir: Map[String, Map[TopicPartition, Log]],
+                        dir: File): Map[TopicPartition, Log] = {
+    cachedLogsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+  }
+
   // logDir should be an absolute path
   def isLogDirOnline(logDir: String): Boolean = {
     // The logDir should be an absolute path
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f171e12..7f50377 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -334,32 +334,6 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.removeMetrics(topic)
   }
 
-  def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Unit  = {
-    if (deletePartition) {
-      getPartition(topicPartition) match {
-        case hostedPartition @ HostedPartition.Online(removedPartition) =>
-          if (allPartitions.remove(topicPartition, hostedPartition)) {
-            maybeRemoveTopicMetrics(topicPartition.topic)
-            // this will delete the local log. This call may throw exception if the log is on offline directory
-            removedPartition.delete()
-          }
-
-        case _ =>
-      }
-
-      // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
-      // This could happen when topic is being deleted while broker is down and recovers.
-      if (logManager.getLog(topicPartition).isDefined)
-        logManager.asyncDelete(topicPartition)
-      if (logManager.getLog(topicPartition, isFuture = true).isDefined)
-        logManager.asyncDelete(topicPartition, isFuture = true)
-    }
-
-    // If we were the leader, we may have some operations still waiting for completion.
-    // We force completion to prevent them from timing out.
-    completeDelayedFetchOrProduceRequests(topicPartition)
-  }
-
   private def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = {
     val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
     delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
@@ -392,7 +366,8 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         this.controllerEpoch = controllerEpoch
 
-        val stoppedPartitions = mutable.Map.empty[TopicPartition, StopReplicaPartitionState]
+        val stoppedPartitions = mutable.Set.empty[TopicPartition]
+        val deletedPartitions = mutable.Set.empty[TopicPartition]
         partitionStates.foreach { case (topicPartition, partitionState) =>
           val deletePartition = partitionState.deletePartition
 
@@ -404,7 +379,7 @@ class ReplicaManager(val config: KafkaConfig,
                 "partition is in an offline log directory")
               responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
 
-            case HostedPartition.Online(partition) =>
+            case hostedPartition @ HostedPartition.Online(partition) =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
               // When a topic is deleted, the leader epoch is not incremented. To circumvent this,
@@ -414,7 +389,24 @@ class ReplicaManager(val config: KafkaConfig,
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
                   requestLeaderEpoch > currentLeaderEpoch) {
-                stoppedPartitions += topicPartition -> partitionState
+                stoppedPartitions += topicPartition
+
+                if (deletePartition) {
+                  if (allPartitions.remove(topicPartition, hostedPartition)) {
+                    maybeRemoveTopicMetrics(topicPartition.topic)
+                    // Logs are not deleted here. They are deleted in a single batch later on.
+                    // This is done to avoid having to checkpoint for every deletion.
+                    partition.delete()
+                    deletedPartitions += topicPartition
+                  }
+                }
+
+                // If we were the leader, we may have some operations still waiting for completion.
+                // We force completion to prevent them from timing out.
+                completeDelayedFetchOrProduceRequests(topicPartition)
+
+                // Assume that everything will go right. It is overwritten in case of an error.
+                responseMap.put(topicPartition, Errors.NONE)
               } else if (requestLeaderEpoch < currentLeaderEpoch) {
                 stateChangeLogger.warn(s"Ignoring StopReplica request (delete=$deletePartition) from " +
                   s"controller $controllerId with correlation id $correlationId " +
@@ -433,29 +425,35 @@ class ReplicaManager(val config: KafkaConfig,
             case HostedPartition.None =>
               // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is down and recovers.
-              stoppedPartitions += topicPartition -> partitionState
+              stoppedPartitions += topicPartition
+              if (deletePartition)
+                deletedPartitions += topicPartition
+              responseMap.put(topicPartition, Errors.NONE)
           }
         }
 
         // First stop fetchers for all partitions, then stop the corresponding replicas
-        val partitions = stoppedPartitions.keySet
-        replicaFetcherManager.removeFetcherForPartitions(partitions)
-        replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+        replicaFetcherManager.removeFetcherForPartitions(stoppedPartitions)
+        replicaAlterLogDirsManager.removeFetcherForPartitions(stoppedPartitions)
 
-        stoppedPartitions.foreach { case (topicPartition, partitionState) =>
-          val deletePartition = partitionState.deletePartition
-          try {
-            stopReplica(topicPartition, deletePartition)
-            responseMap.put(topicPartition, Errors.NONE)
-          } catch {
+        // Delete the logs and checkpoint
+        logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => {
+          exception match {
             case e: KafkaStorageException =>
-              stateChangeLogger.error(s"Ignoring StopReplica request (delete=$deletePartition) from " +
+              stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId " +
                 s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
-                "partition is in an offline log directory", e)
+                "partition is in an offline log directory")
               responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+
+            case e =>
+              stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
+                s"controller $controllerId with correlation id $correlationId " +
+                s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
+                s"${e.getClass.getName} exception: ${e.getMessage}")
+              responseMap.put(topicPartition, Errors.forException(e))
           }
-        }
+        })
 
         (responseMap, Errors.NONE)
       }
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 10faf00..05d7d71 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -448,7 +448,8 @@ class LogManagerTest {
       log.flush()
     }
 
-    logManager.checkpointRecoveryOffsetsAndCleanSnapshot(this.logDir, allLogs.filter(_.dir.getName.contains("test-a")))
+    logManager.checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir,
+      allLogs.filter(_.dir.getName.contains("test-a")))
 
     val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile)).read()
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 09e4f14..4c41abd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -27,8 +27,10 @@ import kafka.api.LeaderAndIsr
 import kafka.api.Request
 import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.cluster.BrokerEndPoint
+import kafka.log.LeaderOffsetIncremented
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.LazyOffsetCheckpoints
+import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils.createBroker
 import kafka.utils.timer.MockTimer
@@ -54,6 +56,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.mockito.Mockito
 
+import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 import scala.collection.{Map, Seq}
 
@@ -1301,8 +1304,12 @@ class ReplicaManagerTest {
 
     // We have a fetch in purgatory, now receive a stop replica request and
     // assert that the fetch returns with a NOT_LEADER error
+    replicaManager.stopReplicas(2, 0, 0, brokerEpoch,
+      mutable.Map(tp0 -> new StopReplicaPartitionState()
+        .setPartitionIndex(tp0.partition)
+        .setDeletePartition(true)
+        .setLeaderEpoch(LeaderAndIsr.EpochDuringDelete)))
 
-    replicaManager.stopReplica(tp0, deletePartition = true)
     assertNotNull(fetchResult.get)
     assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
   }
@@ -1336,7 +1343,12 @@ class ReplicaManagerTest {
 
     Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
 
-    replicaManager.stopReplica(tp0, deletePartition = true)
+    replicaManager.stopReplicas(2, 0, 0, brokerEpoch,
+      mutable.Map(tp0 -> new StopReplicaPartitionState()
+        .setPartitionIndex(tp0.partition)
+        .setDeletePartition(true)
+        .setLeaderEpoch(LeaderAndIsr.EpochDuringDelete)))
+
     assertNotNull(produceResult.get)
     assertEquals(Errors.NOT_LEADER_FOR_PARTITION, produceResult.get.error)
   }
@@ -2058,6 +2070,19 @@ class ReplicaManagerTest {
     val partition = replicaManager.createPartition(tp0)
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
+    val logDirFailureChannel = new LogDirFailureChannel(replicaManager.config.logDirs.size)
+    val logDir = partition.log.get.parentDirFile
+
+    def readRecoveryPointCheckpoint(): Map[TopicPartition, Long] = {
+      new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile),
+        logDirFailureChannel).read()
+    }
+
+    def readLogStartOffsetCheckpoint(): Map[TopicPartition, Long] = {
+      new OffsetCheckpointFile(new File(logDir, LogManager.LogStartOffsetCheckpointFile),
+        logDirFailureChannel).read()
+    }
+
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
@@ -2065,6 +2090,17 @@ class ReplicaManagerTest {
 
     replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
+    val batch = TestUtils.records(records = List(
+      new SimpleRecord(10, "k1".getBytes, "v1".getBytes),
+      new SimpleRecord(11, "k2".getBytes, "v2".getBytes)))
+    partition.appendRecordsToLeader(batch, AppendOrigin.Client, requiredAcks = 0)
+    partition.log.get.updateHighWatermark(2L)
+    partition.log.get.maybeIncrementLogStartOffset(1L, LeaderOffsetIncremented)
+    replicaManager.logManager.checkpointLogRecoveryOffsets()
+    replicaManager.logManager.checkpointLogStartOffsets()
+    assertEquals(Some(1L), readRecoveryPointCheckpoint().get(tp0))
+    assertEquals(Some(1L), readLogStartOffsetCheckpoint().get(tp0))
+
     if (throwIOException) {
       // Delete the underlying directory to trigger an KafkaStorageException
       val dir = partition.log.get.dir
@@ -2084,6 +2120,8 @@ class ReplicaManagerTest {
 
     if (expectedOutput == Errors.NONE && deletePartition) {
       assertEquals(HostedPartition.None, replicaManager.getPartition(tp0))
+      assertFalse(readRecoveryPointCheckpoint().contains(tp0))
+      assertFalse(readLogStartOffsetCheckpoint().contains(tp0))
     }
   }
 }