Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/21 16:13:52 UTC

[GitHub] [kafka] junrao commented on a change in pull request #10896: KAFKA-12964: Collect and rename snapshot files prior to async deletion.

junrao commented on a change in pull request #10896:
URL: https://github.com/apache/kafka/pull/10896#discussion_r654568852



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2390,16 +2390,23 @@ object Log extends Logging {
                                       producerStateManager: ProducerStateManager,
                                       logPrefix: String): Unit = {
     segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
+    val snapshotsToDelete = if (deleteProducerStateSnapshots)
+      segmentsToDelete.flatMap { segment =>
+        producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset)}
+    else
+      Seq()
 
     def deleteSegments(): Unit = {
       info(s"${logPrefix}Deleting segment files ${segmentsToDelete.mkString(",")}")
       val parentDir = dir.getParent
       maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
         segmentsToDelete.foreach { segment =>
           segment.deleteIfExists()
-          if (deleteProducerStateSnapshots)
-            producerStateManager.removeAndDeleteSnapshot(segment.baseOffset)
         }
+        snapshotsToDelete.foreach( snapshot => {

Review comment:
       Could we just do the following?
   ```scala
   snapshotsToDelete.foreach { snapshot =>
     snapshot.deleteIfExists()
   }
   ```
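The change under review splits deletion into two phases. Snapshot files are renamed with the deletion suffix synchronously, before the asynchronous task is scheduled, and the task then only deletes files that were already renamed. A minimal sketch of that pattern with `java.nio.file`, assuming a hypothetical `.deleted` suffix in place of `Log.DeletedFileSuffix` and hypothetical helper names (`markForDeletion`, `deleteLater`) that are not part of Kafka:

```scala
import java.nio.file.{Files, NoSuchFileException, Path}
import scala.concurrent.{ExecutionContext, Future}

object AsyncSnapshotDeletionSketch {
  // Hypothetical stand-in for Log.DeletedFileSuffix.
  val DeletedFileSuffix = ".deleted"

  // Phase 1 (synchronous, before scheduling): rename each file so it carries
  // the deletion suffix. Files that have already disappeared are skipped.
  def markForDeletion(files: Seq[Path]): Seq[Path] =
    files.flatMap { file =>
      val renamed = file.resolveSibling(file.getFileName.toString + DeletedFileSuffix)
      try {
        Files.move(file, renamed)
        Some(renamed)
      } catch {
        case _: NoSuchFileException => None
      }
    }

  // Phase 2 (asynchronous): delete only the files renamed in phase 1,
  // using the plain foreach style suggested in the review.
  def deleteLater(marked: Seq[Path])(implicit ec: ExecutionContext): Future[Unit] =
    Future {
      marked.foreach { snapshot =>
        Files.deleteIfExists(snapshot)
      }
    }
}
```

Because the renaming happens up front, the deferred task never has to consult the producer state manager, which is why a simple loop over the collected files is enough.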
   

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -828,12 +834,41 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletionSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+   *       is deleted without this ProducerStateManager instance knowing, the resulting exception on
+   *       SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was deleted already.
+      // This can happen due to the way we construct an intermediate producer state manager

Review comment:
       I agree with Kowshik: it's not clear whether this comment is still valid.
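The docstring for `removeAndMarkSnapshotForDeletion` describes removing the in-memory entry first, then renaming the backing file, and treating a file that has already vanished as a benign race that yields `None`. A simplified sketch of that behaviour, assuming a hypothetical `SnapshotFile` case class and a plain `ConcurrentSkipListMap` keyed by offset rather than Kafka's actual types, and catching only `NoSuchFileException` as the "already deleted" signal:

```scala
import java.nio.file.{Files, NoSuchFileException, Path}
import java.util.concurrent.ConcurrentSkipListMap

object MarkSnapshotSketch {
  // Hypothetical, simplified stand-in for Kafka's SnapshotFile.
  final case class SnapshotFile(offset: Long, file: Path)

  // Hypothetical stand-in for Log.DeletedFileSuffix.
  val DeletedFileSuffix = ".deleted"

  // In-memory snapshot metadata keyed by base offset.
  val snapshots = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()

  // Remove the map entry first, then try to rename the file on disk.
  // If the file is already gone, the exception is swallowed and None is returned.
  def removeAndMarkSnapshotForDeletion(offset: Long): Option[SnapshotFile] =
    Option(snapshots.remove(offset)).flatMap { snapshot =>
      val renamed =
        snapshot.file.resolveSibling(snapshot.file.getFileName.toString + DeletedFileSuffix)
      try {
        Files.move(snapshot.file, renamed)
        Some(snapshot.copy(file = renamed))
      } catch {
        case _: NoSuchFileException => None
      }
    }
}
```

As with `Option(snapshots.remove(snapshotOffset)).flatMap` in the diff, the map entry is dropped before the rename is attempted, so it is removed even when the file turns out to be missing.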



