Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/02 05:43:21 UTC

[GitHub] [kafka] dengziming opened a new pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

dengziming opened a new pull request #10021:
URL: https://github.com/apache/kafka/pull/10021


   *More detailed description of your change*
   Snapshots are deleted at three points:
   1. When a follower finishes fetching a snapshot and truncates the log to the latest snapshot.
   2. When a controller runs pollCurrentState and deletes old segments.
   3. When the RaftClient restarts.
   
   *Summary of testing strategy (including rationale)*
   Unit test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] CaoManhDat commented on a change in pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

Posted by GitBox <gi...@apache.org>.
CaoManhDat commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r568657609



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -173,13 +164,16 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
         (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
         // Truncate the log fully if the latest snapshot is greater than the log end offset
 
         log.truncateFullyAndStartAt(snapshotId.offset)
-        oldestSnapshotId = latestSnapshotId
+        // Delete snapshot after truncating
+        val olderSnapshotIds = snapshotIds.headSet(snapshotId)
+        olderSnapshotIds.forEach(id => Snapshots.deleteSnapshotIfExists(log.dir.toPath, id))
+        snapshotIds.removeAll(olderSnapshotIds)

Review comment:
       My point here is to call `olderSnapshotIds.clear()` instead of `removeAll()`.




[GitHub] [kafka] CaoManhDat commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
CaoManhDat commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r583340798



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,30 +250,62 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFilesBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   */
+  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
+    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator()

Review comment:
       Why do we need `false` here? It seems to be the default for `headSet`.
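For reference, the JDK documents `NavigableSet.headSet(toElement)` as equivalent to `headSet(toElement, false)`. The explicit `false` therefore only spells out that the bound is exclusive. The minimal sketch below uses `Integer` values as stand-ins for `OffsetAndEpoch` ids; that substitution is for illustration only.

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListSet;

final class HeadSetDefaultDemo {
    // Integers stand in for OffsetAndEpoch snapshot ids (illustration only).
    static NavigableSet<Integer> sampleIds() {
        return new ConcurrentSkipListSet<>(List.of(10, 20, 30, 40));
    }

    public static void main(String[] args) {
        NavigableSet<Integer> snapshotIds = sampleIds();

        // One-argument form: the bound (30) is excluded.
        SortedSet<Integer> implicitExclusive = snapshotIds.headSet(30);
        // Two-argument form with inclusive = false: the same elements.
        NavigableSet<Integer> explicitExclusive = snapshotIds.headSet(30, false);
        // inclusive = true would keep the bound as well.
        NavigableSet<Integer> inclusive = snapshotIds.headSet(30, true);

        System.out.println(implicitExclusive); // [10, 20]
        System.out.println(explicitExclusive); // [10, 20]
        System.out.println(inclusive);         // [10, 20, 30]
    }
}
```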




[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r568656400



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -173,13 +164,16 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
         (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
         // Truncate the log fully if the latest snapshot is greater than the log end offset
 
         log.truncateFullyAndStartAt(snapshotId.offset)
-        oldestSnapshotId = latestSnapshotId
+        // Delete snapshot after truncating
+        val olderSnapshotIds = snapshotIds.headSet(snapshotId)
+        olderSnapshotIds.forEach(id => Snapshots.deleteSnapshotIfExists(log.dir.toPath, id))
+        snapshotIds.removeAll(olderSnapshotIds)

Review comment:
       In fact, after `snapshotIds.removeAll(olderSnapshotIds)`, the elements in `olderSnapshotIds` are also removed, because `headSet` returns a view backed by `snapshotIds`.
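`headSet` returns a live view backed by the original set, and this is why the reply above holds. Removing the elements from `snapshotIds` also empties `olderSnapshotIds`. Calling `clear()` on the view would remove the same elements from `snapshotIds` in a single call.

The minimal sketch below uses `Integer` ids in a `ConcurrentSkipListSet`, the set type this PR uses for `snapshotIds`. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;

final class HeadSetViewDemo {
    // Approach in the diff: removeAll(view) on the backing set.
    static ConcurrentSkipListSet<Integer> removeViaRemoveAll(List<Integer> ids, int latest) {
        ConcurrentSkipListSet<Integer> snapshotIds = new ConcurrentSkipListSet<>(ids);
        NavigableSet<Integer> older = snapshotIds.headSet(latest);
        snapshotIds.removeAll(older);
        return snapshotIds;
    }

    // Suggested alternative: clear() on the view removes the same elements from the backing set.
    static ConcurrentSkipListSet<Integer> removeViaClear(List<Integer> ids, int latest) {
        ConcurrentSkipListSet<Integer> snapshotIds = new ConcurrentSkipListSet<>(ids);
        snapshotIds.headSet(latest).clear();
        return snapshotIds;
    }

    // The view is live: after removeAll on the backing set it is empty as well.
    static boolean viewEmptiedByRemoveAll(List<Integer> ids, int latest) {
        ConcurrentSkipListSet<Integer> snapshotIds = new ConcurrentSkipListSet<>(ids);
        NavigableSet<Integer> older = snapshotIds.headSet(latest);
        snapshotIds.removeAll(older);
        return older.isEmpty();
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(10, 20, 30, 40);
        System.out.println(removeViaRemoveAll(ids, 30));     // [30, 40]
        System.out.println(removeViaClear(ids, 30));         // [30, 40]
        System.out.println(viewEmptiedByRemoveAll(ids, 30)); // true
    }
}
```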

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,23 +261,31 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        val expiredSnapshotIds = snapshotIds.headSet(logStartSnapshotId)
+        expiredSnapshotIds.forEach(snapshotId => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))
+        snapshotIds.removeAll(expiredSnapshotIds)

Review comment:
       Done!





[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r580761072



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +93,13 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        return Files.deleteIfExists(path);

Review comment:
       Good catch. I changed the procedure to rename the file first and then delete it asynchronously.
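The rename-then-delete procedure has two steps. First, rename the snapshot to `<name>.deleted` synchronously. Second, delete the renamed file later on a scheduler. The sketch below rests on a few assumptions:

- It uses `Files.move` with an `ATOMIC_MOVE` fallback in place of Kafka's `Utils.atomicMoveWithFallback`.
- It uses a JDK `ScheduledExecutorService` in place of the Kafka scheduler.
- The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class RenameThenDelete {
    // Appended to the full file name, e.g. "x.checkpoint" -> "x.checkpoint.deleted".
    static final String DELETED_SUFFIX = ".deleted";

    // Step 1 (synchronous): move the snapshot aside under a ".deleted" name.
    static Path markForDeletion(Path snapshot) throws IOException {
        Path deleting = snapshot.resolveSibling(snapshot.getFileName() + DELETED_SUFFIX);
        try {
            Files.move(snapshot, deleting, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Non-atomic fallback, similar in spirit to Utils.atomicMoveWithFallback.
            Files.move(snapshot, deleting, StandardCopyOption.REPLACE_EXISTING);
        }
        return deleting;
    }

    // Step 2 (asynchronous): delete the renamed file after a delay on the scheduler.
    static ScheduledFuture<Boolean> scheduleDelete(
            ScheduledExecutorService scheduler, Path deleting, long delayMs) {
        Callable<Boolean> task = () -> Files.deleteIfExists(deleting);
        return scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("snapshots");
        Path snapshot = Files.createFile(dir.resolve("example.checkpoint"));
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            Path deleting = markForDeletion(snapshot);
            System.out.println(deleting.getFileName()); // example.checkpoint.deleted
            // A short delay keeps the demo fast; a later revision in this PR uses 60 * 1000L ms.
            System.out.println(scheduleDelete(scheduler, deleting, 10).get()); // true
        } finally {
            scheduler.shutdown();
        }
    }
}
```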




[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582491373



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {

Review comment:
       Probably not since the reference would be through the file descriptor. 
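On POSIX file systems such as Linux, a rename changes only the directory entry. A reader that already has the file open keeps reading through its file descriptor. Behavior on Windows may differ.

The sketch below demonstrates this; the class name is hypothetical. It opens a `FileChannel`, renames the file while the channel is open, and still reads the original contents.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class OpenChannelSurvivesRename {
    // Opens `file`, renames it while the channel is open, then reads through the original channel.
    static String readAfterRename(Path file, Path renamed) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            Files.move(file, renamed);
            ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
            while (buffer.hasRemaining()) {
                if (channel.read(buffer) < 0) {
                    break; // end of file
                }
            }
            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rename-demo");
        Path file = dir.resolve("example.checkpoint");
        Files.writeString(file, "snapshot bytes");
        String contents = readAfterRename(file, dir.resolve("example.checkpoint.deleted"));
        System.out.println(contents); // snapshot bytes (on Linux/macOS)
    }
}
```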




[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r591084650



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -16,16 +16,21 @@
  */
 package org.apache.kafka.snapshot;
 
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.text.NumberFormat;
 import java.util.Optional;
-import org.apache.kafka.raft.OffsetAndEpoch;
 
 public final class Snapshots {
-    private static final String SUFFIX =  ".checkpoint";
+    private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
+    private static final String SUFFIX = ".checkpoint";
     private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
+    private static final String DELETE_SUFFIX = ".deleted";

Review comment:
       Sorry, I misunderstood `Log.deleteSegmentFiles`. I will change it.




[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r590923649



##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
##########
@@ -74,5 +79,44 @@ public void testInvalidSnapshotFilenames() {
         assertEquals(Optional.empty(), Snapshots.parse(root.resolve("leader-epoch-checkpoint")));
         // partition metadata
         assertEquals(Optional.empty(), Snapshots.parse(root.resolve("partition.metadata")));
+        // deleted file
+        assertEquals(Optional.empty(), Snapshots.parse(root.resolve("00000000000000000000.deleted")));
+    }
+
+    @Test
+    public void testDeleteSnapshot() throws IOException {
+
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+            TestUtils.RANDOM.nextInt(Integer.MAX_VALUE),
+            TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
+        );
+
+        Path logDirPath = TestUtils.tempDirectory().toPath();
+        try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty())) {
+            snapshot.freeze();
+
+            Path snapshotPath = Snapshots.snapshotPath(logDirPath, snapshotId);
+            assertTrue(Files.exists(snapshotPath));
+
+            // delete snapshot directly
+            assertTrue(Snapshots.deleteSnapshotIfExists(logDirPath, snapshot.snapshotId()));
+            assertFalse(Files.exists(snapshotPath));
+            assertFalse(Files.exists(Snapshots.deleteRename(snapshotPath, snapshotId)));
+        }
+
+        try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty())) {
+            snapshot.freeze();
+
+            Path snapshotPath = Snapshots.snapshotPath(logDirPath, snapshotId);
+            assertTrue(Files.exists(snapshotPath));
+
+            // rename snapshot before deleting
+            Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId));

Review comment:
       I'm not sure what you mean here. Do you mean putting `rename` and `delete` into one helper method?




[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r590758128



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -258,30 +251,62 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFilesBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   */
+  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
+    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator()
+    while (expiredSnapshotIdsIter.hasNext) {
+      val snapshotId = expiredSnapshotIdsIter.next()
+      // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
+      // on the file system, so we should first remove snapshotId and then delete snapshot file.
+      expiredSnapshotIdsIter.remove()
+
+      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+      val destination = Snapshots.deleteRename(path, snapshotId)
+      try {
+        Utils.atomicMoveWithFallback(path, destination)
+      } catch {
+        case e: IOException =>
+          error(s"Error renaming snapshot file: $path to $destination", e)
+      }
+      scheduler.schedule(
+        "delete-snapshot-file",
+        () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId),
+        60 * 1000L)

Review comment:
       Perhaps we may as well do it here?




[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r591079811



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -16,16 +16,21 @@
  */
 package org.apache.kafka.snapshot;
 
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.text.NumberFormat;
 import java.util.Optional;
-import org.apache.kafka.raft.OffsetAndEpoch;
 
 public final class Snapshots {
-    private static final String SUFFIX =  ".checkpoint";
+    private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
+    private static final String SUFFIX = ".checkpoint";
     private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
+    private static final String DELETE_SUFFIX = ".deleted";

Review comment:
       @dengziming Sorry that I didn't notice this earlier. Why don't we make the `DELETE_SUFFIX` similar to the `PARTIAL_SUFFIX`? E.g. `public static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);`
   
   This will give us the following filename suffixes:
   
   1. Immutable snapshots - `.checkpoint`
   2. Partial snapshots - `.checkpoint.part`
   3. Deleted snapshots - `.checkpoint.deleted`
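With the suggested definitions, every snapshot suffix is derived from `SUFFIX`, so the three kinds of file differ only in what follows `.checkpoint`. The minimal sketch below prints the resulting names. The base name `example` is a placeholder, since real names are derived from the snapshot's offset and epoch.

```java
final class SnapshotSuffixes {
    // Suffix constants in the form suggested in the comment above.
    static final String SUFFIX = ".checkpoint";
    static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
    static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);

    public static void main(String[] args) {
        // Placeholder base name (hypothetical).
        String base = "example";
        System.out.println(base + SUFFIX);         // example.checkpoint
        System.out.println(base + PARTIAL_SUFFIX); // example.checkpoint.part
        System.out.println(base + DELETE_SUFFIX);  // example.checkpoint.deleted
        // A plain endsWith(SUFFIX) check matches only the immutable form.
        System.out.println((base + DELETE_SUFFIX).endsWith(SUFFIX)); // false
    }
}
```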




[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r590647005



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -258,30 +251,62 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFilesBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   */
+  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
+    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator()
+    while (expiredSnapshotIdsIter.hasNext) {
+      val snapshotId = expiredSnapshotIdsIter.next()
+      // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
+      // on the file system, so we should first remove snapshotId and then delete snapshot file.
+      expiredSnapshotIdsIter.remove()
+
+      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+      val destination = Snapshots.deleteRename(path, snapshotId)
+      try {
+        Utils.atomicMoveWithFallback(path, destination)
+      } catch {
+        case e: IOException =>
+          error(s"Error renaming snapshot file: $path to $destination", e)
+      }
+      scheduler.schedule(
+        "delete-snapshot-file",
+        () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId),
+        60 * 1000L)

Review comment:
       @dengziming, should we file an issue under the Snapshot "epic" to use the log configuration for this?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -336,19 +362,25 @@ object KafkaMetadataLog {
     log: Log
   ): ConcurrentSkipListSet[OffsetAndEpoch] = {
     val snapshotIds = new ConcurrentSkipListSet[OffsetAndEpoch]()
-    // Scan the log directory; deleting partial snapshots and remembering immutable snapshots
+    // Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start
+    // from logStartOffset
     Files
       .walk(log.dir.toPath, 1)
       .map[Optional[SnapshotPath]] { path =>
-        if (path != log.dir.toPath) {
-          Snapshots.parse(path)
-        } else {
+
+        if (path == log.dir.toPath) {
+          Optional.empty()
+        } else if (path.endsWith(Snapshots.DELETE_SUFFIX)) {
+          Files.deleteIfExists(path)

Review comment:
       If you implement my other comment, I think you can move this delete operation to `forEach` below.
   
   It is nice to have `map` operations that are side-effect free.
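The suggested shape keeps the stream stages pure (classification only) and performs the deletion in a `forEach` afterwards.

One detail in the diff is worth checking. `java.nio.file.Path.endsWith(String)` compares whole path elements, not string suffixes. So `path.endsWith(".deleted")` does not match a file named `x.checkpoint.deleted`, and the sketch compares the file name as a `String` instead.

The class and method names are hypothetical. The suffix value assumes the `.checkpoint.deleted` form proposed in this review.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ScanThenDelete {
    // Assumes the ".checkpoint.deleted" form of the suffix.
    static final String DELETE_SUFFIX = ".checkpoint.deleted";

    // Pure check on the file name as a String (Path.endsWith would compare whole path elements).
    static boolean isDeleting(Path path) {
        return path.getFileName().toString().endsWith(DELETE_SUFFIX);
    }

    // The stream pipeline only selects candidates; the deletion happens in forEach afterwards.
    static List<Path> removeLeftoverDeletes(Path logDir) throws IOException {
        List<Path> leftovers;
        try (Stream<Path> paths = Files.walk(logDir, 1)) {
            leftovers = paths
                .filter(path -> !path.equals(logDir))
                .filter(ScanThenDelete::isDeleting)
                .collect(Collectors.toList());
        }
        leftovers.forEach(path -> {
            try {
                Files.deleteIfExists(path);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        return leftovers;
    }

    public static void main(String[] args) throws IOException {
        Path logDir = Files.createTempDirectory("log-dir");
        Files.createFile(logDir.resolve("a.checkpoint"));
        Path leftover = Files.createFile(logDir.resolve("b.checkpoint.deleted"));
        System.out.println(leftover.endsWith(".checkpoint.deleted")); // false: element comparison
        System.out.println(removeLeftoverDeletes(logDir).size());     // 1
        System.out.println(Files.exists(leftover));                   // false
    }
}
```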

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -16,16 +16,22 @@
  */
 package org.apache.kafka.snapshot;
 
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.text.NumberFormat;
 import java.util.Optional;
-import org.apache.kafka.raft.OffsetAndEpoch;
 
 public final class Snapshots {
-    private static final String SUFFIX =  ".checkpoint";
-    private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
+    private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
+    public static final String SUFFIX = ".checkpoint";
+    public static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
+    public static final String DELETE_SUFFIX = ".deleted";

Review comment:
       If you implement my suggestions to `Snapshots.parse` and `SnapshotPath`, I think we can make all of these fields private.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -336,19 +362,25 @@ object KafkaMetadataLog {
     log: Log
   ): ConcurrentSkipListSet[OffsetAndEpoch] = {
     val snapshotIds = new ConcurrentSkipListSet[OffsetAndEpoch]()
-    // Scan the log directory; deleting partial snapshots and remembering immutable snapshots
+    // Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start
+    // from logStartOffset
     Files
       .walk(log.dir.toPath, 1)
       .map[Optional[SnapshotPath]] { path =>
-        if (path != log.dir.toPath) {
-          Snapshots.parse(path)
-        } else {
+
+        if (path == log.dir.toPath) {
+          Optional.empty()
+        } else if (path.endsWith(Snapshots.DELETE_SUFFIX)) {

Review comment:
       Can we move the check `if (path.endsWith(Snapshots.DELETE_SUFFIX))` to `Snapshots.parse`? We can do that by improving `SnapshotPath` so that it can represent immutable, partial, and deleting snapshots.
   
   We can do this if we replace the internal `boolean partial` in `SnapshotPath` with an enum that tracks the three possible states of a snapshot.
   
   What do you think?
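One way to read this suggestion: replace `boolean partial` with a three-valued enum and let parsing choose the state from the file-name suffix. The enum, its constants, and `stateOf` below are hypothetical names, not the API that was eventually merged.

```java
import java.util.Optional;

final class SnapshotStateSketch {
    static final String SUFFIX = ".checkpoint";
    static final String PARTIAL_SUFFIX = SUFFIX + ".part";
    static final String DELETE_SUFFIX = SUFFIX + ".deleted";

    // Hypothetical replacement for SnapshotPath's `boolean partial` flag.
    enum SnapshotState { IMMUTABLE, PARTIAL, DELETING }

    // Classifies a file name by suffix; unrelated files (e.g. "partition.metadata") yield empty.
    static Optional<SnapshotState> stateOf(String fileName) {
        if (fileName.endsWith(PARTIAL_SUFFIX)) {
            return Optional.of(SnapshotState.PARTIAL);
        } else if (fileName.endsWith(DELETE_SUFFIX)) {
            return Optional.of(SnapshotState.DELETING);
        } else if (fileName.endsWith(SUFFIX)) {
            return Optional.of(SnapshotState.IMMUTABLE);
        } else {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(stateOf("example.checkpoint"));         // Optional[IMMUTABLE]
        System.out.println(stateOf("example.checkpoint.part"));    // Optional[PARTIAL]
        System.out.println(stateOf("example.checkpoint.deleted")); // Optional[DELETING]
        System.out.println(stateOf("leader-epoch-checkpoint"));    // Optional.empty
    }
}
```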

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +102,24 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete the snapshot from the filesystem, we will firstly tried to rename snapshot file to
+     * ${file}.deleted, or delete the file directly if its already renamed.
+     */
+    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path path = Snapshots.snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        try {
+            // rename before deleting if target file is not renamed
+            if (Files.exists(path) || Files.notExists(destination)) {
+                Utils.atomicMoveWithFallback(path, destination);
+            }
+            return Files.deleteIfExists(destination);

Review comment:
       Hmm. How about this?
   
   ```suggestion
               boolean immutableRemoved = Files.deleteIfExists(path);
               boolean deletingRemoved = Files.deleteIfExists(destination);
               return immutableRemoved || deletingRemoved;
   ```
   
   And maybe rename the variables `path` and `destination` to something like `immutablePath` and `deletingPath` respectively.
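Assigning both results before combining them matters here. Writing `Files.deleteIfExists(path) || Files.deleteIfExists(destination)` would skip the second delete whenever the first one succeeds.

The minimal sketch below follows the suggestion and uses the proposed names `immutablePath` and `deletingPath`. The class name is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class DeleteBothPaths {
    // Removes whichever copy exists; returns true if at least one file was deleted.
    static boolean deleteSnapshotIfExists(Path immutablePath, Path deletingPath) throws IOException {
        // Both deletes always run; `||` is applied only to the stored results.
        boolean immutableRemoved = Files.deleteIfExists(immutablePath);
        boolean deletingRemoved = Files.deleteIfExists(deletingPath);
        return immutableRemoved || deletingRemoved;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("delete-both");
        Path immutablePath = Files.createFile(dir.resolve("example.checkpoint"));
        Path deletingPath = Files.createFile(dir.resolve("example.checkpoint.deleted"));

        System.out.println(deleteSnapshotIfExists(immutablePath, deletingPath)); // true
        System.out.println(Files.exists(deletingPath));                          // false
        System.out.println(deleteSnapshotIfExists(immutablePath, deletingPath)); // false
    }
}
```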

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -32,13 +32,13 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.raft.internals.BatchBuilder
 import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog}
 import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}

Review comment:
       This comment applies to all of the tests/changes in this file.
   
   Can we also implement all of the tests that were added to `MockLogTest`? It is good to have tests that validate that these two implementations behave similarly.





[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r591909832



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -35,21 +34,14 @@ import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
   log: Log,
+  config: LogConfig,

Review comment:
       Sounds good to me. We can revisit this in https://issues.apache.org/jira/browse/KAFKA-12423.





[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r591116450



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -32,13 +32,13 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.raft.internals.BatchBuilder
 import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog}
 import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}

Review comment:
       I added some changes to `MockLog` to delete the older snapshots, plus 2 unit tests.





[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r577852672



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,30 +259,45 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFilesBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   */
+  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
+    val expiredSnapshotIds = snapshotIds.headSet(logStartSnapshotId)
+    expiredSnapshotIds.forEach(snapshotId => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))

Review comment:
       One of the invariants of this data structure is that if `snapshotIds` contains a snapshot id, the `KafkaRaftClient` and `Listener` can expect that the snapshot exists on the file system. With this implementation that is not strictly the case, since we first delete the snapshot from the file system and only then remove it from `snapshotIds`. What if we do the opposite? E.g.
   
   ```scala
   import scala.util.control.Breaks.{breakable, break}
   
   breakable {
     val iterator = snapshotIds.iterator()
     while (iterator.hasNext()) {
       val snapshotId = iterator.next()
       if (snapshotId.compareTo(logStartSnapshotId) < 0) {
         // Remove the id first so every id left in snapshotIds still has a file on disk
         iterator.remove()
         Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
       } else {
         break()
       }
     }
   }
   ```
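   
   The same remove-then-delete ordering, sketched in Java against a `ConcurrentSkipListSet`. As a simplifying assumption, a snapshot is identified only by its end offset here, and the hypothetical `deleteFile` callback stands in for `Snapshots.deleteSnapshotIfExists`. Iterating over `headSet(logStartSnapshotId, false)` visits only ids strictly below the bound, so no explicit `break` is needed:
   
   ```java
   import java.util.Iterator;
   import java.util.NavigableSet;
   import java.util.concurrent.ConcurrentSkipListSet;
   import java.util.function.LongConsumer;
   
   final class SnapshotRetentionSketch {
       // Stand-in for snapshotIds: each snapshot is identified by its end offset only.
       final NavigableSet<Long> snapshotIds = new ConcurrentSkipListSet<>();
   
       /** Removes every snapshot id strictly below logStartSnapshotId, then deletes its file. */
       void removeSnapshotsBefore(long logStartSnapshotId, LongConsumer deleteFile) {
           Iterator<Long> it = snapshotIds.headSet(logStartSnapshotId, false).iterator();
           while (it.hasNext()) {
               long snapshotId = it.next();
               // Invariant: an id visible in snapshotIds must have a file on disk,
               // so the id is removed first and the file is deleted afterwards.
               it.remove();
               deleteFile.accept(snapshotId);
           }
       }
   }
   ```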





[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r591870272



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -35,21 +34,14 @@ import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
   log: Log,
+  config: LogConfig,

Review comment:
       I'm slightly more inclined to pass through the configuration value as a parameter in the constructor here rather than getting in the habit of relying on `LogConfig`. We're in an awkward state at the moment in regard to `LogConfig`: we depend on some of the configurations but ignore others. It relates to this JIRA I filed: https://issues.apache.org/jira/browse/KAFKA-12423. My thinking for now is to try to keep the coupling with `LogConfig` from getting too tight. Maybe we could do something like this in `apply`:
   ```scala
       val metadataLog = new KafkaMetadataLog(
         log,
         recoverSnapshots(log),
         topicPartition,
         maxFetchSizeInBytes,
         defaultLogConfig.fileDeleteDelayMs
       )
   ```
   This is probably borderline overkill, but the point is to try and keep the configurations that we rely on in this layer separable.





[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r575934699



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -315,7 +324,8 @@ object KafkaMetadataLog {
       }
       .forEach { path =>
         path.ifPresent { snapshotPath =>
-          if (snapshotPath.partial) {
+          if (snapshotPath.partial || snapshotPath.snapshotId.offset < log.logStartOffset) {

Review comment:
       Here I am thinking of an unclean broker shutdown; the case you describe is one example of that.
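   
   For illustration, the startup condition in this diff (`partial || offset < logStartOffset`) can be sketched as a standalone directory scan. The file-name layout `<endOffset>-<epoch>.checkpoint[.part]` is an assumption, and the class and method names are hypothetical:
   
   ```java
   import java.io.IOException;
   import java.nio.file.DirectoryStream;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeSet;
   
   final class SnapshotRecoverySketch {
       // Assumed file-name layout: "<endOffset>-<epoch>.checkpoint[.part]".
       static final String SUFFIX = ".checkpoint";
       static final String PARTIAL_SUFFIX = SUFFIX + ".part";
   
       /** Deletes partial and expired snapshots; returns the end offsets that are kept. */
       static TreeSet<Long> recoverSnapshots(Path logDir, long logStartOffset) throws IOException {
           List<Path> toDelete = new ArrayList<>();
           TreeSet<Long> retained = new TreeSet<>();
           try (DirectoryStream<Path> files = Files.newDirectoryStream(logDir)) {
               for (Path file : files) {
                   String name = file.getFileName().toString();
                   boolean partial = name.endsWith(PARTIAL_SUFFIX);
                   if (!partial && !name.endsWith(SUFFIX)) {
                       continue; // not a snapshot file
                   }
                   int dash = name.indexOf('-');
                   if (dash <= 0) {
                       continue;
                   }
                   long endOffset;
                   try {
                       endOffset = Long.parseLong(name.substring(0, dash));
                   } catch (NumberFormatException e) {
                       continue; // unexpected name; leave it alone
                   }
                   if (partial || endOffset < logStartOffset) {
                       toDelete.add(file);
                   } else {
                       retained.add(endOffset);
                   }
               }
           }
           // Delete after the directory scan rather than while iterating the stream.
           for (Path file : toDelete) {
               Files.deleteIfExists(file);
           }
           return retained;
       }
   }
   ```
   
   Deleted files are not returned, so a set built from the result only contains ids that still have a file on disk.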

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -261,7 +258,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
      * @return an Optional snapshot id at the log start offset if nonzero, otherwise returns an empty
      *         Optional
      */
-    Optional<OffsetAndEpoch> oldestSnapshotId();
+    Optional<OffsetAndEpoch> earliestSnapshotId();

Review comment:
       I hadn't noticed `ProducerStateManager`. I chose `earliest` because of the consumer offset reset strategy, but now I think both are OK.





[GitHub] [kafka] dengziming commented on pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#issuecomment-778566284


   @jsancio @hachikuji Hello, this PR is ready for review.



[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582479381



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {

Review comment:
       If the file is deleted while Kafka is still reading the file, won't the reading fail since we have already renamed the log file?
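   
   Whether an in-progress read survives the rename depends on the platform. On POSIX file systems an open file descriptor refers to the file itself rather than to its name, so a reader that opened the snapshot before the rename can keep reading; only new attempts to open the old path fail. A small `java.nio` demonstration with illustrative file names (behavior on other platforms, such as Windows, may differ):
   
   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.FileChannel;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;
   import java.nio.file.StandardOpenOption;
   
   final class OpenFileRenameDemo {
       /** Opens a file, renames it, then reads through the already-open channel. */
       static String readAfterRename(Path dir) throws IOException {
           Path original = dir.resolve("snapshot.checkpoint");
           Path renamed = dir.resolve("snapshot.checkpoint.deleted");
           Files.write(original, "payload".getBytes(StandardCharsets.UTF_8));
           try (FileChannel channel = FileChannel.open(original, StandardOpenOption.READ)) {
               // On POSIX systems the open channel keeps referring to the same file after the rename.
               Files.move(original, renamed, StandardCopyOption.ATOMIC_MOVE);
               ByteBuffer buffer = ByteBuffer.allocate(64);
               channel.read(buffer);
               buffer.flip();
               return StandardCharsets.UTF_8.decode(buffer).toString();
           }
       }
   }
   ```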





[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r581616652



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                return Files.deleteIfExists(destination);
+            } catch (IOException e) {
+                throw new RuntimeException("Error deleting snapshot file " + destination + ":" + e.getMessage());
+            }
+        });

Review comment:
       That's correct; it indirectly uses the scheduler in `Log.scala`, for example when deleting segments.
   
   Yes. We can pass the same scheduler that we pass to `Log.scala` to this type when it is constructed.
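   
   A minimal sketch of that approach: rename the snapshot synchronously, then hand the actual delete to a shared `ScheduledExecutorService`. The class name and the `deleteDelayMs` parameter are hypothetical; the delay plays a role similar to the `file.delete.delay.ms` log config that `Log.scala` honors when deleting segments:
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   
   final class AsyncSnapshotDeleter {
       private final ScheduledExecutorService scheduler;
       private final long deleteDelayMs;
   
       AsyncSnapshotDeleter(ScheduledExecutorService scheduler, long deleteDelayMs) {
           this.scheduler = scheduler;
           this.deleteDelayMs = deleteDelayMs;
       }
   
       /** Renames the snapshot immediately, then deletes the renamed file on the scheduler. */
       ScheduledFuture<Boolean> deleteAsync(Path snapshot) throws IOException {
           Path deleting = snapshot.resolveSibling(snapshot.getFileName() + ".deleted");
           // Synchronous rename: the snapshot stops being visible under its normal name at once.
           Files.move(snapshot, deleting, StandardCopyOption.ATOMIC_MOVE);
           // Deferred delete: runs on the shared scheduler after the configured delay.
           return scheduler.schedule(() -> {
               return Files.deleteIfExists(deleting);
           }, deleteDelayMs, TimeUnit.MILLISECONDS);
       }
   }
   ```
   
   Returning the `ScheduledFuture` only makes the sketch testable; a fire-and-forget call would behave the same way.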





[GitHub] [kafka] CaoManhDat commented on a change in pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

Posted by GitBox <gi...@apache.org>.
CaoManhDat commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r568473592



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -173,13 +164,16 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
         (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
         // Truncate the log fully if the latest snapshot is greater than the log end offset
 
         log.truncateFullyAndStartAt(snapshotId.offset)
-        oldestSnapshotId = latestSnapshotId
+        // Delete snapshot after truncating
+        val olderSnapshotIds = snapshotIds.headSet(snapshotId)
+        olderSnapshotIds.forEach(id => Snapshots.deleteSnapshotIfExists(log.dir.toPath, id))
+        snapshotIds.removeAll(olderSnapshotIds)

Review comment:
       Should we call `olderSnapshotIds.clear()` here instead, since `olderSnapshotIds` is backed by the original `snapshotIds`?
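   
   A small demonstration of the point using plain `java.util` collections: `headSet` returns a live view backed by the original set, so clearing the view removes those elements from the backing set as well. `HeadSetViewDemo` and `removeBefore` are hypothetical names:
   
   ```java
   import java.util.NavigableSet;
   
   final class HeadSetViewDemo {
       /** Removes every element strictly less than bound by clearing the headSet view. */
       static <T extends Comparable<T>> void removeBefore(NavigableSet<T> set, T bound) {
           // headSet(bound, false) is a live view backed by `set`, so clearing the view
           // removes the same elements from `set`; no separate removeAll(view) call is needed.
           set.headSet(bound, false).clear();
       }
   }
   ```
   
   The same holds for the views returned by `ConcurrentSkipListSet.headSet`.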

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,23 +261,31 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        val expiredSnapshotIds = snapshotIds.headSet(logStartSnapshotId)
+        expiredSnapshotIds.forEach(snapshotId => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))
+        snapshotIds.removeAll(expiredSnapshotIds)

Review comment:
       Same here. Should we create a method for this block?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -173,13 +164,16 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
         (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
         // Truncate the log fully if the latest snapshot is greater than the log end offset
 
         log.truncateFullyAndStartAt(snapshotId.offset)
-        oldestSnapshotId = latestSnapshotId
+        // Delete snapshot after truncating
+        val olderSnapshotIds = snapshotIds.headSet(snapshotId)
+        olderSnapshotIds.forEach(id => Snapshots.deleteSnapshotIfExists(log.dir.toPath, id))
+        snapshotIds.removeAll(olderSnapshotIds)

Review comment:
       My point here is to call `olderSnapshotIds.clear()` instead of `removeAll()`.





[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r569110975



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -173,13 +164,16 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
         (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
         // Truncate the log fully if the latest snapshot is greater than the log end offset
 
         log.truncateFullyAndStartAt(snapshotId.offset)
-        oldestSnapshotId = latestSnapshotId
+        // Delete snapshot after truncating
+        val olderSnapshotIds = snapshotIds.headSet(snapshotId)
+        olderSnapshotIds.forEach(id => Snapshots.deleteSnapshotIfExists(log.dir.toPath, id))
+        snapshotIds.removeAll(olderSnapshotIds)

Review comment:
       Thank you, will fix this!





[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r578168685



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,30 +259,45 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFilesBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   */
+  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
+    val expiredSnapshotIds = snapshotIds.headSet(logStartSnapshotId)
+    expiredSnapshotIds.forEach(snapshotId => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))

Review comment:
       Your concern is reasonable. I changed the code and added a comment explaining the invariant.





[GitHub] [kafka] hachikuji merged pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10021:
URL: https://github.com/apache/kafka/pull/10021


   



[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r591121500



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +102,24 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete the snapshot from the filesystem, we will firstly tried to rename snapshot file to
+     * ${file}.deleted, or delete the file directly if its already renamed.
+     */
+    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path path = Snapshots.snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        try {
+            // rename before deleting if target file is not renamed
+            if (Files.exists(path) || Files.notExists(destination)) {
+                Utils.atomicMoveWithFallback(path, destination);
+            }
+            return Files.deleteIfExists(destination);

Review comment:
       This looks better; I changed the code and the comments accordingly.





[GitHub] [kafka] CaoManhDat commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
CaoManhDat commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r583364223



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,30 +250,62 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFilesBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   */
+  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
+    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator()

Review comment:
       makes sense, ty!





[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582587511



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -299,7 +290,16 @@ final class KafkaMetadataLog private (
       // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
       // on the file system, so we should first remove snapshotId and then delete snapshot file.
       expiredSnapshotIdsIter.remove()
-      Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
+
+      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+      val destination = Snapshots.deleteRename(path, snapshotId)
+      try {
+        Utils.atomicMoveWithFallback(path, destination)
+      } catch {
+        case e: IOException =>
+          warn("Error renaming snapshot file: " + path + " to :" + destination + ", " + e.getMessage)
+      }
+      scheduler.schedule("delete-snapshot-file", () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))

Review comment:
       Done!





[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582587332



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -341,7 +342,7 @@ object KafkaMetadataLog {
         }
       }
 
-    val replicatedLog = new KafkaMetadataLog(log, snapshotIds, topicPartition, maxFetchSizeInBytes)
+    val replicatedLog = new KafkaMetadataLog(log, scheduler, snapshotIds, topicPartition, maxFetchSizeInBytes)

Review comment:
       I also find it awkward to represent snapshot files using `OffsetAndEpoch` while relying on `Snapshots` to rename and delete them. I will file a ticket to discuss consolidating file management; my first thought is to add a `MetadataLogSnapshot` class to represent a snapshot file.





[GitHub] [kafka] CaoManhDat commented on a change in pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

Posted by GitBox <gi...@apache.org>.
CaoManhDat commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r568482293



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,23 +261,31 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        val expiredSnapshotIds = snapshotIds.headSet(logStartSnapshotId)
+        expiredSnapshotIds.forEach(snapshotId => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))
+        snapshotIds.removeAll(expiredSnapshotIds)

Review comment:
       Same here. Should we create a method for this block?





[GitHub] [kafka] dengziming commented on pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#issuecomment-781762837


   ping @hachikuji .
   retest this, please.





[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582477733



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                return Files.deleteIfExists(destination);
+            } catch (IOException e) {
+                throw new RuntimeException("Error deleting snapshot file " + destination + ":" + e.getMessage());
+            }
+        });

Review comment:
       Added a scheduler to `KafkaMetadataLog`; PTAL. Do you think we should add a `MetadataLogSnapshot` class instead of using `OffsetAndEpoch` to represent a snapshot?







[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r580705481



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +93,13 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        return Files.deleteIfExists(path);

Review comment:
       When we remove other log files in `Log`, we first rename them to `{file}.deleted` and then schedule them for background deletion. Do you think we should do the same for snapshot files?
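A minimal Java sketch of that two-step pattern, assuming a plain file path and an injected executor. It first renames the file to `<name>.deleted` (atomically when the filesystem supports it, otherwise with a plain move) and then hands the actual deletion to a background task. The class and method names are hypothetical; in Kafka the rename step is what `Utils.atomicMoveWithFallback` is used for.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class RenameThenDelete {
    static final String DELETED_SUFFIX = ".deleted";

    // Step 1: rename the file so it is clearly marked for deletion.
    static Path markForDeletion(Path file) {
        Path target = file.resolveSibling(file.getFileName() + DELETED_SUFFIX);
        try {
            try {
                Files.move(file, target, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Fall back to a non-atomic rename when atomic moves are unsupported.
                Files.move(file, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }

    // Step 2: delete the renamed file on a background executor.
    static CompletableFuture<Boolean> deleteInBackground(Path renamed, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return Files.deleteIfExists(renamed);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, executor);
    }
}
```

Renaming first means that a crash between the two steps leaves only `.deleted` files behind, which are easy to recognize and clean up on restart.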







[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r591629749



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -35,21 +34,14 @@ import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
   log: Log,
+  config: LogConfig,

Review comment:
       The `LogConfig` in `log: Log` should be accessible to this class as `log.config`.







[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r590663415



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +102,24 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete the snapshot from the filesystem, we will firstly tried to rename snapshot file to
+     * ${file}.deleted, or delete the file directly if its already renamed.
+     */
+    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path path = Snapshots.snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        try {
+            // rename before deleting if target file is not renamed
+            if (Files.exists(path) || Files.notExists(destination)) {
+                Utils.atomicMoveWithFallback(path, destination);
+            }
+            return Files.deleteIfExists(destination);

Review comment:
       Hmm. How about this?
   
   ```suggestion
               return Files.deleteIfExists(path) | Files.deleteIfExists(destination);
   ```
   
   And maybe rename the variables `path` and `destination` to something like `immutablePath` and `deletingPath` respectively.
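The suggestion relies on Java's non-short-circuit `|` on booleans. Both `deleteIfExists` calls always run, so whichever file exists gets removed, and the result is `true` if either one was deleted. With `||`, a successful delete of the first path would skip the second. Below is a self-contained sketch that uses the suggested names `immutablePath` and `deletingPath`. The class name and the way the `.deleted` sibling is derived are assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class DeleteEitherPath {
    // Deletes the snapshot whether or not it has already been renamed.
    // `|` (not `||`) guarantees both deleteIfExists calls are evaluated.
    static boolean deleteSnapshotIfExists(Path immutablePath) {
        Path deletingPath = immutablePath.resolveSibling(immutablePath.getFileName() + ".deleted");
        try {
            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```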







[GitHub] [kafka] CaoManhDat commented on a change in pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

Posted by GitBox <gi...@apache.org>.
CaoManhDat commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r568473592



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -173,13 +164,16 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
         (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
         // Truncate the log fully if the latest snapshot is greater than the log end offset
 
         log.truncateFullyAndStartAt(snapshotId.offset)
-        oldestSnapshotId = latestSnapshotId
+        // Delete snapshot after truncating
+        val olderSnapshotIds = snapshotIds.headSet(snapshotId)
+        olderSnapshotIds.forEach(id => Snapshots.deleteSnapshotIfExists(log.dir.toPath, id))
+        snapshotIds.removeAll(olderSnapshotIds)

Review comment:
       Should we call `olderSnapshotIds.clear()` here instead, since `olderSnapshotIds` is a view backed by the original `snapshotIds`?
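`NavigableSet.headSet` returns a live view backed by the original set, so clearing the view removes those elements from the backing set directly, with no separate `removeAll`. Here is a small demonstration using a `TreeSet` of `long` offsets as a stand-in for the snapshot ids. The concrete set type behind `snapshotIds` is not shown in this diff, and the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

final class HeadSetViewDemo {
    // Deletes (here: prints) everything below `bound`, then clears it through the view.
    static NavigableSet<Long> clearBefore(NavigableSet<Long> ids, long bound) {
        NavigableSet<Long> older = ids.headSet(bound, false); // a live view, not a copy
        older.forEach(id -> System.out.println("would delete snapshot file for " + id));
        older.clear(); // removes the same elements from `ids`
        return ids;
    }

    public static void main(String[] args) {
        NavigableSet<Long> ids = new TreeSet<>(Arrays.asList(1L, 2L, 10L, 30L));
        System.out.println(clearBefore(ids, 10L)); // [10, 30]
    }
}
```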







[GitHub] [kafka] dengziming commented on pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#issuecomment-791092389


   Rebased onto trunk. Ping @hachikuji.





[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r590634783



##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
##########
@@ -74,5 +79,44 @@ public void testInvalidSnapshotFilenames() {
         assertEquals(Optional.empty(), Snapshots.parse(root.resolve("leader-epoch-checkpoint")));
         // partition metadata
         assertEquals(Optional.empty(), Snapshots.parse(root.resolve("partition.metadata")));
+        // deleted file
+        assertEquals(Optional.empty(), Snapshots.parse(root.resolve("00000000000000000000.deleted")));
+    }
+
+    @Test
+    public void testDeleteSnapshot() throws IOException {
+
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+            TestUtils.RANDOM.nextInt(Integer.MAX_VALUE),
+            TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
+        );
+
+        Path logDirPath = TestUtils.tempDirectory().toPath();
+        try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty())) {
+            snapshot.freeze();
+
+            Path snapshotPath = Snapshots.snapshotPath(logDirPath, snapshotId);
+            assertTrue(Files.exists(snapshotPath));
+
+            // delete snapshot directly
+            assertTrue(Snapshots.deleteSnapshotIfExists(logDirPath, snapshot.snapshotId()));
+            assertFalse(Files.exists(snapshotPath));
+            assertFalse(Files.exists(Snapshots.deleteRename(snapshotPath, snapshotId)));
+        }
+
+        try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty())) {
+            snapshot.freeze();
+
+            Path snapshotPath = Snapshots.snapshotPath(logDirPath, snapshotId);
+            assertTrue(Files.exists(snapshotPath));
+
+            // rename snapshot before deleting
+            Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId));

Review comment:
       nit: I think this is the only difference between this block and the previous one. Can we factor out a helper?
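Here is a sketch of such a helper, written in plain Java rather than against the Kafka test classes. The only varying input is whether the file is renamed before deletion, so one method parameterized on that flag covers both cases. `checkDeletion` and the local `deleteSnapshotIfExists` stand-in are hypothetical. In the real test the calls would go through `FileRawSnapshotWriter` and `Snapshots`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class DeleteSnapshotTestHelper {
    // Stand-in for Snapshots.deleteSnapshotIfExists: removes the file whether or not it was renamed.
    static boolean deleteSnapshotIfExists(Path path) throws IOException {
        Path deleting = path.resolveSibling(path.getFileName() + ".deleted");
        return Files.deleteIfExists(path) | Files.deleteIfExists(deleting);
    }

    // The shared body of both test cases; only `renameBeforeDeleting` differs.
    static void checkDeletion(Path logDir, boolean renameBeforeDeleting) throws IOException {
        Path snapshot = Files.createFile(logDir.resolve("snapshot.checkpoint"));
        Path deleting = snapshot.resolveSibling(snapshot.getFileName() + ".deleted");
        if (renameBeforeDeleting) {
            Files.move(snapshot, deleting);
        }
        if (!deleteSnapshotIfExists(snapshot)) throw new AssertionError("nothing was deleted");
        if (Files.exists(snapshot) || Files.exists(deleting)) throw new AssertionError("file left behind");
    }

    public static void main(String[] args) throws IOException {
        Path logDir = Files.createTempDirectory("snapshots");
        checkDeletion(logDir, false); // delete snapshot directly
        checkDeletion(logDir, true);  // rename snapshot before deleting
        System.out.println("both cases passed");
    }
}
```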







[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r575858200



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,30 +259,46 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFileBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * remove all snapshots whose end offset is less than the giving offset, also delete the corresponding
+   * snapshot files.
+   */
+  private def removeSnapshotFileBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {

Review comment:
       How about `removeSnapshotsBefore` or `removeSnapshotFilesBefore`?

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -261,7 +258,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
      * @return an Optional snapshot id at the log start offset if nonzero, otherwise returns an empty
      *         Optional
      */
-    Optional<OffsetAndEpoch> oldestSnapshotId();
+    Optional<OffsetAndEpoch> earliestSnapshotId();

Review comment:
       Okay. Earliest and latest are better adjectives. I think we picked these names because there is a precedent for using oldest and latest in the `ProducerStateManager`.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -315,7 +324,8 @@ object KafkaMetadataLog {
       }
       .forEach { path =>
         path.ifPresent { snapshotPath =>
-          if (snapshotPath.partial) {
+          if (snapshotPath.partial || snapshotPath.snapshotId.offset < log.logStartOffset) {

Review comment:
       Right. I think we need to do this because the `RaftClient` can fail in `deleteBeforeSnapshot` after updating the log start offset but before deleting the snapshots older than the log start offset. Does this match your thinking?
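The startup check can be sketched as a filter over the snapshots found on disk. Anything partial, or with an end offset below the log start offset, is scheduled for deletion, and the rest are kept. That covers the crash window described above. The `SnapshotFile` type is a hypothetical simplification of `SnapshotPath`, and snapshot ids are reduced to their end offsets.

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

final class StartupSnapshotCleanup {
    // Hypothetical, simplified stand-in for SnapshotPath.
    static final class SnapshotFile {
        final long endOffset;
        final boolean partial;

        SnapshotFile(long endOffset, boolean partial) {
            this.endOffset = endOffset;
            this.partial = partial;
        }
    }

    // Returns the offsets to keep; every other snapshot is appended to `toDelete`.
    static NavigableSet<Long> loadSnapshots(List<SnapshotFile> onDisk, long logStartOffset, List<SnapshotFile> toDelete) {
        NavigableSet<Long> kept = new TreeSet<>();
        for (SnapshotFile f : onDisk) {
            if (f.partial || f.endOffset < logStartOffset) {
                // Partial snapshot, or a leftover from a crash after the log start offset moved.
                toDelete.add(f);
            } else {
                kept.add(f.endOffset);
            }
        }
        return kept;
    }
}
```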

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -261,11 +339,60 @@ final class KafkaMetadataLogTest {
   }
 
   @Test
-  def testCreateReplicatedLogTruncatesFully(): Unit = {
+  def testCleanupOlderSnapshots(): Unit = {
     val topicPartition = new TopicPartition("cluster-metadata", 0)
     val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime, topicPartition)
     val numberOfRecords = 10
     val epoch = 1
+
+    append(log, 1, epoch - 1)
+    val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId1)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    append(log, 1, epoch, log.endOffset.offset)
+    val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId2)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    append(log, numberOfRecords - 2, epoch, log.endOffset.offset)
+    val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId3)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)

Review comment:
       Minor but this snapshot has the same epoch as some of the other snapshots and the "last fetched epoch" of the log. Did you mean something like `greaterSnapshotId` or just `latestSnapshotId`?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,30 +259,46 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFileBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * remove all snapshots whose end offset is less than the giving offset, also delete the corresponding
+   * snapshot files.

Review comment:
       ```suggestion
      * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
   ```

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -196,6 +227,53 @@ final class KafkaMetadataLogTest {
     assertEquals(greaterEpochSnapshotId.offset, log.highWatermark.offset)
   }
 
+  @Test
+  def testTruncateWillRemoveOlderSnapshot(): Unit = {
+
+    val topicPartition = new TopicPartition("cluster-metadata", 0)
+    val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime, topicPartition)
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, 1, epoch - 1)
+    val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId1)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    append(log, 1, epoch, log.endOffset.offset)
+    val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId2)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    append(log, numberOfRecords - 2, epoch, log.endOffset.offset)
+    val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId3)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)

Review comment:
       Minor but this snapshot has the same epoch as some of the other snapshots and the "last fetched epoch" of the log. Did you mean something like `greaterSnapshotId` or just `latestSnapshotId`?

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
##########
@@ -75,4 +77,23 @@ public void testInvalidSnapshotFilenames() {
         // partition metadata
         assertEquals(Optional.empty(), Snapshots.parse(root.resolve("partition.metadata")));
     }
+
+    @Test
+    public void testDeleteSnapshot() throws IOException {
+
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+            TestUtils.RANDOM.nextInt(Integer.MAX_VALUE),
+            TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
+        );
+
+        Path logDirPath = TestUtils.tempDirectory().toPath();
+        FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty());
+        snapshot.freeze();

Review comment:
       Let's close the snapshot writer. How about this?
   
   ```suggestion
           try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty())) {
               snapshot.freeze();
           }
   ```







[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r581136883



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {

Review comment:
       This looks like the Jira and commit that introduced a delay when deleting segments: https://issues.apache.org/jira/browse/KAFKA-636. It looks like the delay is there to reduce the probability that a file is deleted while Kafka is still reading it.
   
   Even though `CompletableFuture::supplyAsync` runs the task asynchronously, it will attempt to run it immediately rather than after a delay.
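To delay the task instead of running it right away, Java 9+ provides `CompletableFuture.delayedExecutor`, which wraps an executor so that submitted tasks start only after a fixed delay. The sketch below assumes the caller picks the delay (Kafka's segment deletion uses `file.delete.delay.ms`, default 60 seconds). The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

final class DelayedDelete {
    // Deletes `file` only after `delayMs` milliseconds have passed, not immediately.
    static CompletableFuture<Boolean> deleteLater(Path file, long delayMs) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> {
            try {
                return Files.deleteIfExists(file);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, delayed);
    }
}
```

This two-argument overload still runs the task on the default async pool once the delay expires. The overload `delayedExecutor(delay, unit, executor)` lets the caller choose the pool.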

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                return Files.deleteIfExists(destination);
+            } catch (IOException e) {
+                throw new RuntimeException("Error deleting snapshot file " + destination + ":" + e.getMessage());
+            }
+        });

Review comment:
       This uses Java's default executor (the common `ForkJoinPool`). I think we should instead use the same async scheduler that is used for the `KafkaMetadataLog`.
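Passing an executor as the second argument to `supplyAsync` keeps the work on a pool that the owner controls and can shut down. The sketch below uses a single-thread executor with a named daemon thread as a stand-in for the `KafkaMetadataLog` scheduler. The thread name `snapshot-deleter` and the class name are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ExplicitExecutorDemo {
    // A dedicated executor standing in for the log's own scheduler.
    static ExecutorService newDeleter() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "snapshot-deleter");
            t.setDaemon(true);
            return t;
        });
    }

    // Reports which thread ran the task: the supplied executor, not the common pool.
    static String runOn(ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor).join();
    }

    public static void main(String[] args) {
        ExecutorService deleter = newDeleter();
        System.out.println(runOn(deleter)); // snapshot-deleter
        deleter.shutdown();
    }
}
```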







[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r590923649



##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
##########
@@ -74,5 +79,44 @@ public void testInvalidSnapshotFilenames() {
         assertEquals(Optional.empty(), Snapshots.parse(root.resolve("leader-epoch-checkpoint")));
         // partition metadata
         assertEquals(Optional.empty(), Snapshots.parse(root.resolve("partition.metadata")));
+        // deleted file
+        assertEquals(Optional.empty(), Snapshots.parse(root.resolve("00000000000000000000.deleted")));
+    }
+
+    @Test
+    public void testDeleteSnapshot() throws IOException {
+
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+            TestUtils.RANDOM.nextInt(Integer.MAX_VALUE),
+            TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
+        );
+
+        Path logDirPath = TestUtils.tempDirectory().toPath();
+        try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty())) {
+            snapshot.freeze();
+
+            Path snapshotPath = Snapshots.snapshotPath(logDirPath, snapshotId);
+            assertTrue(Files.exists(snapshotPath));
+
+            // delete snapshot directly
+            assertTrue(Snapshots.deleteSnapshotIfExists(logDirPath, snapshot.snapshotId()));
+            assertFalse(Files.exists(snapshotPath));
+            assertFalse(Files.exists(Snapshots.deleteRename(snapshotPath, snapshotId)));
+        }
+
+        try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty())) {
+            snapshot.freeze();
+
+            Path snapshotPath = Snapshots.snapshotPath(logDirPath, snapshotId);
+            assertTrue(Files.exists(snapshotPath));
+
+            // rename snapshot before deleting
+            Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId));

Review comment:
       At first I didn't understand, but now I get it!







[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r583343987



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,30 +250,62 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFilesBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   */
+  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
+    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator()

Review comment:
       I added `false` as a reminder that this `headSet` is not inclusive. It makes no functional difference whether we pass it or not; both behave the same.
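For reference, the `NavigableSet` Javadoc specifies the one-argument `headSet(toElement)` as equivalent to `headSet(toElement, false)`, so the explicit flag only documents intent. Here is a quick check with a `TreeSet` (the class name is illustrative):

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

final class HeadSetInclusiveDemo {
    public static void main(String[] args) {
        NavigableSet<Long> ids = new TreeSet<>(Arrays.asList(1L, 2L, 10L, 30L));
        System.out.println(ids.headSet(10L));        // [1, 2]
        System.out.println(ids.headSet(10L, false)); // [1, 2]
        System.out.println(ids.headSet(10L, true));  // [1, 2, 10]
    }
}
```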







[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582471506



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -299,7 +302,16 @@ final class KafkaMetadataLog private (
       // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
       // on the file system, so we should first remove snapshotId and then delete snapshot file.
       expiredSnapshotIdsIter.remove()
-      Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
+
+      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+      val destination = Snapshots.deleteRename(path, snapshotId)
+      try {
+        Utils.atomicMoveWithFallback(path, destination)
+      } catch {
+        case e: IOException =>
+          warn("Error renaming snapshot file: " + path + " to :" + destination + ", " + e.getMessage)

Review comment:
       nit: Could this be error level? Can we use `$` substitutions? Also, let's include the full exception instead of just the message.
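In the Scala code this means an `s"..."` interpolated message with the exception passed alongside it, instead of appending `e.getMessage`. The Java sketch below shows the same idea with `java.util.logging`, chosen only because it needs no dependencies (Kafka itself logs through SLF4J). It logs at error (`SEVERE`) level and attaches the whole throwable, stack trace included. The logger name and class names are illustrative.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class RenameErrorLogging {
    static final Logger LOG = Logger.getLogger("snapshot-cleanup");

    // Logs at SEVERE (error) level with the full exception attached, not just its message.
    static void logRenameFailure(Path path, Path destination, IOException e) {
        LOG.log(Level.SEVERE, String.format("Error renaming snapshot file %s to %s", path, destination), e);
    }

    // Captures records so the behaviour can be checked without reading console output.
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override public void publish(LogRecord record) { records.add(record); }
        @Override public void flush() { }
        @Override public void close() { }
    }
}
```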

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -341,7 +342,7 @@ object KafkaMetadataLog {
         }
       }
 
-    val replicatedLog = new KafkaMetadataLog(log, snapshotIds, topicPartition, maxFetchSizeInBytes)
+    val replicatedLog = new KafkaMetadataLog(log, scheduler, snapshotIds, topicPartition, maxFetchSizeInBytes)

Review comment:
       Not something we need to solve here, but I think we should put some thought into consolidating file management. Right now responsibility is awkwardly divided between `KafkaMetadataLog` and `Log`. For example, I think we are trying to say that `KafkaMetadataLog` is responsible for snapshots. That is mostly true, but we are relying on `Log.loadSegmentFiles` to delete orphaned `.deleted` snapshots.
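As one illustration of that consolidation, the snapshot owner could sweep its own orphaned `.deleted` files on startup instead of relying on `Log.loadSegmentFiles`. The minimal sketch below uses a `DirectoryStream` glob. The class name is hypothetical, and so is the assumption that every file matching the glob is safe to remove.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

final class OrphanSweeper {
    // Deletes files in `dir` matching `glob` (e.g. "*.deleted") and returns how many were removed.
    static int sweep(Path dir, String glob) {
        List<Path> orphans = new ArrayList<>();
        try {
            // Collect first so the directory is not modified while it is being listed.
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
                stream.forEach(orphans::add);
            }
            int removed = 0;
            for (Path orphan : orphans) {
                if (Files.deleteIfExists(orphan)) {
                    removed++;
                }
            }
            return removed;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```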

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -299,7 +290,16 @@ final class KafkaMetadataLog private (
       // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
       // on the file system, so we should first remove snapshotId and then delete snapshot file.
       expiredSnapshotIdsIter.remove()
-      Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
+
+      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+      val destination = Snapshots.deleteRename(path, snapshotId)
+      try {
+        Utils.atomicMoveWithFallback(path, destination)
+      } catch {
+        case e: IOException =>
+          warn("Error renaming snapshot file: " + path + " to :" + destination + ", " + e.getMessage)
+      }
+      scheduler.schedule("delete-snapshot-file", () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))

Review comment:
       Similar to `Log.deleteSegmentFiles`, we should probably use a delay here. I think it would be ok to either hard-code this to the default of 60s, or use `file.delete.delay.ms` from the default configuration.
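Here is a sketch of resolving the delay from configuration with the 60-second fallback, then scheduling the delete on a `ScheduledExecutorService`. Reading `file.delete.delay.ms` from a plain `Properties` object is an assumption for illustration; Kafka would read it through its own config classes. The class name is hypothetical.

```java
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class DeleteDelay {
    static final String FILE_DELETE_DELAY_MS = "file.delete.delay.ms";
    static final long DEFAULT_FILE_DELETE_DELAY_MS = 60_000L; // 60 seconds

    // Uses the configured value when present, otherwise the 60s default.
    static long fileDeleteDelayMs(Properties props) {
        return Long.parseLong(props.getProperty(FILE_DELETE_DELAY_MS, Long.toString(DEFAULT_FILE_DELETE_DELAY_MS)));
    }

    // Runs `deleteTask` once the configured delay has elapsed.
    static ScheduledFuture<?> scheduleDelete(ScheduledExecutorService scheduler, Properties props, Runnable deleteTask) {
        return scheduler.schedule(deleteTask, fileDeleteDelayMs(props), TimeUnit.MILLISECONDS);
    }
}
```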







[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r590657332



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -32,13 +32,13 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.raft.internals.BatchBuilder
 import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog}
 import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}

Review comment:
       This comment applies to all of the tests/changes in this file.
   
   Can we also add the tests added to this file to `MockLogTest`, where they apply? It is good to have tests that validate that these two implementations behave similarly.
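   One way to keep the two implementations in step is a shared contract check that runs the same assertions against each of them. Below is a minimal sketch using a hypothetical `SnapshotStore` interface and two toy implementations standing in for the real logs. The method names and the delete-below-log-start semantics are illustrative assumptions, not the `ReplicatedLog` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public final class SnapshotStoreContract {
    /** Hypothetical minimal interface that both implementations must honor. */
    public interface SnapshotStore {
        void addSnapshot(long endOffset);
        /** Removes every snapshot whose end offset is strictly below logStartOffset. */
        void deleteBefore(long logStartOffset);
        /** Returns the remaining snapshot end offsets in ascending order. */
        List<Long> snapshots();
    }

    /** Sorted-set implementation. */
    public static final class TreeSetStore implements SnapshotStore {
        private final NavigableSet<Long> ids = new TreeSet<>();
        public void addSnapshot(long endOffset) { ids.add(endOffset); }
        // headSet(x, false) is a live view of the elements below x, so clear() removes them from ids.
        public void deleteBefore(long logStartOffset) { ids.headSet(logStartOffset, false).clear(); }
        public List<Long> snapshots() { return new ArrayList<>(ids); }
    }

    /** Unsorted-list implementation, standing in for a simpler mock. */
    public static final class ListStore implements SnapshotStore {
        private final List<Long> ids = new ArrayList<>();
        public void addSnapshot(long endOffset) { if (!ids.contains(endOffset)) ids.add(endOffset); }
        public void deleteBefore(long logStartOffset) { ids.removeIf(id -> id < logStartOffset); }
        public List<Long> snapshots() {
            List<Long> sorted = new ArrayList<>(ids);
            Collections.sort(sorted);
            return sorted;
        }
    }

    /** The shared contract: identical steps and expectations for every implementation. */
    public static void checkContract(SnapshotStore store) {
        store.addSnapshot(30);
        store.addSnapshot(10);
        store.addSnapshot(20);
        store.deleteBefore(20);
        List<Long> expected = Arrays.asList(20L, 30L);
        if (!store.snapshots().equals(expected)) {
            throw new AssertionError(store.getClass().getSimpleName() + " returned " + store.snapshots());
        }
    }

    public static void main(String[] args) {
        checkContract(new TreeSetStore());
        checkContract(new ListStore());
        System.out.println("both implementations satisfy the contract");
    }
}
```

   In JUnit, the same idea is usually written as an abstract test class with one concrete subclass per implementation, or as a parameterized test over both.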




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r581579008



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                return Files.deleteIfExists(destination);
+            } catch (IOException e) {
+                throw new RuntimeException("Error deleting snapshot file " + destination + ":" + e.getMessage());
+            }
+        });

Review comment:
       I noticed that `KafkaMetadataLog` doesn't currently use a scheduler. Should we add one?
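   The diff above calls `CompletableFuture.supplyAsync(supplier)`, which runs on the default async executor (normally `ForkJoinPool.commonPool()`). `supplyAsync` also has an overload that takes an explicit `Executor`, so the log can control which thread performs the delete. Below is a minimal sketch, assuming a hypothetical `SnapshotDeleter` that receives its executor through the constructor. It also keeps the `IOException` as the cause instead of copying only its message.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public final class SnapshotDeleter {
    private final Executor executor;

    // Hypothetical design: the owning log injects its executor, so tests can substitute a deterministic one.
    public SnapshotDeleter(Executor executor) {
        this.executor = executor;
    }

    /** Deletes destination on the injected executor; completes with true if a file was removed. */
    public CompletableFuture<Boolean> deleteAsync(Path destination) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return Files.deleteIfExists(destination);
            } catch (IOException e) {
                // Preserve the original exception as the cause rather than only its message.
                throw new UncheckedIOException("Error deleting snapshot file " + destination, e);
            }
        }, executor);
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("snapshot", ".deleted");
        // Runnable::run executes each task on the calling thread.
        SnapshotDeleter deleter = new SnapshotDeleter(Runnable::run);
        System.out.println(deleter.deleteAsync(file).get()); // prints true
    }
}
```

   Passing `Runnable::run` keeps unit tests deterministic. Production code would pass whatever scheduler or executor the log ends up owning.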




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org