You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 16:13:30 UTC

[GitHub] [kafka] ijuma opened a new pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

ijuma opened a new pull request #8812:
URL: https://github.com/apache/kafka/pull/8812


   1. Don't advance recovery point in `recoverLog` unless there was a clean
   shutdown.
   2. Ensure the recovery point is not ahead of the log end offset.
   3. Clean and flush leader epoch cache and truncate produce state manager
   if deleting segments due to log end offset being smaller than log start
   offset.
   4. If we are unable to delete clean shutdown file that exists, mark the
   directory as offline (this was the intent, but the code was wrong).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436180973



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +

Review comment:
       I was also surprised, so I agree. :) Will do.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436187201



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       Got it. So we may still be able to reopen an unflushed segment. That makes sense.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583834117



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       For the case that Tim mentioned, if we defer advancing the recovery point, at step 5, the broker will be forced to do log recovery for all unflushed data. If the data is corrupted on disk, it will be detected during recovery. 
   
   For the other case that Ismael mentioned, it is true that data can be lost in that case, but then this is the case where all replicas have failed. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436166743



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
+            "This could happen if segment files were deleted from the file system.")
+          removeAndDeleteSegments(logSegments, asyncDelete = false)
+          leaderEpochCache.foreach(_.clearAndFlush())
+          producerStateManager.truncate()

Review comment:
       Hmm, not sure about this. After KIP-360, we try to retain producer state as long as possible even when the corresponding entries have been removed from the log. However, we're in a strange state given that some of the later segments were apparently removed. Perhaps it is safer to treat this more like a new replica which is starting from scratch.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {

Review comment:
       nit: this is a big initializer. Are there parts we could move to a method?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       Can you help me understand what was wrong with this? 

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset
-    recoveryPoint
+    // Update the recovery point if there was a clean shutdown and did not perform any changes to
+    // the segment. Otherwise, we just ensure that the recovery point is not ahead of the log end
+    // offset. To ensure correctness and to make it easier to reason about, it's best to only advance
+    // the recovery point in flush(Long).
+    if (hasCleanShutdownFile)
+      logEndOffsetOption.foreach(recoveryPoint = _)
+    else
+      recoveryPoint = Math.min(recoveryPoint, logEndOffset)

Review comment:
       Hmm, `logEndOffset` is defined by `nextOffsetMetadata`, which is initialized after `loadSegments` returns. But `recoverLog` is called within `loadSegments`. So does this check work as expected or am I missing something?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -360,7 +360,7 @@ class LogManager(logDirs: Seq[File],
       for ((cleanShutdownFile, dirJobs) <- jobs) {
         dirJobs.foreach(_.get)
         try {
-          cleanShutdownFile.delete()
+          Files.deleteIfExists(cleanShutdownFile.toPath)

Review comment:
       Just checking, but the issue here is that we might mistakenly mark the directory is offline if the clean shutdown file did not exist?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +

Review comment:
       I guess it is because of the semantics of DeleteRecords that we trust the checkpoint over the segment data. Might be worth a comment about that since it is a bit surprising.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436179373



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       Jun explained in the JIRA, The concern is that if there is a hard failure during recovery, you could end up with a situation where we persisted this, but we did not flush some of the segments. Does that make sense?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#issuecomment-786935020


   Tests passed, merged to trunk and 2.8.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583907925



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       We discussed this offline and we decided to stick with the fix in this PR for now and to file a separate JIRA to consider flushing unflushed segments during recovery. That would provide stronger guarantees after a restart.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436180609



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset
-    recoveryPoint
+    // Update the recovery point if there was a clean shutdown and did not perform any changes to
+    // the segment. Otherwise, we just ensure that the recovery point is not ahead of the log end
+    // offset. To ensure correctness and to make it easier to reason about, it's best to only advance
+    // the recovery point in flush(Long).
+    if (hasCleanShutdownFile)
+      logEndOffsetOption.foreach(recoveryPoint = _)
+    else
+      recoveryPoint = Math.min(recoveryPoint, logEndOffset)

Review comment:
       I meant to use `logEndOffsetOption`, so this is a bug. :) It probably indicates that the variable name is bad (I kept it from before). If I had used the right variable, it would be:
   
   ```scala
   def readNextOffset: Long = {
       val fetchData = read(offsetIndex.lastOffset, log.sizeInBytes)
       if (fetchData == null)
         baseOffset
       else
         fetchData.records.batches.asScala.lastOption
           .map(_.nextOffset)
           .getOrElse(baseOffset)
     }
   ```
   
   The idea is that if we delete a bunch of segments, then the recovery point we passed to the `Log` constructor could be ahead of what remains.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583835401



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       I think it is a gap that there is no minimum replication factor before a write can get exposed. Any writes that end up seeing the `NOT_ENOUGH_REPLICAS_AFTER_APPEND` error code are more vulnerable. These are unacknowledged writes, and the producer is expected to retry, but the consumer can still read them once the ISR shrinks and we would still view it as "data loss" if the broker failed before they could be flushed to disk. With the "strict min isr" proposal, the leader is not allowed to shrink the ISR lower than some replication factor, which helps to plug this hole.
   
   Going back to @purplefox's suggestion, it does seem like a good idea to flush segments beyond the recovery point during recovery. It kind of serves to constrain the initial state of the system which makes it easier to reason about (e.g. you only need to worry about the loss of unflushed data from the previous restart). Some of the flush weaknesses probably still exist though regardless of this change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma merged pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma merged pull request #8812:
URL: https://github.com/apache/kafka/pull/8812


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436180862



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {

Review comment:
       I can try.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] purplefox commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
purplefox commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583496197



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       Is it possible that a consumer could see "phantom" messages after recovery, even with this change?
   
   1. Kafka Process dies with log data in page cache but not fsync'd
   2. Recovery process sees the un-fsync'd log data but it looks ok so recovery succeeds, nothing to do.
   3. Consumer fetches this data
   4. OS hard dies, losing page cache
   5. Broker is restarted and consumer tries to repeat fetch from same offset but data has gone.
   
   It seems to me once recovery has run we should be sure that all log segments are persistently stored. I'm not sure if we're currently providing that guarantee. It would be pretty simple just to fsync each segment we recover.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#issuecomment-639603068


   Tests still have to be written.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436178123



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -360,7 +360,7 @@ class LogManager(logDirs: Seq[File],
       for ((cleanShutdownFile, dirJobs) <- jobs) {
         dirJobs.foreach(_.get)
         try {
-          cleanShutdownFile.delete()
+          Files.deleteIfExists(cleanShutdownFile.toPath)

Review comment:
       `File.delete` doesn't throw an exception and we don't check the result. So the previous code was very misleading. That's what I was trying to fix. And to avoid introducing an issue if the file did not exist, I am using `deleteIfExists`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r584167690



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       Filed https://issues.apache.org/jira/browse/KAFKA-12386.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583835401



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       I think it is a gap that there is no minimum replication factor before a write can get exposed. Any writes that end up seeing the `NOT_ENOUGH_REPLICAS_AFTER_APPEND` error code are more vulnerable. These are unacknowledged writes, and the producer is expected to retry, but the consumer can still read them once the ISR shrinks and we would still view it as "data loss" if the broker failed before they could be flushed to disk. With the "strict min isr" proposal, the leader is not allowed to shrink the ISR lower than some replication factor, which helps to plug this hole.
   
   Going back to @purplefox's suggestion, it does seem like a good idea to flush segments beyond the recovery point during recovery. It kind of serves to constrain the initial state of the system which makes it easier to reason about (e.g. you only need to worry about the loss of unflushed data from the last restart). Some of the flush weaknesses probably still exist though regardless of this change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583656545



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       @purplefox To clarify, the case you are talking about is:
   
   1. A catastrophic scenario where a partition is offline (i.e. all replicas are down)
   2. The OS was not shutdown
   3. The OS did not flush the data to disk (this typically happens irrespective of our flushes due to OS configurations)
   4. The replica that was the last member of the ISR comes back up, registers, to ZK and the Controller makes it the
   leader (since it was the last member of the ISR, if it was a different replica, it won't be given leadership without
   unclean leader election)
   5. The hw is beyond the flushed segments
   5. Consumer fetches the data beyond the flushed segments
   6. OS hard dies
   
   This is an interesting edge case, it seems incredibly unlikely, but possible if the hw can be beyond the flushed segments. @junrao & @hachikuji are we missing any detail that protects us against this?
   
   The following has a similar impact:
   1. Leader accepts a write
   2. Write is replicated, hw is incremented, but data is not flushed
   3. All replicas die, but the ISR is not shrunk yet
   4. Leader receives a write, accepts it, replication doesn't happen since replicas are gone
   5. ISR is shrunk, hw is incremented
   6. Producer won't receive a successful ack given min.isr=2, but the consumer reads data that is only in the leader
   7. Leader crashes and the unflushed data is gone (or hard disk dies and all the data in the leader is gone)
   
   Flushing the segments during recovery helps on some scenarios, but not the one I just mentioned (assuming I am not missing anything). @hachikuji had a "strict min isr" proposal where the ISR is never allowed to shrink below `min.isr`. I haven't thought about all the details, but perhaps that covers both issues. Thoughts @hachikuji?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436024607



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
+            "This could happen if segment files were deleted from the file system.")
+          removeAndDeleteSegments(logSegments, asyncDelete = false)
+          leaderEpochCache.foreach(_.clearAndFlush())
+          producerStateManager.truncate()

Review comment:
       @junrao does this seem right? If we don't have any segments, we should be able to completely truncate the producer state manager, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] purplefox commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
purplefox commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583496197



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       Is it possible that a consumer could see "phantom" messages after recovery, even with this change?
   
   1. Kafka Process dies with log data in page cache but not fsync'd
   2. Recovery process sees the un-fsync'd log data but it looks ok so recovery succeeds, nothing to do.
   3. Consumer fetches this data
   4. OS hard dies, losing page cache
   5. Consumer tries to repeat fetch from same offset but data has gone.
   
   It seems to me once recovery has run we should be sure that all log segments are persistently stored. I'm not sure if we're currently providing that guarantee.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] purplefox commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
purplefox commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583496197



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       Is it possible that a consumer could see "phantom" messages after recovery, even with this change?
   
   1. Kafka Process dies with log data in page cache but not fsync'd
   2. Recovery process sees the un-fsync'd log data but it looks ok so recovery succeeds, nothing to do.
   3. Consumer fetches this data
   4. OS hard dies, losing page cache
   5. Broker is restarted and consumer tries to repeat fetch from same offset but data has gone.
   
   It seems to me once recovery has run we should be sure that all log segments are persistently stored. I'm not sure if we're currently providing that guarantee.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#issuecomment-639654363


   @rite2nikhil We don't need the flush the segments since we don't increment the recovery point. However, I'm not sure about the `leaderEpochCache`. Offline, @junrao said we didn't have to update it, but I don't fully understand why we do in the case where we delete all the log segments due to the log end offset being smaller than the log start offset.
   
   @junrao can you please clarify?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436178994



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
+            "This could happen if segment files were deleted from the file system.")
+          removeAndDeleteSegments(logSegments, asyncDelete = false)
+          leaderEpochCache.foreach(_.clearAndFlush())
+          producerStateManager.truncate()

Review comment:
       Yeah, that's what I was thinking. What's the best way to achieve this (treat this more like a new replica which is starting from scratch)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#issuecomment-639905898


   @ijuma : We don't need to flush leaderEpochCache after segment recovery since new leader epochs are added through LeaderEpochFileCache.assign() which does flushing already. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org