You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/14 20:32:01 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10021: KAFKA-12205: Delete snapshot if leo less than the log start offset

jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r575858200



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,30 +259,46 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFileBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * remove all snapshots whose end offset is less than the giving offset, also delete the corresponding
+   * snapshot files.
+   */
+  private def removeSnapshotFileBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {

Review comment:
       How about `removeSnapshotsBefore` or `removeSnapshotFilesBefore`?

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -261,7 +258,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
      * @return an Optional snapshot id at the log start offset if nonzero, otherwise returns an empty
      *         Optional
      */
-    Optional<OffsetAndEpoch> oldestSnapshotId();
+    Optional<OffsetAndEpoch> earliestSnapshotId();

Review comment:
     Okay. Earliest and latest are better adjectives. I think we picked these names because there is a precedent for using oldest and latest in the `ProducerStateManager`.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -315,7 +324,8 @@ object KafkaMetadataLog {
       }
       .forEach { path =>
         path.ifPresent { snapshotPath =>
-          if (snapshotPath.partial) {
+          if (snapshotPath.partial || snapshotPath.snapshotId.offset < log.logStartOffset) {

Review comment:
       Right. I think we need to do this because the `RaftClient` can fail in `deleteBeforeSnapshot` after updating the log start offset but before deleting the snapshots older than the log start offset. Does this match your thinking?

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -261,11 +339,60 @@ final class KafkaMetadataLogTest {
   }
 
   @Test
-  def testCreateReplicatedLogTruncatesFully(): Unit = {
+  def testCleanupOlderSnapshots(): Unit = {
     val topicPartition = new TopicPartition("cluster-metadata", 0)
     val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime, topicPartition)
     val numberOfRecords = 10
     val epoch = 1
+
+    append(log, 1, epoch - 1)
+    val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId1)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    append(log, 1, epoch, log.endOffset.offset)
+    val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId2)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    append(log, numberOfRecords - 2, epoch, log.endOffset.offset)
+    val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId3)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)

Review comment:
       Minor but this snapshot has the same epoch as some of the other snapshots and the "last fetched epoch" of the log. Did you mean something like `greaterSnapshotId` or just `latestSnapshotId`?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -267,30 +259,46 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
-    oldestSnapshotId
+  override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.first)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
     snapshotIds.add(snapshotId)
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId.asScala match {
+    latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
         startOffset < logStartSnapshotId.offset &&
         logStartSnapshotId.offset <= snapshotId.offset &&
         log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
         log.deleteOldSegments()
-        oldestSnapshotId = Optional.of(logStartSnapshotId)
+
+        // Delete snapshot after increasing LogStartOffset
+        removeSnapshotFileBefore(logStartSnapshotId)
 
         true
 
       case _ => false
     }
   }
 
+  /**
+   * remove all snapshots whose end offset is less than the giving offset, also delete the corresponding
+   * snapshot files.

Review comment:
     ```suggestion
    * Removes all snapshots on the log directory whose epoch and end offset are less than the given epoch and end offset.
   ```

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -196,6 +227,53 @@ final class KafkaMetadataLogTest {
     assertEquals(greaterEpochSnapshotId.offset, log.highWatermark.offset)
   }
 
+  @Test
+  def testTruncateWillRemoveOlderSnapshot(): Unit = {
+
+    val topicPartition = new TopicPartition("cluster-metadata", 0)
+    val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime, topicPartition)
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, 1, epoch - 1)
+    val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId1)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    append(log, 1, epoch, log.endOffset.offset)
+    val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId2)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    append(log, numberOfRecords - 2, epoch, log.endOffset.offset)
+    val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(oldSnapshotId3)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)

Review comment:
       Minor but this snapshot has the same epoch as some of the other snapshots and the "last fetched epoch" of the log. Did you mean something like `greaterSnapshotId` or just `latestSnapshotId`?

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
##########
@@ -75,4 +77,23 @@ public void testInvalidSnapshotFilenames() {
         // partition metadata
         assertEquals(Optional.empty(), Snapshots.parse(root.resolve("partition.metadata")));
     }
+
+    @Test
+    public void testDeleteSnapshot() throws IOException {
+
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+            TestUtils.RANDOM.nextInt(Integer.MAX_VALUE),
+            TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
+        );
+
+        Path logDirPath = TestUtils.tempDirectory().toPath();
+        FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty());
+        snapshot.freeze();

Review comment:
       Let's close the snapshot writer. How about?
   
   ```suggestion
           try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty())) {
               snapshot.freeze();
           }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org