Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/12 20:40:17 UTC

[GitHub] [kafka] junrao commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

junrao commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631363750



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,29 +16,30 @@
  */
 package kafka.raft
 
-import java.io.{File, IOException}
-import java.nio.file.{Files, NoSuchFileException}
-import java.util.concurrent.ConcurrentSkipListSet
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
 import java.util.{Optional, Properties}
 
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
 import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel}
 import kafka.utils.{Logging, Scheduler}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import scala.annotation.nowarn
+import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
   log: Log,
   scheduler: Scheduler,
   // This object needs to be thread-safe because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created.
-  snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch],
+  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],

Review comment:
       Is the above comment still accurate, since `snapshots` is no longer thread-safe?
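
       For context, the original comment describes a handoff between the snapshotting thread and the polling thread; `mutable.TreeMap` itself is not thread-safe, so any such sharing would need external synchronization. A minimal Java sketch of that general pattern (hypothetical names, not the actual Scala code in this PR):

```java
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical illustration only (not KafkaMetadataLog code): a TreeMap is not
// thread-safe, so the snapshotting thread and the polling thread must share a
// lock if they both touch it.
final class SnapshotIdRegistry {
    private final TreeMap<Long, Optional<String>> snapshots = new TreeMap<>();

    // Called from the snapshotting thread once a new snapshot is frozen.
    void add(long endOffset) {
        synchronized (snapshots) {
            snapshots.put(endOffset, Optional.empty());
        }
    }

    // Called from the polling thread to discover the latest snapshot.
    Optional<Long> latestSnapshotOffset() {
        synchronized (snapshots) {
            return snapshots.isEmpty() ? Optional.empty() : Optional.of(snapshots.lastKey());
        }
    }
}
```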

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
     }
 
     /**
-     * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to
-     * ${file}.deleted, so we try to delete the file as well as the renamed file if exists.
+     * Delete the snapshot from the filesystem.
      */
-    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
-        Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-        Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+    public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
+            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
         } catch (IOException e) {
-            log.error("Error deleting snapshot file " + deletingPath, e);
+            log.error("Error deleting snapshot files {} and {}", immutablePath, deletedPath, e);
             return false;
         }
     }
 
+    /**
+     * Mark a snapshot for deletion by renaming with the deleted suffix
+     */
+    public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        try {
+            Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+        } catch (IOException e) {
+            log.error("Error renaming snapshot file from {} to {}", immutablePath, deletedPath, e);

Review comment:
       Should we just fail the controller on IOException?
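
       One way to "fail" rather than swallow the error would be to let the exception propagate (wrapped as an unchecked exception) and leave the shutdown decision to the caller. A minimal sketch of that alternative, not part of this PR; plain `Files.move` stands in for `Utils.atomicMoveWithFallback` to keep it self-contained:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of the alternative being asked about: propagate the
// rename failure instead of only logging it, so a higher layer can treat it
// as fatal.
final class MarkForDeleteSketch {
    static void markForDelete(Path immutablePath, Path deletedPath) {
        try {
            Files.move(immutablePath, deletedPath, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(
                "Error renaming snapshot file from " + immutablePath + " to " + deletedPath, e);
        }
    }
}
```

       Whether an IOException here should actually bring down the controller is a policy question for the caller; the sketch only shows the propagation side.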

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -54,8 +54,12 @@ public Records records() {
     }
 
     @Override
-    public void close() throws IOException {
-        fileRecords.close();
+    public void close() {
+        try {
+            fileRecords.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review comment:
       Should we throw KafkaStorageException?
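
       A minimal sketch of what that could look like, assuming `org.apache.kafka.common.errors.KafkaStorageException` and its `(String, Throwable)` constructor; the wrapper class and field names are placeholders, not the PR's code:

```java
import java.io.IOException;

import org.apache.kafka.common.errors.KafkaStorageException;

// Hypothetical sketch of the suggestion: translate the IOException from close()
// into a KafkaStorageException instead of a generic RuntimeException. The
// wrapped resource stands in for the reader's FileRecords.
final class SnapshotReaderCloseSketch implements AutoCloseable {
    private final AutoCloseable fileRecords;

    SnapshotReaderCloseSketch(AutoCloseable fileRecords) {
        this.fileRecords = fileRecords;
    }

    @Override
    public void close() {
        try {
            fileRecords.close();
        } catch (IOException e) {
            throw new KafkaStorageException("Error closing snapshot reader", e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

       Like `RuntimeException`, `KafkaStorageException` is unchecked, so the `close()` signature would not need to change.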




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org