Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/01 00:50:53 UTC

[GitHub] [kafka] gardnervickers commented on a change in pull request #10896: KAFKA-12964: Collect and rename snapshot files prior to async deletion.

gardnervickers commented on a change in pull request #10896:
URL: https://github.com/apache/kafka/pull/10896#discussion_r661899296



##########
File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
##########
@@ -1535,4 +1534,122 @@ class LogLoaderTest {
     assertTrue(onlySegment.lazyOffsetIndex.file.exists())
     assertTrue(onlySegment.lazyTimeIndex.file.exists())
   }
+
+  @Test
+  def testCorruptedLogRecoveryDoesNotDeleteProducerStateSnapshotsPostRecovery(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    var log = createLog(logDir, logConfig)
+    // Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                   |---> logStartOffset                                           |---> active segment (empty)
+    //                                                                                  |---> logEndOffset
+    for (i <- 0 until 9) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(10, log.logSegments.size)
+    assertEquals(0, log.logStartOffset)
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
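+    // A producer state snapshot is written on each roll, keyed by the new segment's base offset,
+    // so offsets 1 through 9 each have a snapshot file (there is none for offset 0).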
+    for (offset <- 1 until 10) {
+      val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    // Increment the log start offset to 4.
+    // After this step, the segments should be:
+    //                              |---> logStartOffset
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                                                                 |---> active segment (empty)
+    //                                                                 |---> logEndOffset
+    val newLogStartOffset = 4
+    log.updateHighWatermark(log.logEndOffset)
+    log.maybeIncrementLogStartOffset(newLogStartOffset, ClientRecordDeletion)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(9, log.logEndOffset)
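+    // Raising the log start offset only moves the offset; it does not itself delete the older
+    // segments, so all 10 segments (and their snapshot files) are still on disk at this point.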
+
+    // Overwrite the segment at baseOffset 1 with garbage; this segment is below the current log start offset 4.
+    // After this step, the segments should be:
+    //
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //           |                  |---> logStartOffset               |---> active segment  (empty)
+    //           |                                                     |---> logEndOffset
+    // corrupt record inserted
+    //
+    val segmentToForceTruncation = log.logSegments.take(2).last
+    assertEquals(1, segmentToForceTruncation.baseOffset)
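+    // FileWriter without the append flag truncates the file, so this replaces the segment's
+    // contents with bytes that will fail batch validation during recovery.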
+    val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file))
+    bw.write("corruptRecord")
+    bw.close()
+    log.close()
+
+    // Reopen the log. This will do the following:
+    // - Truncate the segment we corrupted above, and schedule async deletion of all other
+    //   segments from base offsets 2 to 9.
+    // - The remaining segments at base offsets 0 and 1 will be below the current logStartOffset 4.
+    //   This will cause async deletion of both remaining segments. Finally, a single active segment
+    //   is created, starting at logStartOffset 4.
+    //
+    // Expected segments after the log is opened again:
+    // [4-]
+    //  |---> active segment (empty)
+    //  |---> logStartOffset
+    //  |---> logEndOffset
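+    // lastShutdownClean = false simulates a hard shutdown, so the reopened log goes through full recovery.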
+    log = createLog(logDir, logConfig, logStartOffset = newLogStartOffset, lastShutdownClean = false)
+    assertEquals(1, log.logSegments.size)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(4, log.activeSegment.baseOffset)
+    assertEquals(4, log.logEndOffset)
+
+    // Append records, roll the segments, and check that the producer state snapshots are defined.
+    // The expected segments and producer state snapshots, after the appends are complete and the
+    // segments are rolled, are shown below:
+    // [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //  |      |      |      |      |      |---> active segment (empty)
+    //  |      |      |      |      |      |---> logEndOffset
+    //  |      |      |      |      |      |
+    //  |      |------.------.------.------.-----> producer state snapshot files are DEFINED for each offset in: [5-9]
+    //  |----------------------------------------> logStartOffset
+    for (i <- 0 until 5) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 5 until 10) {
+      val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    val offsetsWithSnapshotFiles = (1 until 5)

Review comment:
       Yes that sounds good.



