Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 21:43:19 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

hachikuji commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436166743



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
+            "This could happen if segment files were deleted from the file system.")
+          removeAndDeleteSegments(logSegments, asyncDelete = false)
+          leaderEpochCache.foreach(_.clearAndFlush())
+          producerStateManager.truncate()

Review comment:
       Hmm, not sure about this. After KIP-360, we try to retain producer state as long as possible even when the corresponding entries have been removed from the log. However, we're in a strange state given that some of the later segments were apparently removed. Perhaps it is safer to treat this more like a new replica which is starting from scratch.
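
Concretely, "starting from scratch" would mean wiping every piece of recovery-relevant state together, not only the segments. A sketch built solely from the calls already visible in the diff above, annotated with what each one discards (whether all three belong together is exactly the open question here):

```scala
removeAndDeleteSegments(logSegments, asyncDelete = false) // drop all on-disk segments
leaderEpochCache.foreach(_.clearAndFlush())               // forget the leader epoch lineage
producerStateManager.truncate()                           // discard producer state (the KIP-360 tension)
```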

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {

Review comment:
       nit: this is a big initializer. Are there parts we could move to a method?
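
For reference, one shape the extraction could take. The method name and the None result in the deletion branch are assumptions; the body is otherwise the initializer quoted in the diff above and relies on Log's existing members (logSegments, activeSegment, logStartOffset, warn, etc.):

```scala
// Hypothetical helper inside Log.scala; returns the log end offset, or None
// if the segments were inconsistent with logStartOffset and were deleted.
private def deleteSegmentsIfLogEndLessThanLogStart(): Option[Long] = {
  if (logSegments.isEmpty) {
    None
  } else {
    val logEndOffset = activeSegment.readNextOffset
    if (logEndOffset >= logStartOffset) {
      Some(logEndOffset)
    } else {
      warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than " +
        s"logStartOffset ($logStartOffset). This could happen if segment files were deleted " +
        "from the file system.")
      removeAndDeleteSegments(logSegments, asyncDelete = false)
      leaderEpochCache.foreach(_.clearAndFlush())
      producerStateManager.truncate()
      None
    }
  }
}
```

recoverLog would then reduce to a single call: val logEndOffsetOption = deleteSegmentsIfLogEndLessThanLogStart().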

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       Can you help me understand what was wrong with this? 

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset
-    recoveryPoint
+    // Update the recovery point if there was a clean shutdown and we did not perform any changes
+    // to the segments. Otherwise, we just ensure that the recovery point is not ahead of the log
+    // end offset. To ensure correctness and make the logic easier to reason about, it's best to
+    // only advance the recovery point in flush(Long).
+    if (hasCleanShutdownFile)
+      logEndOffsetOption.foreach(recoveryPoint = _)
+    else
+      recoveryPoint = Math.min(recoveryPoint, logEndOffset)

Review comment:
       Hmm, `logEndOffset` is defined by `nextOffsetMetadata`, which is initialized after `loadSegments` returns. But `recoverLog` is called within `loadSegments`. So does this check work as expected or am I missing something?
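
A stripped-down, self-contained sketch of the ordering concern; only the names nextOffsetMetadata, loadSegments, recoverLog, and logEndOffset come from the comment above, everything else is a stand-in:

```scala
// Stand-in for kafka.log.LogOffsetMetadata so the sketch compiles on its own.
case class LogOffsetMetadata(messageOffset: Long)

class Log {
  @volatile private var nextOffsetMetadata: LogOffsetMetadata = _

  locally {
    val nextOffset = loadSegments()                    // internally calls recoverLog() ...
    nextOffsetMetadata = LogOffsetMetadata(nextOffset) // ... but is only assigned here, afterwards
  }

  // logEndOffset dereferences nextOffsetMetadata, which is still null
  // while recoverLog() is running.
  def logEndOffset: Long = nextOffsetMetadata.messageOffset

  private def loadSegments(): Long = {
    recoverLog()
    0L // placeholder for the recovered next offset
  }

  private def recoverLog(): Unit = {
    // Reading logEndOffset here would throw a NullPointerException,
    // which is the ordering the review comment questions.
  }
}
```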

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -360,7 +360,7 @@ class LogManager(logDirs: Seq[File],
       for ((cleanShutdownFile, dirJobs) <- jobs) {
         dirJobs.foreach(_.get)
         try {
-          cleanShutdownFile.delete()
+          Files.deleteIfExists(cleanShutdownFile.toPath)

Review comment:
       Just checking, but the issue here is that we might mistakenly mark the directory as offline if the clean shutdown file did not exist?
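
For context, a minimal sketch of the behavioral difference between the two calls (the path is hypothetical):

```scala
import java.io.File
import java.nio.file.Files

object DeleteComparison extends App {
  val cleanShutdownFile = new File("/tmp/kafka-logs/.kafka_cleanshutdown")

  // java.io.File#delete never throws: it returns false both when the file is
  // missing and when a genuine I/O error occurs, so the two cases are
  // indistinguishable to the caller.
  val deleted: Boolean = cleanShutdownFile.delete()

  // Files.deleteIfExists returns false when the file is simply absent (not an
  // error) but still surfaces real failures as IOException, so a missing file
  // no longer looks like a failed log directory.
  val existedAndDeleted: Boolean = Files.deleteIfExists(cleanShutdownFile.toPath)

  println(s"delete() -> $deleted, deleteIfExists() -> $existedAndDeleted")
}
```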

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +

Review comment:
       I guess it is because of the semantics of DeleteRecords that we trust the checkpoint over the segment data. Might be worth a comment about that since it is a bit surprising.
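
One possible wording for the comment being requested (ours, not from the PR):

```scala
// DeleteRecords advances the checkpointed logStartOffset without touching the
// segment files, so the checkpoint, not the on-disk data, is the source of
// truth. If logEndOffset < logStartOffset, the remaining segments are stale
// remnants and must be discarded rather than trusted.
```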




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org