Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 20:06:22 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted

hachikuji commented on a change in pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#discussion_r436115831



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -465,12 +465,15 @@ class LogManager(logDirs: Seq[File],
       for ((dir, dirJobs) <- jobs) {
         dirJobs.foreach(_.get)
 
+        val logsInDir = localLogsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+
         // update the last flush point
         debug(s"Updating recovery points at $dir")
-        checkpointRecoveryOffsetsAndCleanSnapshot(dir, localLogsByDir.getOrElse(dir.toString, Map()).values.toSeq)
+        checkpointLogsRecoveryOffsetsInDir(logsInDir, dir)

Review comment:
       nit: remove `Logs` from name?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -580,53 +586,105 @@ class LogManager(logDirs: Seq[File],
    * to avoid exposing data that have been deleted by DeleteRecordsRequest
    */
   def checkpointLogStartOffsets(): Unit = {
-    liveLogDirs.foreach(checkpointLogStartOffsetsInDir)
+    val logsByDirCached = logsByDir
+    liveLogDirs.foreach { logDir =>
+      checkpointLogsStartOffsetsInDir(
+        logsByDirCached.getOrElse(logDir.getAbsolutePath, Map.empty), logDir)
+    }
   }
 
   /**
-    * Write the recovery checkpoint file for all logs in provided directory and clean older snapshots for provided logs.
-    *
-    * @param dir the directory in which logs are checkpointed
-    * @param logsToCleanSnapshot logs whose snapshots need to be cleaned
-    */
+   * Write the checkpoint files for all the provided directories. This is used to cleanup
+   * checkpoints after having deleted partitions.
+   */
+  def checkpoint(logDirs: Set[File]): Unit = {
+    val logsByDirCached = logsByDir
+    logDirs.foreach { logDir =>
+      val partitionToLog = logsByDirCached.getOrElse(logDir.getAbsolutePath, Map.empty)
+      if (cleaner != null) {
+        cleaner.updateCheckpoints(logDir)
+      }
+      checkpointLogsRecoveryOffsetsInDir(partitionToLog, logDir)
+      checkpointLogsStartOffsetsInDir(partitionToLog, logDir)
+    }
+  }
+
+  /**
+   * Clean snapshots of the provided logs in the provided directory.
+   *
+   * @param logsToCleanSnapshot the logs whose snapshots will be cleaned
+   * @param dir the directory in which the logs are
+   */
   // Only for testing
-  private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
+  private[log] def cleanSnapshotsInDir(logsToCleanSnapshot: Seq[Log], dir: File): Unit = {
     try {
-      checkpointLogRecoveryOffsetsInDir(dir)
       logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
     } catch {
       case e: IOException =>
-        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
-          s"file in directory $dir", e)
+        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
+          s"Disk error while writing to recovery point file in directory $dir", e)
     }
   }
 
-  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
-    for {
-      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
-      checkpoint <- recoveryPointCheckpoints.get(dir)
-    } {
-      checkpoint.write(partitionToLog.map { case (tp, log) => tp -> log.recoveryPoint })
+  /**
+   * Checkpoint log recovery offsets for all the logs in the provided directory.
+   *
+   * @param dir the directory in which logs are checkpointed
+   */
+  // Only for testing
+  private[log] def checkpointRecoveryOffsetsInDir(dir: File): Unit = {
+    val partitionToLog = logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+    checkpointLogsRecoveryOffsetsInDir(partitionToLog, dir)
+  }
+
+  /**
+   * Checkpoint log recovery and start offsets for all logs in the provided directory.
+   *
+   * @param dir the directory in which logs are checkpointed
+   */
+  private def checkpointRecoveryAndLogStartOffsetsInDir(dir: File): Unit = {
+    val partitionToLog = logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+    checkpointLogsRecoveryOffsetsInDir(partitionToLog, dir)
+    checkpointLogsStartOffsetsInDir(partitionToLog, dir)
+  }
+
+  /**
+   * Checkpoint log recovery offsets for all the provided logs in the provided directory.
+   *
+   * @param logs the partitions and logs to be checkpointed
+   * @param dir the directory in which logs are checkpointed
+   */
+  private def checkpointLogsRecoveryOffsetsInDir(logs: Map[TopicPartition, Log],
+                                                 dir: File): Unit = {
+    try {
+      recoveryPointCheckpoints.get(dir).foreach { checkpoint =>
+        val recoveryOffsets = logs.map { case (tp, log) => tp -> log.recoveryPoint }
+        checkpoint.write(recoveryOffsets)
+      }
+    } catch {
+      case e: KafkaStorageException =>
+        error(s"Disk error while writing recovery offsets checkpoint in directory $dir: ${e.getMessage}")
     }
   }
 
   /**
-   * Checkpoint log start offset for all logs in provided directory.
+   * Checkpoint log start offsets for all the provided logs in the provided directory.
+   *
+   * @param logs the partitions and logs to be checkpointed
+   * @param dir the directory in which logs are checkpointed
    */
-  private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
-    for {
-      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
-      checkpoint <- logStartOffsetCheckpoints.get(dir)
-    } {
-      try {
-        val logStartOffsets = partitionToLog.collect {
+  private def checkpointLogsStartOffsetsInDir(logs: Map[TopicPartition, Log],
+                                              dir: File): Unit = {
+    try {
+      logStartOffsetCheckpoints.get(dir).foreach { checkpoint =>
+        val logStartOffsets = logs.collect {
           case (k, log) if log.logStartOffset > log.logSegments.head.baseOffset => k -> log.logStartOffset

Review comment:
       nit: k -> tp?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -580,53 +586,105 @@ class LogManager(logDirs: Seq[File],
    * to avoid exposing data that have been deleted by DeleteRecordsRequest
    */
   def checkpointLogStartOffsets(): Unit = {
-    liveLogDirs.foreach(checkpointLogStartOffsetsInDir)
+    val logsByDirCached = logsByDir
+    liveLogDirs.foreach { logDir =>
+      checkpointLogsStartOffsetsInDir(
+        logsByDirCached.getOrElse(logDir.getAbsolutePath, Map.empty), logDir)
+    }
   }
 
   /**
-    * Write the recovery checkpoint file for all logs in provided directory and clean older snapshots for provided logs.
-    *
-    * @param dir the directory in which logs are checkpointed
-    * @param logsToCleanSnapshot logs whose snapshots need to be cleaned
-    */
+   * Write the checkpoint files for all the provided directories. This is used to cleanup
+   * checkpoints after having deleted partitions.
+   */
+  def checkpoint(logDirs: Set[File]): Unit = {
+    val logsByDirCached = logsByDir
+    logDirs.foreach { logDir =>
+      val partitionToLog = logsByDirCached.getOrElse(logDir.getAbsolutePath, Map.empty)
+      if (cleaner != null) {
+        cleaner.updateCheckpoints(logDir)
+      }
+      checkpointLogsRecoveryOffsetsInDir(partitionToLog, logDir)
+      checkpointLogsStartOffsetsInDir(partitionToLog, logDir)
+    }
+  }
+
+  /**
+   * Clean snapshots of the provided logs in the provided directory.
+   *
+   * @param logsToCleanSnapshot the logs whose snapshots will be cleaned
+   * @param dir the directory in which the logs are
+   */
   // Only for testing
-  private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
+  private[log] def cleanSnapshotsInDir(logsToCleanSnapshot: Seq[Log], dir: File): Unit = {
     try {
-      checkpointLogRecoveryOffsetsInDir(dir)
       logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
     } catch {
       case e: IOException =>
-        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
-          s"file in directory $dir", e)
+        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
+          s"Disk error while writing to recovery point file in directory $dir", e)
     }
   }
 
-  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
-    for {
-      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
-      checkpoint <- recoveryPointCheckpoints.get(dir)
-    } {
-      checkpoint.write(partitionToLog.map { case (tp, log) => tp -> log.recoveryPoint })
+  /**
+   * Checkpoint log recovery offsets for all the logs in the provided directory.
+   *
+   * @param dir the directory in which logs are checkpointed
+   */
+  // Only for testing
+  private[log] def checkpointRecoveryOffsetsInDir(dir: File): Unit = {
+    val partitionToLog = logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+    checkpointLogsRecoveryOffsetsInDir(partitionToLog, dir)
+  }
+
+  /**
+   * Checkpoint log recovery and start offsets for all logs in the provided directory.
+   *
+   * @param dir the directory in which logs are checkpointed
+   */
+  private def checkpointRecoveryAndLogStartOffsetsInDir(dir: File): Unit = {
+    val partitionToLog = logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)

Review comment:
       There are a couple of places where we have this code. Maybe it makes sense to have an overload which accepts the directory.
   
   By the way, `logsByDir` can be private.
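   
   For illustration, a minimal sketch (inside `LogManager`) of what such an overload could look like; the helper name `logsInDir` is hypothetical, not from the PR:
   
```scala
// Hypothetical helper: resolve the logs of a directory once, so the
// logsByDir.getOrElse(dir.getAbsolutePath, Map.empty) lookup is not repeated.
private def logsInDir(dir: File): Map[TopicPartition, Log] =
  logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)

// Callers that only have the directory delegate to the map-based variants.
private def checkpointRecoveryAndLogStartOffsetsInDir(dir: File): Unit = {
  val partitionToLog = logsInDir(dir)
  checkpointLogsRecoveryOffsetsInDir(partitionToLog, dir)
  checkpointLogsStartOffsetsInDir(partitionToLog, dir)
}
```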

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File],
         // Now that replica in source log directory has been successfully renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be deleted.
         sourceLog.close()
-        checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty)
-        checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
+        checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile)

Review comment:
       Previously we were cleaning snapshots here. I agree it was probably unnecessary, but there seems little harm in it. I guess the main thing is that logically it makes little sense to couple these operations together even if the events which trigger them are often the same. Is that what you were thinking?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -324,8 +324,17 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.removeMetrics(topic)
   }
 
-  def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Unit  = {
+  def stopReplica(topicPartition: TopicPartition,

Review comment:
       I believe it would be straightforward to update the uses of this method in the test case to use `stopReplicas` instead. Then we could make this private, which would make the side effect less annoying.
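   
   As a rough illustration of the idea in `ReplicaManager` (the `stopReplicas` signature below is simplified and hypothetical, not the one in the PR): tests would go through the batch entry point, and the per-partition method would become private:
   
```scala
// Simplified, hypothetical sketch: the batch method is the only public entry
// point, so the per-directory checkpointing side effect is always driven from here.
def stopReplicas(partitionsToStop: Map[TopicPartition, Boolean]): Unit = {
  partitionsToStop.foreach { case (tp, deletePartition) =>
    stopReplica(tp, deletePartition)
  }
  // checkpoint the affected log directories once, after all replicas are stopped
}

private def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Unit = {
  // existing per-partition logic
}
```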

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -912,12 +969,9 @@ class LogManager(logDirs: Seq[File],
     if (removedLog != null) {
       //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
       if (cleaner != null && !isFuture) {
-        cleaner.abortCleaning(topicPartition)
-        cleaner.updateCheckpoints(removedLog.parentDirFile)
+        cleaner.abortCleaning(topicPartition, partitionDeleted = true)
       }
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty)

Review comment:
       With this change, it is up to the caller to ensure offsets are checkpointed correctly after a deletion. It would be better from an encapsulation perspective to keep `LogManager` in charge of that. One idea would be to offer a batched `asyncDelete`. The involvement of `Partition` may make this a bit complex though.
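   
   To make the idea concrete, a rough sketch (inside `LogManager`) of a batched `asyncDelete`; the signature and body are speculative, not part of this PR:
   
```scala
// Speculative sketch: delete several logs, then checkpoint each affected log
// directory exactly once, keeping the checkpointing responsibility in LogManager.
def asyncDelete(topicPartitions: Iterable[TopicPartition], isFuture: Boolean): Unit = {
  val affectedDirs = topicPartitions.flatMap { tp =>
    Option(asyncDelete(tp, isFuture)).map(_.parentDirFile)
  }.toSet
  checkpoint(affectedDirs) // the checkpoint(logDirs: Set[File]) introduced by this PR
}
```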

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -580,53 +586,105 @@ class LogManager(logDirs: Seq[File],
    * to avoid exposing data that have been deleted by DeleteRecordsRequest
    */
   def checkpointLogStartOffsets(): Unit = {
-    liveLogDirs.foreach(checkpointLogStartOffsetsInDir)
+    val logsByDirCached = logsByDir
+    liveLogDirs.foreach { logDir =>
+      checkpointLogsStartOffsetsInDir(
+        logsByDirCached.getOrElse(logDir.getAbsolutePath, Map.empty), logDir)
+    }
   }
 
   /**
-    * Write the recovery checkpoint file for all logs in provided directory and clean older snapshots for provided logs.
-    *
-    * @param dir the directory in which logs are checkpointed
-    * @param logsToCleanSnapshot logs whose snapshots need to be cleaned
-    */
+   * Write the checkpoint files for all the provided directories. This is used to cleanup
+   * checkpoints after having deleted partitions.
+   */
+  def checkpoint(logDirs: Set[File]): Unit = {
+    val logsByDirCached = logsByDir
+    logDirs.foreach { logDir =>
+      val partitionToLog = logsByDirCached.getOrElse(logDir.getAbsolutePath, Map.empty)
+      if (cleaner != null) {
+        cleaner.updateCheckpoints(logDir)
+      }
+      checkpointLogsRecoveryOffsetsInDir(partitionToLog, logDir)
+      checkpointLogsStartOffsetsInDir(partitionToLog, logDir)
+    }
+  }
+
+  /**
+   * Clean snapshots of the provided logs in the provided directory.
+   *
+   * @param logsToCleanSnapshot the logs whose snapshots will be cleaned
+   * @param dir the directory in which the logs are
+   */
   // Only for testing
-  private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
+  private[log] def cleanSnapshotsInDir(logsToCleanSnapshot: Seq[Log], dir: File): Unit = {
     try {
-      checkpointLogRecoveryOffsetsInDir(dir)
       logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
     } catch {
       case e: IOException =>
-        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
-          s"file in directory $dir", e)
+        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
+          s"Disk error while writing to recovery point file in directory $dir", e)
     }
   }
 
-  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
-    for {
-      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
-      checkpoint <- recoveryPointCheckpoints.get(dir)
-    } {
-      checkpoint.write(partitionToLog.map { case (tp, log) => tp -> log.recoveryPoint })
+  /**
+   * Checkpoint log recovery offsets for all the logs in the provided directory.
+   *
+   * @param dir the directory in which logs are checkpointed
+   */
+  // Only for testing
+  private[log] def checkpointRecoveryOffsetsInDir(dir: File): Unit = {
+    val partitionToLog = logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+    checkpointLogsRecoveryOffsetsInDir(partitionToLog, dir)
+  }
+
+  /**
+   * Checkpoint log recovery and start offsets for all logs in the provided directory.
+   *
+   * @param dir the directory in which logs are checkpointed
+   */
+  private def checkpointRecoveryAndLogStartOffsetsInDir(dir: File): Unit = {

Review comment:
       Given that we split `checkpointRecoveryOffsetsAndCleanSnapshot`, I'm curious why we added this.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -273,7 +272,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
    *  6. If the partition is already paused, a new call to this function
    *     will increase the paused count by one.
    */
-  def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
+  def abortAndPauseCleaning(topicPartition: TopicPartition, partitionDeleted: Boolean = false): Unit = {

Review comment:
       It is a little weird to abort _and pause_ cleaning if the partition is getting deleted. Maybe we should have a separate method like `removePartition` or something like that.
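   
   A speculative sketch of such a method inside `LogCleanerManager`; the name and the checkpoint cleanup are illustrative, not the PR's code:
   
```scala
// Hypothetical: a dedicated path for deleted partitions that aborts any in-flight
// cleaning and then drops the partition from the cleaner checkpoint of its
// log directory, instead of aborting and pausing.
def removePartition(topicPartition: TopicPartition, parentDir: File): Unit = {
  abortCleaning(topicPartition) // abort any in-flight cleaning for this partition
  // ... then rewrite parentDir's cleaner-offset-checkpoint without this partition
  // (the exact cleanup call is elided in this sketch)
}
```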




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org