Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/30 07:28:30 UTC

[GitHub] [kafka] kowshik commented on a change in pull request #11345: Allow empty last segment to have missing offset index during recovery

kowshik commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r719132048



##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -77,8 +77,9 @@ class LogSegment private[log] (val log: FileRecords,
     timeIndex.resize(size)
   }
 
-  def sanityCheck(timeIndexFileNewlyCreated: Boolean): Unit = {
-    if (lazyOffsetIndex.file.exists) {
+  def sanityCheck(timeIndexFileNewlyCreated: Boolean, isActiveSegment: Boolean): Unit = {
+    // We allow for absence of offset index file only for an empty active segment.
+    if ((isActiveSegment && size == 0) || lazyOffsetIndex.file.exists) {

Review comment:
   @junrao: When the `UnifiedLog` is [flushed during clean shutdown](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/LogManager.scala#L527), we flush the `LocalLog` [up to the logEndOffset](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/UnifiedLog.scala#L1508). An empty active segment is not among the candidate segments for this flush. During `LocalLog.flush()`, the `LogSegments.values(recoveryPoint, logEndOffset)` call [here](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/LocalLog.scala#L171) does not select the empty active segment ([doc](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/LogSegments.scala#L127-L131)). The upper bound of that range is exclusive, and the logEndOffset equals the base offset of the empty active segment, so that segment is omitted.

   Consequently, if the empty active segment's offset index was never created before clean shutdown, it is not created during clean shutdown either, because the empty active segment is never flushed.
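
   The following minimal sketch makes the range semantics concrete. It assumes that `LogSegments.values(from, to)` starts at the segment containing `from` and excludes any segment whose base offset is `>= to`. `SegmentRangeModel` and `selectBaseOffsets` are hypothetical names. This is a simplified model over a `java.util.TreeMap` keyed by base offset, not Kafka's implementation.

   ```scala
   import java.util.TreeMap
   import scala.jdk.CollectionConverters._

   // Hypothetical, simplified model of selecting segments for a flush.
   // Assumption: segments are keyed by base offset, the selection starts at the
   // segment containing `from`, and the upper bound `to` is exclusive.
   object SegmentRangeModel {

     def selectBaseOffsets(segments: TreeMap[java.lang.Long, String], from: Long, to: Long): List[Long] = {
       require(from <= to, s"invalid range [$from, $to)")
       if (from == to) Nil
       else {
         // The segment containing `from` is the one with the greatest base offset <= from.
         val view = Option(segments.floorKey(from)) match {
           case Some(floor) => segments.subMap(floor, true, to, false)
           case None        => segments.headMap(to, false)
         }
         // Base offsets in ascending order; a segment whose base offset equals `to` is excluded.
         view.keySet.asScala.toList.map(_.longValue)
       }
     }
   }
   ```

   Take a sealed segment at base offset 0 and an empty active segment at base offset 1, so the logEndOffset is 1. In this case `selectBaseOffsets(segments, 0L, 1L)` returns only `List(0L)`. In the model, the active segment is selected only after an append moves the logEndOffset past its base offset.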
   
   The above is shown in the following passing unit test:
   
   ```
   @Test
   def testFlushEmptyActiveSegmentDoesNotCreateOffsetIndex(): Unit = {
       // Create an empty log.
       val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
       val log = createLog(logDir, logConfig)
       val oneRecord = TestUtils.records(List(
         new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes)
       ))
   
       // Append a record and flush. Verify that there exists only 1 segment.
       log.appendAsLeader(oneRecord, leaderEpoch = 0)
       assertEquals(1, log.logEndOffset)
       log.flush()
       assertEquals(1, log.logSegments.size)
       assertTrue(UnifiedLog.logFile(logDir, 0).exists())
       assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
       assertFalse(UnifiedLog.logFile(logDir, 1).exists())
       assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())
   
       // Roll the log and verify that the new active segment's offset index is missing.
       log.roll()
       assertEquals(2, log.logSegments.size)
       assertTrue(UnifiedLog.logFile(logDir, 0).exists())
       assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
       assertTrue(UnifiedLog.logFile(logDir, 1).exists())
       assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())
   
       // Flush the log and once again verify that the active segment's offset index is still missing.
       log.flush()
       assertTrue(UnifiedLog.logFile(logDir, 0).exists())
       assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
       assertTrue(UnifiedLog.logFile(logDir, 1).exists())
       assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())
   
       // Close the log and verify that the active segment's offset index is still missing.
       log.close()
       assertTrue(UnifiedLog.logFile(logDir, 0).exists())
       assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
       assertTrue(UnifiedLog.logFile(logDir, 1).exists())
       assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())
   }
   ```
   
   This PR mainly fixes a logging issue. The issue occurs most often in the following situation. Consider a topic with very low ingress traffic in some or all of its partitions. Suppose its retention setting causes all existing segments to expire and be removed. In that case, [we roll the log to create a new active segment](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/UnifiedLog.scala#L1352-L1354). This ensures that at least one segment remains in the `LocalLog` when the retention loop completes. However, we don't create the offset index for the active segment until the first append. If the Kafka cluster is rolled before that first append, we see [this spurious corruption error message during recovery](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/LogLoader.scala#L336-L337).
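
   The scenario above can be modelled in a few lines. This minimal sketch assumes, following the description above, that a segment's offset index file is created only on the first append. `ToySegment`, `ToyLog` and `expireAllSegments` are hypothetical names. The model ignores everything else the real log does, such as other indexes, flushing and recovery points.

   ```scala
   import scala.collection.mutable.ListBuffer

   // Hypothetical toy model of the retention scenario; not Kafka's implementation.
   final class ToySegment(val baseOffset: Long) {
     var sizeInBytes: Int = 0
     // Assumption: the offset index file is created lazily, on the first append.
     var offsetIndexFileExists: Boolean = false

     def append(recordBytes: Int): Unit = {
       offsetIndexFileExists = true
       sizeInBytes += recordBytes
     }
   }

   final class ToyLog {
     val segments: ListBuffer[ToySegment] = ListBuffer(new ToySegment(0L))
     var logEndOffset: Long = 0L

     def activeSegment: ToySegment = segments.last

     // Appends a single record of the given size to the active segment.
     def append(recordBytes: Int): Unit = {
       activeSegment.append(recordBytes)
       logEndOffset += 1
     }

     // Retention expires every segment; a new active segment is rolled at the
     // log end offset so that at least one segment remains.
     def expireAllSegments(): Unit = {
       val fresh = new ToySegment(logEndOffset)
       segments.clear()
       segments += fresh
     }
   }
   ```

   In this model, the window between `expireAllSegments()` and the next `append` is exactly when a restart would find an empty active segment without an offset index.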
   
   This PR fixes the logging problem by ignoring the absence of the offset index for an empty active segment during recovery.
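
   The guard added in the diff reduces to a small predicate. The sketch below restates it outside `LogSegment` so the before and after behaviour can be compared. `OffsetIndexCheck`, `passesBefore` and `passesAfter` are hypothetical names. The real `sanityCheck` also handles the time index (see its `timeIndexFileNewlyCreated` parameter), which this sketch omits.

   ```scala
   // Hypothetical model of the offset-index guard in LogSegment.sanityCheck,
   // before and after this PR; the names are illustrative, not Kafka's API.
   object OffsetIndexCheck {

     // Before: every segment must have an offset index file.
     def passesBefore(offsetIndexExists: Boolean): Boolean =
       offsetIndexExists

     // After: only an empty active segment may lack its offset index file.
     def passesAfter(isActiveSegment: Boolean, sizeInBytes: Int, offsetIndexExists: Boolean): Boolean =
       (isActiveSegment && sizeInBytes == 0) || offsetIndexExists
   }
   ```

   The exemption is deliberately narrow. A missing offset index is still flagged for any segment that holds data or is not the active segment.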




-- 
This is an automated message from the Apache Git Service.