You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/08 16:50:03 UTC

[GitHub] [kafka] junrao commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

junrao commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501325705



##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   // completed transactions whose markers are at offsets above the high watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = {
+    val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+    for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {

Review comment:
       ProducerStateManager.listSnapshotFiles() could just be listSnapshotFiles()?

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   // completed transactions whose markers are at offsets above the high watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = {
+    val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+    for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {
+      tm.put(f.offset, f)
+    }
+    tm
+  }
+
+  /**
+   * Scans the log directory, gathering all producer state snapshot files. Snapshot files which do not have an offset
+   * corresponding to one of the provided offsets in segmentBaseOffsets will be removed, except in the case that there
+   * is a snapshot file at a higher offset than any offset in segmentBaseOffsets.
+   *
+   * The goal here is to remove any snapshot files which do not have an associated segment file, but not to remove

Review comment:
       Incomplete sentence after "but not to remove".

##########
File path: core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
##########
@@ -834,6 +834,40 @@ class ProducerStateManagerTest {
     assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
   }
 
+  @Test
+  def testRemoveStraySnapshotsKeepCleanShutdownSnapshot(): Unit = {
+    // Test that when stray snapshots are removed, the largest stray snapshot is kept around. This covers the case where
+    // the broker shutdown cleanly and emitted a snapshot file larger than the base offset of the active segment.
+
+    // Create 3 snapshot files at different offsets.
+    Log.producerSnapshotFile(logDir, 42).createNewFile()
+    Log.producerSnapshotFile(logDir, 5).createNewFile()
+    Log.producerSnapshotFile(logDir, 2).createNewFile()
+
+    // claim that we only have one segment with a base offset of 5
+    stateManager.removeStraySnapshots(Set(5))
+
+    // The snapshot file at offset 2 should be considered a stray, but the snapshot at 42 should be kept
+    // around because it is the largest snapshot.
+    assertEquals(Some(42), stateManager.latestSnapshotOffset)
+    assertEquals(Some(5), stateManager.oldestSnapshotOffset)
+    assertEquals(Seq(5, 42), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+  }
+
+  @Test
+  def testRemoveAllStraySnapshots(): Unit = {
+    // Test that when stray snapshots are removed, all stray snapshots are removed when the base offset of the largest
+    // segment exceeds the offset of the largest stray snapshot.

Review comment:
       Below, the base offset of the largest segment is equal to, rather than greater than, the offset of the largest stray snapshot.

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   // completed transactions whose markers are at offsets above the high watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = {
+    val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+    for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {
+      tm.put(f.offset, f)
+    }
+    tm
+  }
+
+  /**
+   * Scans the log directory, gathering all producer state snapshot files. Snapshot files which do not have an offset
+   * corresponding to one of the provided offsets in segmentBaseOffsets will be removed, except in the case that there
+   * is a snapshot file at a higher offset than any offset in segmentBaseOffsets.
+   *
+   * The goal here is to remove any snapshot files which do not have an associated segment file, but not to remove
+   */
+  private[log] def removeStraySnapshots(segmentBaseOffsets: Set[Long]): Unit = {
+    var latestStraySnapshot: Option[SnapshotFile] = None
+    val ss = loadSnapshots()
+    for (snapshot <- ss.values().asScala) {
+      val key = snapshot.offset
+      latestStraySnapshot match {
+        case Some(prev) =>
+          if (!segmentBaseOffsets.contains(key)) {
+            // this snapshot is now the largest stray snapshot.
+            prev.deleteIfExists()
+            ss.remove(prev.offset)
+            latestStraySnapshot = Some(snapshot)
+          }
+        case None =>
+          if (!segmentBaseOffsets.contains(key)) {
+            latestStraySnapshot = Some(snapshot)

Review comment:
       Could we also make sure that the offset for latestStraySnapshot is > the largest offset in segmentBaseOffsets?
   

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -1226,6 +1225,104 @@ class LogTest {
     assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
 
+  @Test
+  def testRetentionDeletesProducerStateSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+    // Sleep to breach the retention period
+    mockTime.sleep(1000 * 60 + 1)
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expect a single producer state snapshot remaining", 1, ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+    // Increment the log start offset to exclude the first two segments.
+    log.maybeIncrementLogStartOffset(log.logEndOffset - 1, ClientRecordDeletion)
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expect a single producer state snapshot remaining", 1, ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testCompactionDeletesProducerStateSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+    val cleaner = new Cleaner(id = 0,
+      offsetMap = new FakeOffsetMap(Int.MaxValue),
+      ioBufferSize = 64 * 1024,
+      maxIoBufferSize = 64 * 1024,
+      dupBufferLoadFactor = 0.75,
+      throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = mockTime),
+      time = mockTime,
+      checkDone = _ => {})
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "a".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "b".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "c".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals("expected a snapshot file per segment base offset, except the first segment", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+    // Clean segments, this should delete everything except the active segment since there only
+    // exists the key "a".
+    cleaner.clean(LogToClean(log.topicPartition, log, 0, log.logEndOffset))
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expected a snapshot file per segment base offset, excluding the first", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+  }
+
+  @Test
+  def testLoadingLogCleansOrphanedProducerStateSnapshots(): Unit = {
+    val orphanedSnapshotFile = Log.producerSnapshotFile(logDir, 42).toPath
+    Files.createFile(orphanedSnapshotFile)
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
+    createLog(logDir, logConfig)
+    assertEquals("expected orphaned producer state snapshot file to be cleaned up", 0, ProducerStateManager.listSnapshotFiles(logDir).size)

Review comment:
       Hmm, why is orphanedSnapshotFile deleted since we keep the last stray snapshot?

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -1661,14 +1753,17 @@ class LogTest {
     log.roll()
 
     assertEquals(2, log.activeProducersWithLastSequence.size)
-    assertEquals(2, ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(log.dir).size)
 
     log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(2L, ClientRecordDeletion)
+    log.deleteOldSegments() // force retention to kick in so that the snapshot files are cleaned up.
+    mockTime.sleep(logConfig.fileDeleteDelayMs + 1000) // advance the clock so file deletion takes place
+    mockTime.scheduler.tick()

Review comment:
       mockTime.sleep() calls scheduler.tick() already.

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -653,36 +697,44 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   def takeSnapshot(): Unit = {
     // If not a new offset, then it is not worth taking another snapshot
     if (lastMapOffset > lastSnapOffset) {
-      val snapshotFile = Log.producerSnapshotFile(logDir, lastMapOffset)
+      val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir, lastMapOffset))
       info(s"Writing producer snapshot at offset $lastMapOffset")
-      writeSnapshot(snapshotFile, producers)
+      writeSnapshot(snapshotFile.file, producers)
+      snapshots.put(snapshotFile.offset, snapshotFile)
 
       // Update the last snap offset according to the serialized map
       lastSnapOffset = lastMapOffset
     }
   }
 
+  /**
+   * Update the parentDir for this ProducerStateManager and all of the snapshot files which it manages.
+   */
+  def updateParentDir(parentDir: File): Unit ={
+    _logDir = parentDir
+    snapshots.forEach((_, s) => s.updateParentDir(parentDir))
+  }
+
   /**
    * Get the last offset (exclusive) of the latest snapshot file.
    */
-  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFile(file))
+  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(_.offset)
 
   /**
    * Get the last offset (exclusive) of the oldest snapshot file.
    */
-  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))
+  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(_.offset)
 
   /**
-   * When we remove the head of the log due to retention, we need to remove snapshots older than
-   * the new log start offset.
+   * Remove any unreplicated transactions lower than the provided logStartOffset and bring the lastMapOffset forward
+   * if necessary.
    */
-  def truncateHead(logStartOffset: Long): Unit = {
+  def onLogStartOffsetIncremented(logStartOffset: Long): Unit = {
     removeUnreplicatedTransactions(logStartOffset)
 
     if (lastMapOffset < logStartOffset)
       lastMapOffset = logStartOffset
 
-    deleteSnapshotsBefore(logStartOffset)

Review comment:
       Hmm, why don't we need to delete snapshots before logStartOffset?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org