Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/23 22:41:48 UTC

[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

junrao commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r657507051



##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -106,7 +174,17 @@ object LogLoader extends Logging {
       loadSegmentFiles(params)
     })
 
-    completeSwapOperations(swapFiles, params)
+    // Do the actual recovery for toRecoverSwapFiles, as discussed above.

Review comment:
       Hmm, I am not sure why we need this step. We have already processed all .swap files, and no new .swap files should be introduced by the time we get here.

##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -167,21 +245,14 @@ object LogLoader extends Logging {
    * in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than
    * the smallest offset .clean file could be part of an incomplete split operation. Such .swap files are also deleted
    * by this method.
+   *
    * @param params The parameters for the log being loaded from disk
-   * @return Set of .swap files that are valid to be swapped in as segment files
+   * @return Set of .swap files that are valid to be swapped in as segment files and index files

Review comment:
       The PR description says "as a result, if at least one .swap file exists for a segment, all other files for the segment must exist as .cleaned files or .swap files. Therefore, we rename the .cleaned files to .swap files, then make them normal segment files." Are we implementing the renaming of the .cleaned files to .swap files?
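
       For illustration, here is a minimal sketch of that .cleaned -> .swap ->
       segment rename sequence, assuming the suffix constants and
       CoreUtils.replaceSuffix from kafka.log; this is not the PR's actual code:

           import java.io.File
           import kafka.log.Log
           import kafka.utils.CoreUtils

           // Promote an interrupted-clean output to .swap, then strip the
           // suffix so it becomes a normal segment file.
           def promoteCleanedFile(cleanedFile: File): File = {
             val swapFile = new File(CoreUtils.replaceSuffix(
               cleanedFile.getPath, Log.CleanedFileSuffix, Log.SwapFileSuffix))
             cleanedFile.renameTo(swapFile)
             val segmentFile = new File(CoreUtils.replaceSuffix(
               swapFile.getPath, Log.SwapFileSuffix, ""))
             swapFile.renameTo(segmentFile)
             segmentFile
           }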

##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -90,11 +90,79 @@ object LogLoader extends Logging {
    *                                           overflow index offset
    */
   def load(params: LoadLogParams): LoadedLogOffsets = {
-    // first do a pass through the files in the log directory and remove any temporary files
+
+    // First pass: through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-    // Now do a second pass and load all the log and index files.
+    // The remaining valid swap files must come from compaction operation. We can simply rename them
+    // to regular segment files. But, before renaming, we should figure out which segments are
+    // compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset.
+    // If sanity check fails, we cannot do the simple renaming, we must do a full recovery, which
+    // involves rebuilding all the index files and the producer state.
+    // We store segments that require renaming and recovery in this code block, and do the actual
+    // renaming and recovery later.
+    var minSwapFileOffset = Long.MaxValue
+    var maxSwapFileOffset = Long.MinValue
+    val toRenameSwapFiles = mutable.Set[File]()
+    val toRecoverSwapFiles = mutable.Set[File]()
+    swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+      val baseOffset = offsetFromFile(f)
+      val segment = LogSegment.open(f.getParentFile,
+        baseOffset = baseOffset,
+        params.config,
+        time = params.time,
+        fileSuffix = Log.SwapFileSuffix)
+      try {
+        segment.sanityCheck(false)

Review comment:
       It doesn't seem like we need this, since we call segment.sanityCheck() on all segments later in loadSegmentFiles().
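
       As context for the hunk above, the min/maxSwapFileOffset bookkeeping it
       sets up can be sketched as follows (swapSegments is a hypothetical
       stand-in for the opened .swap segments; readNextOffset is assumed to
       behave as in kafka.log.LogSegment):

           // Track the offset range covered by the .swap segments; any old
           // segment lying entirely inside this range was replaced by the
           // completed swap and can be deleted before the .swap files are
           // renamed into place.
           var minSwapFileOffset = Long.MaxValue
           var maxSwapFileOffset = Long.MinValue
           swapSegments.foreach { segment =>
             minSwapFileOffset = Math.min(minSwapFileOffset, segment.baseOffset)
             maxSwapFileOffset = Math.max(maxSwapFileOffset, segment.readNextOffset)
           }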

##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -90,11 +90,79 @@ object LogLoader extends Logging {
    *                                           overflow index offset
    */
   def load(params: LoadLogParams): LoadedLogOffsets = {
-    // first do a pass through the files in the log directory and remove any temporary files
+
+    // First pass: through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-    // Now do a second pass and load all the log and index files.
+    // The remaining valid swap files must come from compaction operation. We can simply rename them

Review comment:
       It seems that those swap files could be the result of a segment split too?
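
       As context for the question: removeTempFilesAndCollectSwapFiles already
       discards .swap files that may belong to an incomplete split, per the
       scaladoc quoted earlier in this review. A sketch of that rule, with
       cleanedFiles as a hypothetical stand-in for the .cleaned files found in
       the log directory:

           // The scaladoc's rule: a .swap file whose base offset is at or above
           // the smallest .cleaned file's offset may be part of an unfinished
           // split operation and is dropped; the rest are kept as valid.
           val minCleanedFileOffset =
             if (cleanedFiles.isEmpty) Long.MaxValue
             else cleanedFiles.map(offsetFromFile).min
           val (invalidSwapFiles, validSwapFiles) =
             swapFiles.partition(f => offsetFromFile(f) >= minCleanedFileOffset)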



