You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/02/26 22:44:58 UTC

[kafka] branch 2.8 updated: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs (#8812)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 5002715  KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs (#8812)
5002715 is described below

commit 5002715485482a8bffd04c05110a29ca98ab097c
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Fri Feb 26 14:40:46 2021 -0800

    KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs (#8812)
    
    1. Don't advance recovery point in `recoverLog` unless there was a clean
    shutdown.
    2. Ensure the recovery point is not ahead of the log end offset.
    3. Clear and flush the leader epoch cache and truncate the producer state
    manager if deleting segments due to the log end offset being smaller than
    the log start offset.
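    4. If we are unable to delete a clean shutdown file that exists, mark the
    directory as offline (this was the intent, but the code was wrong:
    `File.delete()` reports failure only through its return value, which was
    ignored, whereas `Files.deleteIfExists` throws an `IOException`).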
    
    Updated one test that was failing after this change to verify the new behavior.
    
    Reviewers: Jun Rao <ju...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/log/Log.scala          | 46 +++++++++++++++++-------
 core/src/main/scala/kafka/log/LogManager.scala   |  2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala |  6 +++-
 3 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 2249b5e..2729154 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -849,9 +849,25 @@ class Log(@volatile private var _dir: File,
    * @throws LogSegmentOffsetOverflowException if we encountered a legacy segment with offset overflow
    */
   private[log] def recoverLog(): Long = {
+    /** return the log end offset if valid */
+    def deleteSegmentsIfLogStartGreaterThanLogEnd(): Option[Long] = {
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
+            "This could happen if segment files were deleted from the file system.")
+          removeAndDeleteSegments(logSegments, asyncDelete = true, LogRecovery)
+          leaderEpochCache.foreach(_.clearAndFlush())
+          producerStateManager.truncateFullyAndStartAt(logStartOffset)
+          None
+        }
+      } else None
+    }
+
     // if we have the clean shutdown marker, skip recovery
     if (!hadCleanShutdown) {
-      // okay we need to actually recover this log
       val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
       var truncated = false
 
@@ -879,16 +895,7 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments,
-          asyncDelete = true,
-          reason = LogRecovery)
-      }
-    }
+    val logEndOffsetOption = deleteSegmentsIfLogStartGreaterThanLogEnd()
 
     if (logSegments.isEmpty) {
       // no existing segments, create a new mutable segment beginning at logStartOffset
@@ -900,8 +907,21 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset
-    recoveryPoint
+    // Update the recovery point only if there was a clean shutdown and a valid log end offset was
+    // found above. Otherwise, we just ensure that the recovery point is not ahead of the log end
+    // offset. To ensure correctness and to make it easier to reason about, it's best to only advance
+    // the recovery point in flush(Long). If we advanced the recovery point here, we could skip recovery for
+    // unflushed segments if the broker crashed after we checkpointed the recovery point and before we
+    // flushed the segment.
+    (hadCleanShutdown, logEndOffsetOption) match {
+      case (true, Some(logEndOffset)) =>
+        recoveryPoint = logEndOffset
+        logEndOffset
+      case _ =>
+        val logEndOffset = logEndOffsetOption.getOrElse(activeSegment.readNextOffset)
+        recoveryPoint = Math.min(recoveryPoint, logEndOffset)
+        logEndOffset
+    }
   }
 
   // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index acb9d34..1ca4d7e 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -318,7 +318,7 @@ class LogManager(logDirs: Seq[File],
           info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
           // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
           // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
-          cleanShutdownFile.delete()
+          Files.deleteIfExists(cleanShutdownFile.toPath)
           hadCleanShutdown = true
         } else {
           // log recovery itself is being performed by `Log` class during initialization
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1a953c5..66ee5d2 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2518,7 +2518,11 @@ class LogTest {
     log.close()
 
     // test recovery case
-    log = createLog(logDir, logConfig, lastShutdownClean = false)
+    val recoveryPoint = 10
+    log = createLog(logDir, logConfig, recoveryPoint = recoveryPoint, lastShutdownClean = false)
+    // the recovery point should not be updated after unclean shutdown until the log is flushed
+    verifyRecoveredLog(log, recoveryPoint)
+    log.flush()
     verifyRecoveredLog(log, lastOffset)
     log.close()
   }