You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/08 18:53:22 UTC

[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501942512



##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -653,36 +697,44 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   def takeSnapshot(): Unit = {
     // If not a new offset, then it is not worth taking another snapshot
     if (lastMapOffset > lastSnapOffset) {
-      val snapshotFile = Log.producerSnapshotFile(logDir, lastMapOffset)
+      val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir, lastMapOffset))
       info(s"Writing producer snapshot at offset $lastMapOffset")
-      writeSnapshot(snapshotFile, producers)
+      writeSnapshot(snapshotFile.file, producers)
+      snapshots.put(snapshotFile.offset, snapshotFile)
 
       // Update the last snap offset according to the serialized map
       lastSnapOffset = lastMapOffset
     }
   }
 
+  /**
+   * Update the parentDir for this ProducerStateManager and all of the snapshot files which it manages.
+   */
+  def updateParentDir(parentDir: File): Unit ={
+    _logDir = parentDir
+    snapshots.forEach((_, s) => s.updateParentDir(parentDir))
+  }
+
   /**
    * Get the last offset (exclusive) of the latest snapshot file.
    */
-  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFile(file))
+  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(_.offset)
 
   /**
    * Get the last offset (exclusive) of the oldest snapshot file.
    */
-  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))
+  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(_.offset)
 
   /**
-   * When we remove the head of the log due to retention, we need to remove snapshots older than
-   * the new log start offset.
+   * Remove any unreplicated transactions lower than the provided logStartOffset and bring the lastMapOffset forward
+   * if necessary.
    */
-  def truncateHead(logStartOffset: Long): Unit = {
+  def onLogStartOffsetIncremented(logStartOffset: Long): Unit = {
     removeUnreplicatedTransactions(logStartOffset)
 
     if (lastMapOffset < logStartOffset)
       lastMapOffset = logStartOffset
 
-    deleteSnapshotsBefore(logStartOffset)

Review comment:
       The idea here is to clear un-replicated transactions and optionally advance the `lastMapOffset` and `lastSnapOffset` when the logStartOffset is advanced, but to leave the snapshot files around. The corresponding snapshot files should be removed during the retention pass as we cleanup the associated segment files. 
   
   I was attempting to optimize incrementing the logStartOffset a bit so that we don't need to delete the snapshot files from the request handler thread when handling `DELETE_RECORDS`. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org