Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/23 15:32:14 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r581136883



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {

Review comment:
       This looks like the Jira and commit that introduced a delay when deleting segments: https://issues.apache.org/jira/browse/KAFKA-636. It looks like the delay is there to reduce the probability that the file is deleted while Kafka is still reading it.
   
   Even though `CompletableFuture::supplyAsync` runs asynchronously, it will attempt to run the task immediately, so it does not provide that kind of delay.
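
   To illustrate the difference, here is a minimal sketch of a delayed delete, assuming a plain `ScheduledExecutorService` is available. The class name `DelayedDelete`, the method `deleteAfterDelay`, and the `delayMs` parameter are hypothetical and are not part of this PR or of Kafka; the sketch only shows that the delete can be held back for a fixed interval after the rename, rather than being submitted to run right away.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only; not Kafka code.
public final class DelayedDelete {
    private DelayedDelete() {}

    /**
     * Schedule deletion of an already-renamed file after delayMs milliseconds.
     * The returned future completes with true if the file existed and was deleted,
     * false if it was already gone, or exceptionally if the delete failed.
     */
    public static CompletableFuture<Boolean> deleteAfterDelay(
            ScheduledExecutorService scheduler, Path renamed, long delayMs) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        scheduler.schedule(() -> {
            try {
                result.complete(Files.deleteIfExists(renamed));
            } catch (IOException e) {
                // Keep the original exception as the cause.
                result.completeExceptionally(new UncheckedIOException(e));
            }
        }, delayMs, TimeUnit.MILLISECONDS);
        return result;
    }
}
```

   With this shape, a reader that still has the renamed file open gets `delayMs` to finish before the delete is attempted. What delay would be appropriate here is an open question.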

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                return Files.deleteIfExists(destination);
+            } catch (IOException e) {
+                throw new RuntimeException("Error deleting snapshot file " + destination + ":" + e.getMessage());
+            }
+        });

Review comment:
       This uses Java's default executor (normally `ForkJoinPool.commonPool()`). I think we should instead use the same async scheduler that is used for the `KafkaMetadataLog`.
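
   As a rough sketch of the suggestion, `CompletableFuture.supplyAsync` has an overload that takes an explicit `Executor`. The class `SnapshotDeleteSketch` and the method `deleteAsync` below are hypothetical names, and a plain `java.util.concurrent.Executor` stands in for the `KafkaMetadataLog` scheduler, whose actual type and wiring are not shown in this diff.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical helper, for illustration only; not Kafka code.
public final class SnapshotDeleteSketch {
    private SnapshotDeleteSketch() {}

    /**
     * Delete the given file on the supplied executor instead of the
     * default one used by the single-argument supplyAsync.
     */
    public static CompletableFuture<Boolean> deleteAsync(Path destination, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return Files.deleteIfExists(destination);
            } catch (IOException e) {
                // Passing e as the cause preserves the original stack trace.
                throw new UncheckedIOException("Error deleting snapshot file " + destination, e);
            }
        }, executor);
    }
}
```

   Passing the executor in also lets the caller decide which thread pool pays for the file I/O, and makes the method easier to test with a single-threaded executor.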



