You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/09 19:46:48 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r590647005



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -258,30 +251,62 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFilesBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * Removes all snapshots in the log directory whose epoch and end offset are less than the given epoch and end offset.
+   */
+  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
+    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator()
+    while (expiredSnapshotIdsIter.hasNext) {
+      val snapshotId = expiredSnapshotIdsIter.next()
+      // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
+      // on the file system, so we should first remove snapshotId and then delete snapshot file.
+      expiredSnapshotIdsIter.remove()
+
+      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+      val destination = Snapshots.deleteRename(path, snapshotId)
+      try {
+        Utils.atomicMoveWithFallback(path, destination)
+      } catch {
+        case e: IOException =>
+          error(s"Error renaming snapshot file: $path to $destination", e)
+      }
+      scheduler.schedule(
+        "delete-snapshot-file",
+        () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId),
+        60 * 1000L)

Review comment:
       @dengziming, should we file an issue under the Snapshot "epic" to use the log configuration for this delay instead of the hard-coded 60 seconds?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -336,19 +362,25 @@ object KafkaMetadataLog {
     log: Log
   ): ConcurrentSkipListSet[OffsetAndEpoch] = {
     val snapshotIds = new ConcurrentSkipListSet[OffsetAndEpoch]()
-    // Scan the log directory; deleting partial snapshots and remembering immutable snapshots
+    // Scan the log directory; delete partial and older snapshots, and only remember immutable snapshots starting
+    // from logStartOffset
     Files
       .walk(log.dir.toPath, 1)
       .map[Optional[SnapshotPath]] { path =>
-        if (path != log.dir.toPath) {
-          Snapshots.parse(path)
-        } else {
+
+        if (path == log.dir.toPath) {
+          Optional.empty()
+        } else if (path.endsWith(Snapshots.DELETE_SUFFIX)) {
+          Files.deleteIfExists(path)

Review comment:
       If you implement my other comment, I think you can move this delete operation to `forEach` below.
   
   It is nice to have `map` operations that are side-effect free.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -16,16 +16,22 @@
  */
 package org.apache.kafka.snapshot;
 
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.text.NumberFormat;
 import java.util.Optional;
-import org.apache.kafka.raft.OffsetAndEpoch;
 
 public final class Snapshots {
-    private static final String SUFFIX =  ".checkpoint";
-    private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
+    private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
+    public static final String SUFFIX = ".checkpoint";
+    public static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
+    public static final String DELETE_SUFFIX = ".deleted";

Review comment:
       If you implement my suggestions to `Snapshots.parse` and `SnapshotPath`, I think we can make all of these fields private.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -336,19 +362,25 @@ object KafkaMetadataLog {
     log: Log
   ): ConcurrentSkipListSet[OffsetAndEpoch] = {
     val snapshotIds = new ConcurrentSkipListSet[OffsetAndEpoch]()
-    // Scan the log directory; deleting partial snapshots and remembering immutable snapshots
+    // Scan the log directory; delete partial and older snapshots, and only remember immutable snapshots starting
+    // from logStartOffset
     Files
       .walk(log.dir.toPath, 1)
       .map[Optional[SnapshotPath]] { path =>
-        if (path != log.dir.toPath) {
-          Snapshots.parse(path)
-        } else {
+
+        if (path == log.dir.toPath) {
+          Optional.empty()
+        } else if (path.endsWith(Snapshots.DELETE_SUFFIX)) {

Review comment:
       Can we move the check `if (path.endsWith(Snapshots.DELETE_SUFFIX))` to `Snapshots.parse`? We can do that by improving `SnapshotPath` so it can represent immutable snapshots, partial snapshots, and snapshots marked for deletion.
   
   We can do this if we replace the internal `boolean partial` in `SnapshotPath` with an enum that tracks the 3 possible states for a snapshot.
   
   What do you think?

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +102,24 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete the snapshot from the filesystem. We first try to rename the snapshot file to
+     * ${file}.deleted, or delete the file directly if it has already been renamed.
+     */
+    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path path = Snapshots.snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        try {
+            // rename before deleting if target file is not renamed
+            if (Files.exists(path) || Files.notExists(destination)) {
+                Utils.atomicMoveWithFallback(path, destination);
+            }
+            return Files.deleteIfExists(destination);

Review comment:
       Hmm. How about?
   
   ```suggestion
               boolean immutableRemoved = Files.deleteIfExists(path);
               boolean deletingRemoved = Files.deleteIfExists(destination);
               return immutableRemoved || deletingRemoved;
   ```
   
   And maybe rename the variables `path` and `destination` to something like `immutablePath` and `deletingPath` respectively.

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -32,13 +32,13 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.raft.internals.BatchBuilder
 import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog}
 import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}

Review comment:
       This comment applies to all of the tests/changes in this file.
   
   Can we also implement all of the tests that were added to `MockLogTest`? It is good to have tests that validate that these two implementations behave similarly.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org