You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/08 08:05:28 UTC

[GitHub] [kafka] dajac commented on a change in pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted

dajac commented on a change in pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#discussion_r436519796



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -580,53 +586,105 @@ class LogManager(logDirs: Seq[File],
    * to avoid exposing data that have been deleted by DeleteRecordsRequest
    */
   def checkpointLogStartOffsets(): Unit = {
-    liveLogDirs.foreach(checkpointLogStartOffsetsInDir)
+    val logsByDirCached = logsByDir
+    liveLogDirs.foreach { logDir =>
+      checkpointLogsStartOffsetsInDir(
+        logsByDirCached.getOrElse(logDir.getAbsolutePath, Map.empty), logDir)
+    }
   }
 
   /**
-    * Write the recovery checkpoint file for all logs in provided directory and clean older snapshots for provided logs.
-    *
-    * @param dir the directory in which logs are checkpointed
-    * @param logsToCleanSnapshot logs whose snapshots need to be cleaned
-    */
+   * Write the checkpoint files for all the provided directories. This is used to cleanup
+   * checkpoints after having deleted partitions.
+   */
+  def checkpoint(logDirs: Set[File]): Unit = {
+    val logsByDirCached = logsByDir
+    logDirs.foreach { logDir =>
+      val partitionToLog = logsByDirCached.getOrElse(logDir.getAbsolutePath, Map.empty)
+      if (cleaner != null) {
+        cleaner.updateCheckpoints(logDir)
+      }
+      checkpointLogsRecoveryOffsetsInDir(partitionToLog, logDir)
+      checkpointLogsStartOffsetsInDir(partitionToLog, logDir)
+    }
+  }
+
+  /**
+   * Clean snapshots of the provided logs in the provided directory.
+   *
+   * @param logsToCleanSnapshot the logs whose snapshots will be cleaned
+   * @param dir the directory in which the logs are
+   */
   // Only for testing
-  private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
+  private[log] def cleanSnapshotsInDir(logsToCleanSnapshot: Seq[Log], dir: File): Unit = {
     try {
-      checkpointLogRecoveryOffsetsInDir(dir)
       logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
     } catch {
       case e: IOException =>
-        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
-          s"file in directory $dir", e)
+        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
+          s"Disk error while writing to recovery point file in directory $dir", e)
     }
   }
 
-  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
-    for {
-      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
-      checkpoint <- recoveryPointCheckpoints.get(dir)
-    } {
-      checkpoint.write(partitionToLog.map { case (tp, log) => tp -> log.recoveryPoint })
+  /**
+   * Checkpoint log recovery offsets for all the logs in the provided directory.
+   *
+   * @param dir the directory in which logs are checkpointed
+   */
+  // Only for testing
+  private[log] def checkpointRecoveryOffsetsInDir(dir: File): Unit = {
+    val partitionToLog = logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+    checkpointLogsRecoveryOffsetsInDir(partitionToLog, dir)
+  }
+
+  /**
+   * Checkpoint log recovery and start offsets for all logs in the provided directory.
+   *
+   * @param dir the directory in which logs are checkpointed
+   */
+  private def checkpointRecoveryAndLogStartOffsetsInDir(dir: File): Unit = {

Review comment:
       It is a small optimization to avoid computing the `logsByDir` mapping twice when we checkpoint both the recovery offsets and the log start offsets at the same time. Computing the mapping is quite expensive when there are many logs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org