Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/08 18:53:51 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

hachikuji commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r609139321



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -760,6 +768,12 @@ class Log(@volatile private var _dir: File,
       // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
       // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
       // do a replace with an existing segment.
+      //
+      // For case 1 (log cleaning), we may have old segments before or after the swap segment that were cleaned.
+      // Unfortunately, since the baseOffset and the readNextOffset were changed, these segments will not be removed on
+      // recovery if they were not yet given a DeletedFileSuffix. A subsequent cleaning that succeeds will correctly remove these segments.
+      // ie. segments [0, 1000), [1000, 2000), [2000, 3000) cleaned into [1500, 1750).swap without marking old segments with DeletedFileSuffix

Review comment:
       Maybe this would be a more interesting example:
   ```
   // ie. segments [0, 1000), [1000, 2000), [2000, 3000), [3000, 4000) cleaned into [1500, 2500).swap without marking old segments with DeletedFileSuffix
   ```
   The filter logic below would capture both [1000, 2000) and [2000, 3000) as the old segments which will be replaced with [1500, 2500). Do I have that right? So in other words, the recovery logic replaces any overlapping segments.
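To make the reading above concrete, here is a minimal sketch of an overlap-based replacement. It is a simplified model and not the filter in `Log.scala`: the `Segment` case class and the `overlapping` and `recover` helpers are hypothetical names, and each segment is assumed to cover the half-open range [baseOffset, nextOffset).

```scala
object SwapRecoverySketch {
  // Hypothetical model: a segment covers offsets [baseOffset, nextOffset).
  final case class Segment(baseOffset: Long, nextOffset: Long)

  // A segment overlaps the swap segment when the two half-open ranges intersect.
  def overlapping(existing: Seq[Segment], swap: Segment): Seq[Segment] =
    existing.filter(s => s.baseOffset < swap.nextOffset && s.nextOffset > swap.baseOffset)

  // Replace every overlapping segment with the swap segment, keeping offset order.
  def recover(existing: Seq[Segment], swap: Segment): Seq[Segment] = {
    val old = overlapping(existing, swap).toSet
    (existing.filterNot(old.contains) :+ swap).sortBy(_.baseOffset)
  }
}
```

Under this model, recovering [1500, 2500).swap against [0, 1000), [1000, 2000), [2000, 3000), [3000, 4000) replaces the two middle segments and leaves [0, 1000), [1500, 2500), [3000, 4000).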

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -760,6 +768,12 @@ class Log(@volatile private var _dir: File,
       // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
       // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
       // do a replace with an existing segment.
+      //
+      // For case 1 (log cleaning), we may have old segments before or after the swap segment that were cleaned.
+      // Unfortunately, since the baseOffset and the readNextOffset were changed, these segments will not be removed on
+      // recovery if they were not yet given a DeletedFileSuffix. A subsequent cleaning that succeeds will correctly remove these segments.
+      // ie. segments [0, 1000), [1000, 2000), [2000, 3000) cleaned into [1500, 1750).swap without marking old segments with DeletedFileSuffix
+      // -> [0. 1000), [1500, 1750), [2000, 3000)

Review comment:
       nit: `[0, 1000)`

##########
File path: core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
##########
@@ -116,6 +116,18 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
     records
   }
 
+  protected def sendDuplicateRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,

Review comment:
       How about renaming this to `sendRecordsWithKey` and including the key as a parameter? Otherwise it is a little strange as a general utility, since it is tailored so narrowly to compaction. Also, perhaps we could consider moving this to `TestUtils`?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int,
       // if any messages are to be retained, write them out
       val outputBuffer = result.outputBuffer
       if (outputBuffer.position() > 0) {
+        if (destSegment.isEmpty) {
+          // create a new segment with a suffix appended to the name of the log and indexes
+          destSegment = Some(LogCleaner.createNewCleanedSegment(log, result.minOffset()))
+          transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex)

Review comment:
       Sorry to keep pressing this, but do you have an update here? It would be nice to keep the behavior here on par with what is upstream. Really, if you think about the way this works, we could write to the cleaned segment directly from `filterTo` instead of building a buffer in memory, but we can leave that as a follow-up improvement.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -600,20 +608,31 @@ private[log] class Cleaner(val id: Int,
         currentSegmentOpt = nextSegmentOpt
       }
 
-      cleaned.onBecomeInactiveSegment()
-      // flush new segment to disk before swap
-      cleaned.flush()
+      cleanedSegment match {
+        case Some(cleaned) => {

Review comment:
       nit: the braces are unnecessary. The same applies to the `None` case.
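In Scala, the body of a `case` clause extends to the next `case` or to the end of the match, so a multi-statement body needs no braces of its own. A minimal sketch of that shape follows; the `describe` helper and its `String` arguments are hypothetical stand-ins for the cleaner code, chosen only to keep the example self-contained.

```scala
object CaseBraceSketch {
  // Hypothetical helper; only the shape of the match matters here.
  def describe(cleanedSegment: Option[String], segments: Seq[String]): String =
    cleanedSegment match {
      case Some(cleaned) =>
        // Several statements without braces: the clause body runs
        // until the next `case` or the closing brace of the match.
        val joined = segments.mkString(", ")
        s"Swapping in cleaned segment $cleaned for segment(s) $joined"
      case None =>
        val joined = segments.mkString(", ")
        s"Deleting segment(s) $joined"
    }
}
```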

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -760,6 +768,12 @@ class Log(@volatile private var _dir: File,
       // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
       // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
       // do a replace with an existing segment.
+      //
+      // For case 1 (log cleaning), we may have old segments before or after the swap segment that were cleaned.

Review comment:
       Can we clarify the comment above that "the swap segment must fall within the range of existing segment(s)"? I find it a bit confusing in the context of the new behavior.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2393,6 +2407,10 @@ class Log(@volatile private var _dir: File,
       }
       // okay we are safe now, remove the swap suffix
       sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+
+      // If not recovered swap file we need to increment logStartOffset here. Otherwise, we do this when loading the log.
+      if (!isRecoveredSwapFile)

Review comment:
       Can you explain why this is necessary? I understand that there is logic to initialize the log start offset after loading segments, but why do we need a special check to prevent updating the log start offset here?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -600,20 +608,31 @@ private[log] class Cleaner(val id: Int,
         currentSegmentOpt = nextSegmentOpt
       }
 
-      cleaned.onBecomeInactiveSegment()
-      // flush new segment to disk before swap
-      cleaned.flush()
+      cleanedSegment match {
+        case Some(cleaned) => {
+          // Result of cleaning included at least one record.
+          cleaned.onBecomeInactiveSegment()
+          // flush new segment to disk before swap
+          cleaned.flush()
 
-      // update the modification date to retain the last modified date of the original files
-      val modified = segments.last.lastModified
-      cleaned.lastModified = modified
+          // update the modification date to retain the last modified date of the original files
+          val modified = segments.last.lastModified
+          cleaned.lastModified = modified
 
-      // swap in new segment
-      info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
-      log.replaceSegments(List(cleaned), segments)
+          // swap in new segment
+          info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
+          log.replaceSegments(List(cleaned), segments)
+        }
+        case None => {
+          info(s"Deleting segment(s) $segments in log $log")
+          log.deleteSegments(segments, SegmentCompaction)
+        }
+      }
     } catch {
       case e: LogCleaningAbortedException =>
-        try cleaned.deleteIfExists()
+          try if (cleanedSegment.isDefined) {

Review comment:
       nit: this alignment looks off. Also, can we do a `foreach`?
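For reference, `Option.foreach` runs its argument only when the option is defined, which replaces the `isDefined` check. The sketch below assumes a hypothetical `FakeSegment` class in place of the real segment type and shows only the cleanup call, not the surrounding `try`.

```scala
object OptionForeachSketch {
  // Hypothetical stand-in for a segment that can be deleted from disk.
  final class FakeSegment {
    var deleted: Boolean = false
    def deleteIfExists(): Unit = { deleted = true }
  }

  // Runs the cleanup only when a cleaned segment was actually created;
  // for None this is a no-op.
  def cleanup(cleanedSegment: Option[FakeSegment]): Unit =
    cleanedSegment.foreach(_.deleteIfExists())
}
```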

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -601,6 +601,9 @@ private[log] object LogCleanerManager extends Logging {
       // the active segment is always uncleanable
       Option(log.activeSegment.baseOffset),
 
+      // we do not want to clean past the high watermark
+      Option(log.highWatermark),

Review comment:
       I agree this probably makes sense. It is surprising that it was not there already. Was there a specific reason related to this patch that you decided to do it here? An alternative, by the way, is to replace the high watermark and first unstable offset here with `lastStableOffset`, which would represent the lower bound of the two.
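A rough sketch of why the two formulations should agree, assuming `lastStableOffset` is the lower of the high watermark and the first unstable offset, as described above. The object and function names are hypothetical and take plain values rather than a real `Log`.

```scala
object UncleanableOffsetSketch {
  // Assumed definition: the first unstable offset when one exists below the
  // high watermark, otherwise the high watermark itself.
  def lastStableOffset(highWatermark: Long, firstUnstableOffset: Option[Long]): Long =
    firstUnstableOffset.filter(_ < highWatermark).getOrElse(highWatermark)

  // Formulation with separate bounds: take the minimum over every bound present.
  def withSeparateBounds(highWatermark: Long,
                         firstUnstableOffset: Option[Long],
                         activeSegmentBaseOffset: Long): Long =
    Seq(firstUnstableOffset, Option(activeSegmentBaseOffset), Option(highWatermark)).flatten.min

  // Alternative formulation: a single bound replaces the other two.
  def withLastStableOffset(highWatermark: Long,
                           firstUnstableOffset: Option[Long],
                           activeSegmentBaseOffset: Long): Long =
    Seq(lastStableOffset(highWatermark, firstUnstableOffset), activeSegmentBaseOffset).min
}
```

Both functions return the same first uncleanable offset for any inputs, because the minimum of the two replaced bounds is exactly what `lastStableOffset` computes under the assumption above.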

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -683,6 +702,8 @@ private[log] class Cleaner(val id: Int,
     }
 
     var position = 0
+    var destSegment = dest
+    val topicPartition = log.topicPartition

Review comment:
       nit: this does not feel like a very useful redefinition.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2393,6 +2407,10 @@ class Log(@volatile private var _dir: File,
       }
       // okay we are safe now, remove the swap suffix
       sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+
+      // If not recovered swap file we need to increment logStartOffset here. Otherwise, we do this when loading the log.
+      if (!isRecoveredSwapFile)
+         maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, LogCompaction)

Review comment:
       Maybe we need a separate reason for the segment splitting case?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org