You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/01 09:29:58 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r699714060



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +162,13 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
-     * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.

Review comment:
       nit: the base timestamp is used to calculate the record timestamps from the deltas

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -500,14 +500,15 @@ private[log] class Cleaner(val id: Int,
         case None => 0L
         case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
     }
-
-    doClean(cleanable, deleteHorizonMs)
+    doClean(cleanable, time.milliseconds(), legacyDeleteHorizonMs = deleteHorizonMs)
   }
 
-  private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long, legacyDeleteHorizonMs: Long = -1L): (Long, CleanerStats) = {

Review comment:
       Do we need `legacyDeleteHorizonMs` as a parameter? As far as I can tell, there are no cases in the tests which override it. Maybe we could just compute it here instead of in `clean`?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -622,26 +628,38 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
+   *
+   * @return the latestDeleteHorizon that is found from the FilterResult of the cleaning
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch) {
+            discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime

Review comment:
       nit: misaligned; the body of this `if` is indented four spaces instead of two

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -658,23 +676,28 @@ private[log] class Cleaner(val id: Int,
           }
         }
 
-        if (batch.hasProducerId && isBatchLastRecordOfProducer)
-          BatchRetention.RETAIN_EMPTY
-        else if (discardBatchRecords)
-          BatchRetention.DELETE
-        else
-          BatchRetention.DELETE_EMPTY
+        val batchRetention: BatchRetention =
+          if (batch.hasProducerId && isBatchLastRecordOfProducer)
+            BatchRetention.RETAIN_EMPTY
+          else if (discardBatchRecords)
+            BatchRetention.DELETE
+          else
+            BatchRetention.DELETE_EMPTY
+        new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)
       }
 
       override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
+        var isRecordRetained: Boolean = true

Review comment:
       Why do we need this `var`?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -776,7 +801,13 @@ private[log] class Cleaner(val id: Int,
        *   2) The message doesn't has value but it can't be deleted now.
        */
       val latestOffsetForKey = record.offset() >= foundOffset
-      val isRetainedValue = record.hasValue || retainDeletes
+      val supportDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2
+      val shouldRetainDeletes =

Review comment:
       nit: maybe turn this into a `def` since we don't even use the computed value unless `record.hasValue` is false.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
       When the broker is initialized, `log.latestDeleteHorizon` will be `NO_TIMESTAMP`. At least one cleaning run has to be triggered before the value can be initialized. Is there another condition we can rely on to ensure that the cleaning still occurs?

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 

Review comment:
       nit: no need for `case`. Usually we write this as
   ```scala
   dirtyLogs.filter { ltc =>
   ...
   }
   ```

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -522,13 +523,13 @@ private[log] class Cleaner(val id: Int,
     val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
 
     // group the segments and clean the groups
-    info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
+    info("Cleaning log %s (cleaning prior to %s, discarding legacy tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs)))

Review comment:
       It might not be very clear what a "legacy tombstone" means. Would it be fair to call this an upper bound on the deletion horizon?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +162,13 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
-     * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty

Review comment:
       I think we can leave off the comment about the batch being empty since we're not using this for the first timestamp anymore.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -544,17 +545,19 @@ private[log] class Cleaner(val id: Int,
    * @param log The log being cleaned
    * @param segments The group of segments being cleaned
    * @param map The offset map to use for cleaning segments
-   * @param deleteHorizonMs The time to retain delete tombstones
+   * @param currentTime The current time in milliseconds
    * @param stats Collector for cleaning statistics
    * @param transactionMetadata State of ongoing transactions which is carried between the cleaning
    *                            of the grouped segments
+   * @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2
    */
   private[log] def cleanSegments(log: UnifiedLog,
                                  segments: Seq[LogSegment],
                                  map: OffsetMap,
-                                 deleteHorizonMs: Long,
+                                 currentTime: Long,
                                  stats: CleanerStats,
-                                 transactionMetadata: CleanedTransactionMetadata): Unit = {
+                                 transactionMetadata: CleanedTransactionMetadata,
+                                 legacyDeleteHorizonMs: Long = -1L): Unit = {

Review comment:
       Can we make this a required parameter? We try to avoid optional parameters because it is easy to miss them.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -574,14 +577,17 @@ private[log] class Cleaner(val id: Int,
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
         transactionMetadata.addAbortedTransactions(abortedTransactions)
 
-        val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
+        val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified > legacyDeleteHorizonMs
         info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
-          s"with deletion horizon $deleteHorizonMs, " +
-          s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
+          s"with legacy deletion horizon $legacyDeleteHorizonMs, " +
+          s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")

Review comment:
       This log message becomes confusing after this change. How about something like this?
   ```
   s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
       s"with an upper bound deletion horizon $legacyDeleteHorizonMs computed from " +
       s"the segment last modified time of ${currentSegment.lastModified}"
   ```

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()
+        }
+        if (!logsWithTombstonesExpired.isEmpty) {

Review comment:
       nit: use `nonEmpty`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org