You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/01 18:21:38 UTC

[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662505337



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot proceeds this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+    if (snapshots.contains(snapshotId) &&
+        snapshots.contains(nextSnapshotId) &&
+        startOffset < nextSnapshotId.offset &&
+        snapshotId.offset < nextSnapshotId.offset &&
+        log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+      log.deleteOldSegments()
+      val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+      snapshots.remove(snapshotId) match {
+        case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+        case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+      }
+      removeSnapshots(forgotten)
+      true
+    } else {
+      false
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {

Review comment:
       This is calling `FileRawSnapshotReader#sizeInBytes`, which in turn calls `FileRecords#sizeInBytes`, which just calls the `FileChannel` method you mention. The `readSnapshot` method doesn't actually read the snapshot; it just returns the reader object (maybe we should rename it for clarity).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org