Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/30 01:31:06 UTC

[GitHub] [kafka] jsancio opened a new pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

jsancio opened a new pull request #10431:
URL: https://github.com/apache/kafka/pull/10431


   Change the ownership model for `FileRawSnapshotReader` so that readers are owned by `KafkaMetadataLog`. This includes:
   
   1. Changing `KafkaMetadataLog`'s `snapshotIds` from a `Set[OffsetAndEpoch]` to a `Map[OffsetAndEpoch, Optional[FileRawSnapshotReader]]`. This map contains all of the known snapshots. The value will be `Optional.of` if a snapshot reader has been opened in the past.
   
   2. Changing `KafkaMetadataLog::removeSnapshotFilesBefore` so that the snapshot reader is closed if it has been opened in the past.
   
   3. Changing the interface `RawSnapshotReader` to not extend `Closeable`, since only `KafkaMetadataLog` is responsible for closing snapshots.
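
The ownership model in the list above can be sketched in Java roughly as follows. This is a minimal illustration under stated assumptions, not the code in the PR: `SnapshotRegistry`, `Reader`, and the `Function` used to open a reader are hypothetical names, and a `long` offset stands in for `OffsetAndEpoch`.

```java
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical sketch: the registry owns every reader it hands out and is the only closer.
final class SnapshotRegistry {
    // Stand-in for a snapshot reader; callers get no close() method.
    static final class Reader {
        final long snapshotId;
        boolean closed = false;
        Reader(long snapshotId) { this.snapshotId = snapshotId; }
    }

    // All known snapshots; the value is empty until the snapshot is first read.
    private final TreeMap<Long, Optional<Reader>> snapshots = new TreeMap<>();
    private final Function<Long, Reader> opener;

    SnapshotRegistry(Function<Long, Reader> opener) { this.opener = opener; }

    synchronized void onSnapshotFrozen(long snapshotId) {
        snapshots.put(snapshotId, Optional.empty());
    }

    synchronized Optional<Reader> readSnapshot(long snapshotId) {
        Optional<Reader> known = snapshots.get(snapshotId);
        if (known == null) return Optional.empty();   // unknown snapshot
        if (known.isPresent()) return known;          // already open; reuse it
        Optional<Reader> opened = Optional.of(opener.apply(snapshotId));
        snapshots.put(snapshotId, opened);
        return opened;
    }

    // Forget a snapshot and close its reader if one was ever opened.
    synchronized void remove(long snapshotId) {
        Optional<Reader> removed = snapshots.remove(snapshotId);
        if (removed != null) removed.ifPresent(r -> r.closed = true);
    }
}
```

Because callers never close a reader themselves, two reads of the same snapshot can share one open reader, and the owner decides when the underlying file is released.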
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631363750



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,29 +16,30 @@
  */
 package kafka.raft
 
-import java.io.{File, IOException}
-import java.nio.file.{Files, NoSuchFileException}
-import java.util.concurrent.ConcurrentSkipListSet
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
 import java.util.{Optional, Properties}
 
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
 import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel}
 import kafka.utils.{Logging, Scheduler}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import scala.annotation.nowarn
+import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
   log: Log,
   scheduler: Scheduler,
   // This object needs to be thread-safe because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created.
-  snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch],
+  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],

Review comment:
       Is the above comment still accurate since snapshots is no longer thread safe?

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
     }
 
     /**
-     * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to
-     * ${file}.deleted, so we try to delete the file as well as the renamed file if exists.
+     * Delete the snapshot from the filesystem.
      */
-    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
-        Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-        Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+    public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
+            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
         } catch (IOException e) {
-            log.error("Error deleting snapshot file " + deletingPath, e);
+            log.error("Error deleting snapshot files {} and {}", immutablePath, deletedPath, e);
             return false;
         }
     }
 
+    /**
+     * Mark a snapshot for deletion by renaming with the deleted suffix
+     */
+    public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        try {
+            Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+        } catch (IOException e) {
+            log.error("Error renaming snapshot file from {} to {}", immutablePath, deletedPath, e);

Review comment:
       Should we just fail the controller on IOException?

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -54,8 +54,12 @@ public Records records() {
     }
 
     @Override
-    public void close() throws IOException {
-        fileRecords.close();
+    public void close() {
+        try {
+            fileRecords.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review comment:
       Should we throw KafkaStorageException?







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630369878



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+    logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+    val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+    snapshots --= expiredSnapshots.keys
+
+    expiredSnapshots
   }
 
   /**
-   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   * Rename the given snapshots on the log directory. Asynchronously, close and delete the given
+   * snapshots.

Review comment:
       Added your suggestion.







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631544872



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
     }
 
     /**
-     * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to
-     * ${file}.deleted, so we try to delete the file as well as the renamed file if exists.
+     * Delete the snapshot from the filesystem.
      */
-    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
-        Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-        Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+    public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
+            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
         } catch (IOException e) {
-            log.error("Error deleting snapshot file " + deletingPath, e);
+            log.error("Error deleting snapshot files {} and {}", immutablePath, deletedPath, e);
             return false;
         }
     }
 
+    /**
+     * Mark a snapshot for deletion by renaming with the deleted suffix
+     */
+    public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        try {
+            Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+        } catch (IOException e) {
+            log.error("Error renaming snapshot file from {} to {}", immutablePath, deletedPath, e);

Review comment:
       @mumrah suggested converting all of the `IOException`s to `UncheckedIOException`. Kafka doesn't have a precedent for doing that, but maybe we should do that going forward. I filed https://issues.apache.org/jira/browse/KAFKA-12773, and I'll change the code here to re-throw instead of logging this message.
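
A minimal sketch of the re-throw approach mentioned in this comment, using only the JDK. The class and method names are hypothetical, the `.deleted` suffix is assumed from the wording of the old Javadoc, and a plain `Files.move` stands in for `Utils.atomicMoveWithFallback`; this is not the code that was merged.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class MarkForDeleteSketch {
    // Assumed suffix, taken from the "${file}.deleted" wording in the old Javadoc.
    static final String DELETE_SUFFIX = ".deleted";

    // Rename the file and surface failures to the caller instead of logging them.
    static Path markForDelete(Path snapshot) {
        Path deleted = snapshot.resolveSibling(snapshot.getFileName() + DELETE_SUFFIX);
        try {
            return Files.move(snapshot, deleted);
        } catch (IOException e) {
            throw new UncheckedIOException(
                String.format("Error renaming snapshot file from %s to %s", snapshot, deleted), e);
        }
    }
}
```

With this shape the caller decides whether a failed rename is fatal, rather than the helper silently continuing after an error log.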







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r627660146



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,125 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+    logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+    val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+    snapshots --= expiredSnapshots.keys

Review comment:
       We do. This method has a comment saying that it assumes that `synchronized` was used before calling this method. I think this is okay since the method is private. Let me know what you think.
   
   If you look at the two callers of this method, they use different locking strategies.
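
One way to make the "lock must already be held" convention checkable is sketched below in Java. Everything here is illustrative: `SnapshotIndex` and its methods are hypothetical, `Long` offsets stand in for `OffsetAndEpoch`, and the `Thread.holdsLock` check is an addition that the PR itself does not use.

```java
import java.util.TreeMap;

final class SnapshotIndex {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void add(long offset, String name) {
        synchronized (snapshots) {
            snapshots.put(offset, name);
        }
    }

    int size() {
        synchronized (snapshots) {
            return snapshots.size();
        }
    }

    // Public entry point: takes the lock, then delegates to the private helper.
    TreeMap<Long, String> deleteBefore(long offset) {
        synchronized (snapshots) {
            return forgetBefore(offset);
        }
    }

    // Assumes the caller already holds the lock on `snapshots`.
    private TreeMap<Long, String> forgetBefore(long offset) {
        if (!Thread.holdsLock(snapshots)) {
            throw new IllegalStateException("lock on snapshots must be held");
        }
        // headMap returns a live view, so copy it before removing the entries.
        TreeMap<Long, String> expired = new TreeMap<>(snapshots.headMap(offset, false));
        snapshots.headMap(offset, false).clear();
        return expired;
    }
}
```

Keeping the helper private limits the callers that must honor the precondition, which is the argument made in the comment above.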







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630370013



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.

Review comment:
       Fixed.







[GitHub] [kafka] jsancio commented on pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#issuecomment-809837652


   All of the changes are in https://github.com/apache/kafka/pull/10431/commits/c7a0a5c4543aab57fe606ebe10a699522177bdab. The rest of the changes are included in https://github.com/apache/kafka/pull/10085





[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628559894



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader

Review comment:
       Correct.







[GitHub] [kafka] mumrah commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628415285



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -161,19 +162,22 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId().asScala match {
+    val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
         (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>

Review comment:
       Unrelated to your change, but the indentation here threw me off.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until

Review comment:
       Can we avoid adding new code that uses a deprecated method?

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
     }
 
     /**
-     * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to
-     * ${file}.deleted, so we try to delete the file as well as the renamed file if exists.
+     * Delete the snapshot from the filesystem.
      */
-    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
-        Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-        Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+    public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
+            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
         } catch (IOException e) {
-            log.error("Error deleting snapshot file " + deletingPath, e);
+            log.error("Error deleting snapshot files {} and {}", immutablePath, deletedPath, e);
             return false;
         }
     }
 
+    /**
+     * Mark a snapshot for deletion by renaming with the deleted suffix
+     */
+    public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        try {
+            Files.move(immutablePath, deletedPath, StandardCopyOption.ATOMIC_MOVE);

Review comment:
       Should we use Utils#atomicMoveWithFallback, as the code did previously?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader

Review comment:
       Just to clarify my understanding: this is the main change in this PR, right? The FileRawSnapshotReader instances are now owned (opened and closed) by KafkaMetadataLog, whereas previously they were opened here but never closed.
   
   Now in KafkaMetadataLog#close and KafkaMetadataLog#deleteSnapshotFiles, we are closing the FileRawSnapshotReader instances that we opened.
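
   For illustration, the ownership model described above could be sketched in Java roughly as follows. `ReaderOwner` and `Reader` are hypothetical stand-ins, not Kafka classes, and the `Long` key stands in for `OffsetAndEpoch`. The owner opens a reader lazily, hands out the same instance on later reads, and is the only party that closes it.

```java
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical names throughout; this is not the KafkaMetadataLog code.
final class ReaderOwner implements AutoCloseable {
    /** Stand-in for a reader that holds a file handle. */
    static final class Reader {
        private boolean closed = false;

        boolean isClosed() {
            return closed;
        }

        // Private: only the enclosing owner class closes readers.
        private void close() {
            closed = true;
        }
    }

    // Id -> reader; an empty Optional means "known, but never opened".
    private final TreeMap<Long, Optional<Reader>> readers = new TreeMap<>();

    synchronized void register(long id) {
        readers.putIfAbsent(id, Optional.empty());
    }

    /** Opens the reader on first use and returns the same instance afterwards. */
    synchronized Optional<Reader> read(long id) {
        Optional<Reader> cached = readers.get(id);
        if (cached == null) {
            return Optional.empty(); // unknown id
        }
        if (!cached.isPresent()) {
            cached = Optional.of(new Reader());
            readers.put(id, cached);
        }
        return cached;
    }

    /** The owner closes every reader it opened; callers never do. */
    @Override
    public synchronized void close() {
        for (Optional<Reader> reader : readers.values()) {
            reader.ifPresent(Reader::close);
        }
        readers.clear();
    }
}
```

   Because callers get no way to close a reader, a reader stays usable until the owner forgets or closes it.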

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
     }
 
     /**
-     * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to
-     * ${file}.deleted, so we try to delete the file as well as the renamed file if exists.
+     * Delete the snapshot from the filesystem.
      */
-    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
-        Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-        Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+    public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
+            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);

Review comment:
       Unrelated to your change, but the bitwise OR is a little surprising. Maybe we can rewrite this to be more explicit? Glancing through our code, I don't see any other usages of this operator besides actual bitwise operations.
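
   One more explicit shape, as a sketch only: `DeleteBoth` and `deleteBothIfExist` are hypothetical names, not part of `Snapshots`. Giving each delete its own statement makes it obvious that both are attempted, which is the effect the non-short-circuiting `|` has today.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class DeleteBoth {
    /**
     * Deletes both paths if they exist and reports whether either was removed.
     * Each delete has its own statement, so neither is skipped by short-circuiting.
     */
    static boolean deleteBothIfExist(Path first, Path second) throws IOException {
        boolean firstDeleted = Files.deleteIfExists(first);
        boolean secondDeleted = Files.deleteIfExists(second);
        return firstDeleted || secondDeleted;
    }
}
```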

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -54,8 +54,12 @@ public Records records() {
     }
 
     @Override
-    public void close() throws IOException {
-        fileRecords.close();
+    public void close() {

Review comment:
       AutoCloseable#close throws Exception, so we can still throw IOException. Is there a reason why we want an unchecked exception here?
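
   To make the two options concrete, here is a small sketch with hypothetical classes (none of these names come from the PR). An override of `AutoCloseable#close` may narrow the `throws` clause to `IOException`, or declare nothing and wrap the failure. The wrapping variant below uses `UncheckedIOException` as one possible choice; the diff under review wraps in a plain `RuntimeException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class CloseSignatures {
    // Simulates an underlying close that fails with a checked exception.
    private static void failingClose() throws IOException {
        throw new IOException("simulated disk error");
    }

    /** Narrows AutoCloseable's `throws Exception` to IOException. */
    static final class CheckedResource implements AutoCloseable {
        @Override
        public void close() throws IOException {
            failingClose();
        }
    }

    /** Declares no checked exception, so the IOException must be wrapped. */
    static final class UncheckedResource implements AutoCloseable {
        @Override
        public void close() {
            try {
                failingClose();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Returns the simple name of the exception thrown by close(), or "none". */
    static String closeAndReport(AutoCloseable resource) {
        try {
            resource.close();
            return "none";
        } catch (Exception e) {
            return e.getClass().getSimpleName();
        }
    }
}
```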







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628546403



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until

Review comment:
       The issue is that Kafka needs to compile against both Scala 2.12 and 2.13. In Scala 2.13 a lot of the collection methods were deprecated. The community created [scala-collection-compat](https://github.com/scala/scala-collection-compat) to allow the use of 2.13 functionality in 2.12, and Apache Kafka depends on that project. Unfortunately, there is a pretty annoying bug in the latest stable version of `scala-collection-compat` that generates "unused import" warnings when it is used with 2.13. The Kafka project turns those warnings into errors, and the Scala compiler doesn't allow the use of `nowarn` on imports.
   
   The best solution I can find is to use the 2.12 methods that are deprecated in 2.13 and add this `nowarn` annotation.
   
   It looks like this problem has been fixed in the development version of `scala-collection-compat`. When that becomes a release version, we can remove this and a few other `nowarn` annotations.
   
   :dizzy: 







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628550053



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
     }
 
     /**
-     * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to
-     * ${file}.deleted, so we try to delete the file as well as the renamed file if exists.
+     * Delete the snapshot from the filesystem.
      */
-    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
-        Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-        Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+    public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
+            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
         } catch (IOException e) {
-            log.error("Error deleting snapshot file " + deletingPath, e);
+            log.error("Error deleting snapshot files {} and {}", immutablePath, deletedPath, e);
             return false;
         }
     }
 
+    /**
+     * Mark a snapshot for deletion by renaming with the deleted suffix
+     */
+    public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        try {
+            Files.move(immutablePath, deletedPath, StandardCopyOption.ATOMIC_MOVE);

Review comment:
       I filed an issue regarding `atomicMoveWithFallback` and its use in `FileRecords`. `FileRecords` is used by both snapshots and log segments. For now, I have reverted to `atomicMoveWithFallback` and will address the problem in that Jira.
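
   For readers unfamiliar with the pattern, an atomic move with a fallback can be sketched as below. This is a hypothetical helper written for illustration, not the body of Kafka's `Utils.atomicMoveWithFallback`, which may differ in which exceptions it catches and in what it does afterwards.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class MoveWithFallback {
    /**
     * Tries an atomic rename first. If the file system cannot do the move
     * atomically, falls back to a regular move that replaces the target.
     */
    static void move(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Assumption: a non-atomic move is acceptable for this caller.
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```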







[GitHub] [kafka] dengziming commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r627099786



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,125 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+    logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+    val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+    snapshots --= expiredSnapshots.keys

Review comment:
       Why not put this write operation in synchronized code?
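
   For context, the method's Javadoc says the caller is expected to hold the lock already. A minimal Java sketch of that split, using a hypothetical `SnapshotIndex` class with `Long` keys in place of `OffsetAndEpoch`: the map is mutated under the lock by a private helper, and the slower follow-up work runs after the lock is released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

final class SnapshotIndex {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void add(long offset, String name) {
        synchronized (snapshots) {
            snapshots.put(offset, name);
        }
    }

    /** Removes entries strictly below `offset`. Caller must hold the lock. */
    private TreeMap<Long, String> forgetBefore(long offset) {
        // headMap returns a live view, so copy it before clearing the view.
        SortedMap<Long, String> view = snapshots.headMap(offset);
        TreeMap<Long, String> expired = new TreeMap<>(view);
        view.clear(); // also removes these entries from `snapshots`
        return expired;
    }

    /** Forgets old entries under the lock and returns their names in key order. */
    List<String> deleteBefore(long offset) {
        TreeMap<Long, String> expired;
        synchronized (snapshots) {
            expired = forgetBefore(offset);
        }
        // Slow work (renames, closing readers) would go here, outside the lock.
        return new ArrayList<>(expired.values());
    }

    int size() {
        synchronized (snapshots) {
            return snapshots.size();
        }
    }
}
```

   Copying the expired entries before removing them is what lets the file-system work proceed without holding the lock.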

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,125 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {

Review comment:
       Is it necessary to put read operations in synchronized code? 

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,125 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+    logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+    val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+    snapshots --= expiredSnapshots.keys
+
+    expiredSnapshots
   }
 
   /**
-   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   * Rename the given snapshots on the log directory. Asynchronously, close and delete the given
+   * snapshots.
    */
-  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
-    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator
-    while (expiredSnapshotIdsIter.hasNext) {
-      val snapshotId = expiredSnapshotIdsIter.next()
-      // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
-      // on the file system, so we should first remove snapshotId and then delete snapshot file.
-      expiredSnapshotIdsIter.remove()
-
-      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
-      val destination = Snapshots.deleteRename(path, snapshotId)
-      try {
-        Utils.atomicMoveWithFallback(path, destination, false)
-      } catch {
-        case e: IOException =>
-          error(s"Error renaming snapshot file: $path to $destination", e)
-      }
+  private def removeSnapshots(
+    expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+  ): Unit = {
+    expiredSnapshots.foreach { case (snapshotId, _) =>
+      Snapshots.markForDelete(log.dir.toPath, snapshotId)
+    }
+
+    if (!expiredSnapshots.isEmpty) {

Review comment:
       nit: `expiredSnapshots.nonEmpty`







[GitHub] [kafka] mumrah merged pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
mumrah merged pull request #10431:
URL: https://github.com/apache/kafka/pull/10431


   





[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630369703



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+    logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+    val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+    snapshots --= expiredSnapshots.keys
+
+    expiredSnapshots
   }
 
   /**
-   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   * Rename the given snapshots on the log directory. Asynchronously, close and delete the given
+   * snapshots.
    */
-  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
-    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator
-    while (expiredSnapshotIdsIter.hasNext) {
-      val snapshotId = expiredSnapshotIdsIter.next()
-      // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
-      // on the file system, so we should first remove snapshotId and then delete snapshot file.
-      expiredSnapshotIdsIter.remove()
-
-      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
-      val destination = Snapshots.deleteRename(path, snapshotId)
-      try {
-        Utils.atomicMoveWithFallback(path, destination, false)
-      } catch {
-        case e: IOException =>
-          error(s"Error renaming snapshot file: $path to $destination", e)
-      }
+  private def removeSnapshots(
+    expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+  ): Unit = {
+    expiredSnapshots.foreach { case (snapshotId, _) =>
+      Snapshots.markForDelete(log.dir.toPath, snapshotId)
+    }
+
+    if (expiredSnapshots.nonEmpty) {
       scheduler.schedule(
-        "delete-snapshot-file",
-        () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId),
-        fileDeleteDelayMs)
+        "delete-snapshot-files",
+        KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots),

Review comment:
       Okay. Changed it to use a different syntax that should get around this IntelliJ error.







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631543566



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -54,8 +54,12 @@ public Records records() {
     }
 
     @Override
-    public void close() throws IOException {
-        fileRecords.close();
+    public void close() {
+        try {
+            fileRecords.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review comment:
       I am not sure. I could use some guidance here. I read the documentation for `KafkaStorageException`: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java#L19-L30. It looks like Kafka uses `KafkaStorageException` if the IO error is visible to the client.
   
   On the server (broker and controller) this code will be called asynchronously by the same scheduler used for deleting log segments. In that case `CoreUtils.swallow` is used, which logs a WARN message. I think we should do the same here.
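
A minimal sketch of the "swallow and log at WARN" idea, assuming a hypothetical helper named `closeAndWarn` and `java.util.logging` in place of Kafka's own logging. It is only an illustration of the pattern, not the code in this PR and not the `CoreUtils.swallow` implementation:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class QuietClose {
    private static final Logger LOG = Logger.getLogger(QuietClose.class.getName());

    /**
     * Hypothetical helper: closes the resource and, if the close fails,
     * logs a warning instead of propagating the exception to the caller.
     * Returns true when the close succeeded.
     */
    static boolean closeAndWarn(Closeable resource, String name) {
        try {
            resource.close();
            return true;
        } catch (IOException e) {
            // The background task keeps running; the failure is only reported.
            LOG.log(Level.WARNING, "Error closing " + name, e);
            return false;
        }
    }
}
```

With this shape, a failed close inside a scheduled cleanup task does not escape into the scheduler thread; it only shows up in the log.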







[GitHub] [kafka] jsancio commented on pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#issuecomment-831419289


   @hachikuji @dengziming @mumrah This PR is ready for review. Thanks!





[GitHub] [kafka] mumrah commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r629541231



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -161,19 +162,24 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
-        (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
+    val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {

Review comment:
       Should we grab the `snapshots` lock for this whole match expression like we do in `deleteBeforeSnapshot`? Is there possibly a race between this block and `deleteBeforeSnapshot`?
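
To make the concern concrete, here is a minimal sketch of the check-then-act pattern under a single lock. The class `SnapshotIndex`, its `Long` keys, and its `String` values are hypothetical stand-ins for the `snapshots` map. The point is that the "is this id known?" check and the removal happen in one `synchronized` block, while slow file-system work would happen after the lock is released:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class SnapshotIndex {
    // Hypothetical stand-in for the snapshots map: end offset -> file name.
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void add(long endOffset, String fileName) {
        synchronized (snapshots) {
            snapshots.put(endOffset, fileName);
        }
    }

    /**
     * Forgets every snapshot strictly before logStart, but only if logStart
     * itself is a known snapshot. The check and the removal share one lock
     * acquisition, so no other thread can change the map in between.
     */
    List<String> forgetBefore(long logStart) {
        List<String> forgotten = new ArrayList<>();
        synchronized (snapshots) {
            if (snapshots.containsKey(logStart)) {
                NavigableMap<Long, String> expired = snapshots.headMap(logStart, false);
                forgotten.addAll(expired.values());
                expired.clear(); // headMap is a view, so this removes from snapshots
            }
        }
        // Renaming and deleting files would happen here, outside the lock.
        return forgotten;
    }

    int size() {
        synchronized (snapshots) {
            return snapshots.size();
        }
    }
}
```

If the check ran outside the `synchronized` block, another thread could add or forget snapshots between the check and the removal.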







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628554771



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -54,8 +54,12 @@ public Records records() {
     }
 
     @Override
-    public void close() throws IOException {
-        fileRecords.close();
+    public void close() {

Review comment:
       Yeah. For consistency in the API. For example, most of the methods in this type perform an IO operation which can throw an `IOException`, yet they have already been implemented to wrap it in an unchecked exception. In some places we wrap this in an unchecked exception; in other places we don't. For the raft module, we have made an implicit decision to prefer unchecked exceptions for IO errors.
   
   I tried to clean this up in https://github.com/apache/kafka/pull/10085:
   > 5. Removed throws IOException from some methods. Some of types were inconsistently throwing IOException in some cases and throwing RuntimeException(..., new IOException(...)) in others. This PR improves the consistent by wrapping IOException in RuntimeException in a few more places and replacing Closeable with AutoCloseable.
   
   This PR expands on that work/decision.







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628558150



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
     }
 
     /**
-     * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to
-     * ${file}.deleted, so we try to delete the file as well as the renamed file if exists.
+     * Delete the snapshot from the filesystem.
      */
-    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
-        Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-        Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+    public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
+            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);

Review comment:
       Yeah. Java is tricky :smile:. Java supports two types of logical OR operators: the conditional one (`||`) short-circuits, while the non-conditional one (`|`) always evaluates both operands. For reference: https://www.dummies.com/programming/java/logical-operators-in-java/
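
A small sketch of why `|` matters when both operands have side effects. The class and method names are hypothetical, and the file names are made up for the demo. With `||` the second `Files.deleteIfExists` call is skipped once the first one returns `true`, so the second file is left on disk:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class OrOperatorDemo {
    /**
     * Creates two temporary files, deletes them using either `||` or `|`,
     * and reports whether the second file is still on disk afterwards.
     */
    static boolean secondFileSurvives(boolean shortCircuit) {
        try {
            Path dir = Files.createTempDirectory("or-operator-demo");
            Path first = Files.createFile(dir.resolve("snapshot.checkpoint"));
            Path second = Files.createFile(dir.resolve("snapshot.checkpoint.deleted"));

            boolean deleted = shortCircuit
                ? (Files.deleteIfExists(first) || Files.deleteIfExists(second))
                : (Files.deleteIfExists(first) | Files.deleteIfExists(second));
            if (!deleted) {
                throw new IllegalStateException("the first file should have been deleted");
            }

            boolean survives = Files.exists(second);

            // Clean up whatever the expression above left behind.
            Files.deleteIfExists(second);
            Files.deleteIfExists(dir);
            return survives;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("with ||: second file survives = " + secondFileSurvives(true));
        System.out.println("with | : second file survives = " + secondFileSurvives(false));
    }
}
```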







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628546403



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until

Review comment:
       The issue is that Kafka needs to compile against both Scala 2.12 and 2.13. In Scala 2.13 a lot of the collection methods were deprecated. The community created [scala-collection-compat](https://github.com/scala/scala-collection-compat) to allow the use of 2.13 functionality in 2.12. Apache Kafka depends on that project. Unfortunately, there is a pretty annoying bug in the latest stable version of `scala-collection-compat` that generates "unused import" warnings when used in 2.13. The Kafka project turns those warnings into errors and the Scala compiler doesn't allow the use of `nowarn` on imports.
   
   The best solution I can find is to use 2.12 methods that are deprecated and add this `nowarn` annotation.
   
   It looks like this problem has been fixed in the devel version of `scala-collection-compat`. When that becomes a release version, we can remove this and a few other `nowarn` annotations.
   
   :dizzy: 







[GitHub] [kafka] dengziming commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r627931196



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,125 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {

Review comment:
       Thanks for the explanations







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631549244



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
     }
 
     /**
-     * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to
-     * ${file}.deleted, so we try to delete the file as well as the renamed file if exists.
+     * Delete the snapshot from the filesystem.
      */
-    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
-        Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-        Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+    public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
+            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
         } catch (IOException e) {
-            log.error("Error deleting snapshot file " + deletingPath, e);
+            log.error("Error deleting snapshot files {} and {}", immutablePath, deletedPath, e);
             return false;
         }
     }
 
+    /**
+     * Mark a snapshot for deletion by renaming with the deleted suffix
+     */
+    public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        try {
+            Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+        } catch (IOException e) {
+            log.error("Error renaming snapshot file from {} to {}", immutablePath, deletedPath, e);

Review comment:
       Changing it to `UncheckedIOException` will unwind the stack for the polling thread. Tomorrow, I'll look into how we handle that case, but it may already shut down the broker and controller.







[GitHub] [kafka] mumrah commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r629529239



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until

Review comment:
       👍 thanks for the explanation 







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628566661



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -161,19 +162,22 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId().asScala match {
+    val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
         (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>

Review comment:
       I fixed the implementation. Let me know if it is better.







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631549424



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,29 +16,30 @@
  */
 package kafka.raft
 
-import java.io.{File, IOException}
-import java.nio.file.{Files, NoSuchFileException}
-import java.util.concurrent.ConcurrentSkipListSet
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
 import java.util.{Optional, Properties}
 
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
 import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel}
 import kafka.utils.{Logging, Scheduler}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import scala.annotation.nowarn
+import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
   log: Log,
   scheduler: Scheduler,
   // This object needs to be thread-safe because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created.
-  snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch],
+  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],

Review comment:
       No. I updated the comment. I'll push a commit tomorrow after a few other changes.







[GitHub] [kafka] mumrah commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r629531449



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -54,8 +54,12 @@ public Records records() {
     }
 
     @Override
-    public void close() throws IOException {
-        fileRecords.close();
+    public void close() {

Review comment:
       Ok, sounds good. If we're strictly dealing with IOExceptions, maybe we can use UncheckedIOException?
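
For illustration, a minimal sketch of what that could look like. The wrapper class `UncheckedClose` is hypothetical and is not the actual `FileRawSnapshotReader`. It only shows a `close()` with no `throws` clause that rethrows any `IOException` as `java.io.UncheckedIOException`, keeping the original exception as the cause:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

final class UncheckedClose implements AutoCloseable {
    private final Closeable delegate;

    UncheckedClose(Closeable delegate) {
        this.delegate = delegate;
    }

    // No checked exception in the signature; IO failures surface as unchecked.
    @Override
    public void close() {
        try {
            delegate.close();
        } catch (IOException e) {
            // UncheckedIOException only accepts an IOException as its cause,
            // so callers that care can still recover the original error.
            throw new UncheckedIOException(e);
        }
    }
}
```

Compared with a plain `RuntimeException`, this keeps the type information that the failure was an IO error while still avoiding a checked exception in the API.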







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628546403



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until

Review comment:
       The issue is that Kafka needs to compile against both Scala 2.12 and 2.13. In Scala 2.13 a lot of the collection methods were deprecated. The community created [scala-collection-compat](https://github.com/scala/scala-collection-compat) to allow the use of 2.13 functionality in 2.12, and Apache Kafka depends on that project. Unfortunately, there is a pretty annoying bug in the latest stable version of `scala-collection-compat` that generates an "unused import" warning when it is used with 2.13. The Kafka project turns those warnings into errors, and the Scala compiler doesn't allow the use of `nowarn` on imports.
   
   The best solution I can find is to use the 2.12 methods that are deprecated in 2.13 and add this `nowarn` annotation.
   
   It looks like this problem has been fixed in the devel version of `scala-collection-compat`. When that becomes a release version, we can remove this and a few other `nowarn` annotations.
   
   :dizzy: 

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until

Review comment:
       The issue is that Kafka needs to compile against both Scala 2.12 and 2.13. In Scala 2.13 a lot of the collection methods were deprecated. The community created [scala-collection-compat](https://github.com/scala/scala-collection-compat) to allow the use of 2.13 functionality in 2.12, and Apache Kafka depends on that project. Unfortunately, there is a pretty annoying bug in the latest stable version of `scala-collection-compat` that generates an "unused import" warning when it is used with 2.13. The Kafka project turns those warnings into errors, and the Scala compiler doesn't allow the use of `nowarn` on imports.
   
   The best solution I can find is to use the 2.12 methods that are deprecated in 2.13 and add this `nowarn` annotation.
   
   It looks like this problem has been fixed in the devel version of `scala-collection-compat`. When that becomes a release version, we can remove this and a few other `nowarn` annotations.
   
   :dizzy: 







[GitHub] [kafka] mumrah commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r629541231



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -161,19 +162,24 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
-        (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
+    val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {

Review comment:
       Should we grab the `snapshots` lock for this whole match expression, like we do in `deleteBeforeSnapshot`? Is there possibly a race between this block and `deleteBeforeSnapshot`?
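
The concern above is a check-then-act race: if the latest snapshot id is read under the lock but the decision and the removal happen outside it, another thread can change the map in between. A minimal Java sketch of holding one lock across the whole compound operation follows. It is an illustration only, not the code in `KafkaMetadataLog`: `SnapshotIndex`, `add`, `truncateToLatest` and `size` are hypothetical names, and plain `long` end offsets stand in for `OffsetAndEpoch`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for the `snapshots` map; keys are snapshot end offsets.
final class SnapshotIndex {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void add(long endOffset, String name) {
        synchronized (snapshots) {
            snapshots.put(endOffset, name);
        }
    }

    // Check and act under one lock: decide whether the latest snapshot is
    // ahead of the log end and, if so, forget every older snapshot in the
    // same critical section. The forgotten names are returned so the caller
    // can clean up files after the lock has been released.
    List<String> truncateToLatest(long logEndOffset) {
        synchronized (snapshots) {
            if (snapshots.isEmpty() || snapshots.lastKey() <= logEndOffset) {
                return new ArrayList<>();
            }
            // headMap is a live view that excludes the latest key;
            // copy its values before clearing it.
            SortedMap<Long, String> older = snapshots.headMap(snapshots.lastKey());
            List<String> forgotten = new ArrayList<>(older.values());
            older.clear();
            return forgotten;
        }
    }

    int size() {
        synchronized (snapshots) {
            return snapshots.size();
        }
    }
}
```

Returning the forgotten entries instead of deleting files inside the critical section keeps slow I/O out of the lock, which is the same split the diff makes between `forgetSnapshotsBefore` and `removeSnapshots`.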

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+    logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+    val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+    snapshots --= expiredSnapshots.keys
+
+    expiredSnapshots
   }
 
   /**
-   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   * Rename the given snapshots on the log directory. Asynchronously, close and delete the given
+   * snapshots.
    */
-  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
-    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator
-    while (expiredSnapshotIdsIter.hasNext) {
-      val snapshotId = expiredSnapshotIdsIter.next()
-      // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
-      // on the file system, so we should first remove snapshotId and then delete snapshot file.
-      expiredSnapshotIdsIter.remove()
-
-      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
-      val destination = Snapshots.deleteRename(path, snapshotId)
-      try {
-        Utils.atomicMoveWithFallback(path, destination, false)
-      } catch {
-        case e: IOException =>
-          error(s"Error renaming snapshot file: $path to $destination", e)
-      }
+  private def removeSnapshots(
+    expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+  ): Unit = {
+    expiredSnapshots.foreach { case (snapshotId, _) =>
+      Snapshots.markForDelete(log.dir.toPath, snapshotId)
+    }
+
+    if (expiredSnapshots.nonEmpty) {
       scheduler.schedule(
-        "delete-snapshot-file",
-        () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId),
-        fileDeleteDelayMs)
+        "delete-snapshot-files",
+        KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots),

Review comment:
       nit: IntelliJ complains about this line if the `() => ...` isn't there. It compiles fine, it just doesn't like it for some reason.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+    logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+    val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+    snapshots --= expiredSnapshots.keys
+
+    expiredSnapshots
   }
 
   /**
-   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   * Rename the given snapshots on the log directory. Asynchronously, close and delete the given
+   * snapshots.

Review comment:
       Should we include a comment about the delay here? Maybe something like:
   
   > Asynchronously, close and delete the given snapshots after some delay
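
The rename-now, delete-later behavior that the suggested wording describes can be sketched in Java as below. This is a sketch under stated assumptions rather than Kafka's implementation: `DelayedSnapshotDelete`, `markForDelete` and `deleteLater` are hypothetical helpers, the `.deleted` suffix follows the `${file}.deleted` convention mentioned in the `Snapshots` Javadoc, and `delayMs` plays the role of `fileDeleteDelayMs`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class DelayedSnapshotDelete {
    // Rename the snapshot to "<name>.deleted" immediately so that no new
    // reader can open it under its original name.
    static Path markForDelete(Path snapshot) throws IOException {
        Path renamed = snapshot.resolveSibling(snapshot.getFileName() + ".deleted");
        return Files.move(snapshot, renamed, StandardCopyOption.ATOMIC_MOVE);
    }

    // Delete the renamed file only after delayMs, which gives readers that
    // already hold the file open some time to finish.
    static ScheduledFuture<Boolean> deleteLater(
        ScheduledExecutorService scheduler,
        Path renamed,
        long delayMs
    ) {
        Callable<Boolean> task = () -> Files.deleteIfExists(renamed);
        return scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS);
    }
}
```

A caller would invoke `markForDelete` synchronously and hand the returned path to `deleteLater`; the returned future completes with `true` if the file was still present when the delayed task ran.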

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.

Review comment:
       nit: ready -> already







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630364576



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -54,8 +54,12 @@ public Records records() {
     }
 
     @Override
-    public void close() throws IOException {
-        fileRecords.close();
+    public void close() {

Review comment:
       I created https://issues.apache.org/jira/browse/KAFKA-12773







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r627658137



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,125 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {

Review comment:
       Most modern hardware and Java's memory model require that values used by multiple cores/threads be read from and written to RAM. In Java, you can force this by using `volatile`, `synchronized`, or many of the types in the `java.util.concurrent` package. The important observation is that this is needed for both reads and writes. This is a decent summary of the issue: https://medium.com/javarevisited/java-concurrency-java-memory-model-96e3ac36ec6b
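
The point that reads need the same protection as writes can be illustrated with a small Java sketch. `SnapshotIds`, `add` and `latest` are hypothetical names chosen for this example, and `long` ids stand in for `OffsetAndEpoch`; the class only mirrors the idea behind wrapping `latestSnapshotId()` in `synchronized`.

```java
import java.util.Optional;
import java.util.TreeSet;

final class SnapshotIds {
    // Guarded by `this`: every access, read or write, takes the same lock.
    private final TreeSet<Long> ids = new TreeSet<>();

    synchronized void add(long id) {
        ids.add(id);
    }

    // The read is synchronized as well. Releasing the lock in add()
    // happens-before a later acquisition of the same lock here, so a reader
    // thread is guaranteed to observe the writer's update.
    synchronized Optional<Long> latest() {
        return ids.isEmpty() ? Optional.empty() : Optional.of(ids.last());
    }
}
```

If `latest()` were left unsynchronized, the Java memory model would give no visibility guarantee for elements added by another thread, and `TreeSet` itself is not safe for concurrent access.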







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r627672049



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +246,125 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+    logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+    val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+    snapshots --= expiredSnapshots.keys
+
+    expiredSnapshots
   }
 
   /**
-   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   * Rename the given snapshots on the log directory. Asynchronously, close and delete the given
+   * snapshots.
    */
-  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
-    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator
-    while (expiredSnapshotIdsIter.hasNext) {
-      val snapshotId = expiredSnapshotIdsIter.next()
-      // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
-      // on the file system, so we should first remove snapshotId and then delete snapshot file.
-      expiredSnapshotIdsIter.remove()
-
-      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
-      val destination = Snapshots.deleteRename(path, snapshotId)
-      try {
-        Utils.atomicMoveWithFallback(path, destination, false)
-      } catch {
-        case e: IOException =>
-          error(s"Error renaming snapshot file: $path to $destination", e)
-      }
+  private def removeSnapshots(
+    expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+  ): Unit = {
+    expiredSnapshots.foreach { case (snapshotId, _) =>
+      Snapshots.markForDelete(log.dir.toPath, snapshotId)
+    }
+
+    if (!expiredSnapshots.isEmpty) {

Review comment:
       Done.







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628550053



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
     }
 
     /**
-     * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to
-     * ${file}.deleted, so we try to delete the file as well as the renamed file if exists.
+     * Delete the snapshot from the filesystem.
      */
-    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
-        Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-        Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+    public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
+            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
         } catch (IOException e) {
-            log.error("Error deleting snapshot file " + deletingPath, e);
+            log.error("Error deleting snapshot files {} and {}", immutablePath, deletedPath, e);
             return false;
         }
     }
 
+    /**
+     * Mark a snapshot for deletion by renaming with the deleted suffix
+     */
+    public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        try {
+            Files.move(immutablePath, deletedPath, StandardCopyOption.ATOMIC_MOVE);

Review comment:
       I filed an issue regarding `atomicMoveWithFallback` and its use in `FileRecords`. `FileRecords` is used by both snapshots and log segments. For now, I'll revert to `atomicMoveWithFallback` and address this issue in that Jira.
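
For readers unfamiliar with the pattern under discussion, here is a minimal sketch of an "atomic move with fallback", assuming only the standard `java.nio.file` API. The class and method names (`MoveSketch`, `moveWithFallback`) are hypothetical, and this is not a copy of Kafka's `Utils.atomicMoveWithFallback`, which may differ in details such as logging or flushing the parent directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper for illustration; not part of Kafka.
class MoveSketch {
    /**
     * Try an atomic rename first. If that fails for any I/O reason,
     * fall back to a plain move that replaces an existing target.
     */
    static void moveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException atomicFailure) {
            try {
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException fallbackFailure) {
                // Keep the original failure visible to whoever handles the exception.
                fallbackFailure.addSuppressed(atomicFailure);
                throw fallbackFailure;
            }
        }
    }
}
```

The fallback trades atomicity for availability: on file systems where an atomic rename is not possible, the snapshot can still be renamed to its deleted name, at the cost of a move that is no longer guaranteed to be atomic.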







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r632066530



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
     }
 
     /**
-     * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to
-     * ${file}.deleted, so we try to delete the file as well as the renamed file if exists.
+     * Delete the snapshot from the filesystem.
      */
-    public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
-        Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-        Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+    public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
+            return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
         } catch (IOException e) {
-            log.error("Error deleting snapshot file " + deletingPath, e);
+            log.error("Error deleting snapshot files {} and {}", immutablePath, deletedPath, e);
             return false;
         }
     }
 
+    /**
+     * Mark a snapshot for deletion by renaming with the deleted suffix
+     */
+    public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+        Path immutablePath = snapshotPath(logDir, snapshotId);
+        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        try {
+            Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+        } catch (IOException e) {
+            log.error("Error renaming snapshot file from {} to {}", immutablePath, deletedPath, e);

Review comment:
       Excuse the delay @junrao, but I looked into this in more detail today. I changed this code to throw an exception instead. This exception will be unhandled by the `KafkaRaftClient` polling thread in both the broker and controller. This will cause the thread to terminate, but I don't think it will cause the JVM process to terminate.
   
   We have the following Jira to revisit our exception handling: https://issues.apache.org/jira/browse/KAFKA-10594. I added a comment there to document the issue you highlighted here. Do you mind if we tackle this problem holistically in that Jira?
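
The general JVM behaviour referred to above can be sketched as follows: an exception that escapes a thread's `run` method ends that thread only, and the rest of the process keeps running. The class name, method name and thread name are hypothetical, and the sketch does not model the actual `KafkaRaftClient` polling thread or any handler Kafka may install.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper for illustration; not part of Kafka.
class PollingThreadSketch {
    /**
     * Run the task on a new thread, wait for that thread to finish, and
     * return the exception that killed it (or null if it ended normally).
     */
    static Throwable runAndCapture(Runnable task) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread poller = new Thread(task, "hypothetical-polling-thread");
        // Invoked on the dying thread just before it terminates.
        poller.setUncaughtExceptionHandler((thread, error) -> failure.set(error));
        poller.start();
        try {
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the poller", e);
        }
        // The calling thread is still alive here even if the poller died.
        return failure.get();
    }
}
```

Because the caller continues after `join()` returns, a failed rename that is thrown rather than logged would stop the polling loop silently unless something observes the failure, which is the concern deferred to KAFKA-10594.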







[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630367487



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -161,19 +162,24 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
-        (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
+    val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {

Review comment:
       Synchronizing on `snapshots` is only needed when accessing that object. In `deleteBeforeSnapshot` the lock is grabbed because the `match` expression accesses `snapshots` in one of its `case` branches.
   
   In this method I think it is safe to only grab the lock where we currently do.
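
The locking discipline described here (hold the lock only while touching the map, do file I/O afterwards) can be sketched in Java as below. Everything in it is an assumption made for illustration: `SnapshotRegistrySketch`, its methods, and the use of a plain `Long` offset key and `String` file name in place of `OffsetAndEpoch` and `FileRawSnapshotReader`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the `snapshots` map; not Kafka code.
class SnapshotRegistrySketch {
    // Snapshot end offset -> file name, guarded by synchronizing on the map itself.
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void add(long endOffset, String fileName) {
        synchronized (snapshots) {
            snapshots.put(endOffset, fileName);
        }
    }

    int size() {
        synchronized (snapshots) {
            return snapshots.size();
        }
    }

    /**
     * Forget every entry strictly below logStartOffset and hand the removed
     * file names back, so the caller can rename or delete them without the lock.
     */
    List<String> forgetBefore(long logStartOffset) {
        synchronized (snapshots) {
            // headMap returns a live view: copy the values, then clear the
            // view to remove those entries from the backing map.
            NavigableMap<Long, String> expiredView = snapshots.headMap(logStartOffset, false);
            List<String> expired = new ArrayList<>(expiredView.values());
            expiredView.clear();
            return expired;
        }
    }
}
```

Returning the removed entries keeps the critical section short: readers of the map never see a snapshot whose file is about to be renamed, and the slow file-system work happens after the lock is released.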



