Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/22 01:21:45 UTC

[GitHub] [kafka] mattwong949 opened a new pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

mattwong949 opened a new pull request #10914:
URL: https://github.com/apache/kafka/pull/10914


   This is a rebased PR for #7884 and #9915.
   
   This PR aims to remove tombstones that persist indefinitely due to low throughput.
   
   Co-author: @ConcurrencyPractitioner
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r708747148



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -622,26 +628,35 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Unit = {
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch)
+          discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
+        else
+          discardBatchRecords = canDiscardBatch

Review comment:
       This is an existing issue. The following comment on line 1136 seems out of place, since the code that does that check is inside isBatchLastRecordOfProducer() below.
   
   ```
               // We may retain a record from an aborted transaction if it is the last entry
               // written by a given producerId.
   
   ```
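
As a reading aid for the hunk above, the control-batch branch can be sketched as a standalone Java predicate. This is a hypothetical illustration, not the PR's code: the class and method names are made up, and the batch is reduced to plain inputs (whether it is a control batch, the result of `shouldDiscardBatch`, the optional delete horizon, and the current time).

```java
import java.util.OptionalLong;

// Hypothetical stand-in for the decision made in checkBatchRetention; not Kafka code.
final class DiscardDecisionSketch {
    static boolean discardBatchRecords(boolean isControlBatch,
                                       boolean canDiscardBatch,
                                       OptionalLong deleteHorizonMs,
                                       long currentTimeMs) {
        if (isControlBatch) {
            // A marker is dropped only once its delete horizon has been set and has passed.
            return canDiscardBatch
                && deleteHorizonMs.isPresent()
                && deleteHorizonMs.getAsLong() <= currentTimeMs;
        }
        // Non-control batches follow the shouldDiscardBatch result directly.
        return canDiscardBatch;
    }
}
```

Under these assumptions, a discardable marker that has no delete horizon yet is retained on this pass, while a data batch simply follows `canDiscardBatch`.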

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -493,19 +496,19 @@ private[log] class Cleaner(val id: Int,
    * @return The first offset not cleaned and the statistics for this round of cleaning
    */
   private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+    doClean(cleanable, time.milliseconds())
+  }
+
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
+    info("Beginning cleaning of log %s".format(cleanable.log.name))
+
     // figure out the timestamp below which it is safe to remove delete tombstones
     // this position is defined to be a configurable time beneath the last modified time of the last clean segment
-    val deleteHorizonMs =
+    val legacyDeleteHorizonMs =

Review comment:
       Perhaps mention in the comment above that this is only used for the old message format?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -1060,7 +1082,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
 
 /**
   * Helper class for a log, its topic/partition, the first cleanable position, the first uncleanable dirty position,
-  * and whether it needs compaction immediately.
+  * the reason why it is being cleaned, and whether it needs compaction immediately.

Review comment:
       We no longer pass in the reason.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -163,17 +163,18 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     * Choose the log to clean next and add it to the in-progress set. We recompute this
     * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
     * the log manager maintains.
+    * Returns a tuple of an Option of the log selected to be cleaned and the reason it was selected.

Review comment:
       We no longer return the reason.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -658,20 +673,22 @@ private[log] class Cleaner(val id: Int,
           }
         }
 
-        if (batch.hasProducerId && isBatchLastRecordOfProducer)
-          BatchRetention.RETAIN_EMPTY
-        else if (discardBatchRecords)
-          BatchRetention.DELETE
-        else
-          BatchRetention.DELETE_EMPTY
+        val batchRetention: BatchRetention =
+          if (batch.hasProducerId && isBatchLastRecordOfProducer)
+            BatchRetention.RETAIN_EMPTY
+          else if (discardBatchRecords)
+            BatchRetention.DELETE
+          else
+            BatchRetention.DELETE_EMPTY
+        new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)

Review comment:
       It seems that containsMarkerForEmptyTxn should only be set to canDiscardBatch if this batch is a control batch?
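
The suggestion above amounts to gating the second constructor argument on the batch type. A minimal Java sketch, assuming the flag is a plain boolean derived from two inputs (the helper class is hypothetical; `containsMarkerForEmptyTxn` and `canDiscardBatch` are the names used in the comment and the diff):

```java
// Hypothetical helper; it only illustrates the condition proposed in the review comment.
final class EmptyTxnMarkerFlagSketch {
    static boolean containsMarkerForEmptyTxn(boolean isControlBatch, boolean canDiscardBatch) {
        // Transaction markers live in control batches, so a data batch never sets the flag.
        return isControlBatch && canDiscardBatch;
    }
}
```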







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682164477



##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
+            ByteBuffer buffer = ByteBuffer.allocate(2048);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                    0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+            builder.append(10L, "1".getBytes(), null);
+
+            ByteBuffer filtered = ByteBuffer.allocate(2048);
+            final long deleteHorizon = Integer.MAX_VALUE / 2;
+            final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {

Review comment:
       I've added a test to `MemoryRecordsBuilderTest` that is similar to this one in `MemoryRecordsTest`, but it sets the `deleteHorizon` directly through the constructor. I think having both tests is useful.







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -246,6 +265,19 @@ public boolean isTransactional() {
         return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
     }
 
+    @Override
+    public boolean hasDeleteHorizonMs() {
+        return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+    }
+
+    @Override
+    public long deleteHorizonMs() {

Review comment:
       I've removed the `hasDeleteHorizonMs` function from the interface. `DefaultRecordBatch` still has the function as a private helper, but the logic overall is pretty much unchanged.
   
   I've also made the change to return `OptionalLong` for `deleteHorizonMs` to see how it looks. In the cases where this returns `OptionalLong.empty()`, callers currently just fall back to `RecordBatch.NO_TIMESTAMP`, so I think it could be cleaner to keep the return type as a `long` and keep the logic for returning either the deleteHorizonMs value or `RecordBatch.NO_TIMESTAMP` inside the method.
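
The two return-type options weighed above can be compared side by side. The sketch below is an illustration under stated assumptions, not the `DefaultRecordBatch` implementation: the flag mask value and the idea of passing the raw attributes and timestamp field as arguments are invented for the example, and `NO_TIMESTAMP` mirrors the value of `RecordBatch.NO_TIMESTAMP` (-1).

```java
import java.util.OptionalLong;

// Hypothetical comparison of the two accessor shapes; not Kafka code.
final class DeleteHorizonReturnTypeSketch {
    static final long NO_TIMESTAMP = -1L;              // same value as RecordBatch.NO_TIMESTAMP
    static final int DELETE_HORIZON_FLAG_MASK = 0x40;  // illustrative value, assumed for this sketch

    private static boolean hasDeleteHorizonMs(int attributes) {
        return (attributes & DELETE_HORIZON_FLAG_MASK) > 0;
    }

    // Option 1: absence is explicit in the return type.
    static OptionalLong deleteHorizonMsOptional(int attributes, long timestampField) {
        return hasDeleteHorizonMs(attributes)
            ? OptionalLong.of(timestampField)
            : OptionalLong.empty();
    }

    // Option 2: absence is encoded as the NO_TIMESTAMP sentinel.
    static long deleteHorizonMsOrSentinel(int attributes, long timestampField) {
        return hasDeleteHorizonMs(attributes) ? timestampField : NO_TIMESTAMP;
    }
}
```

With the first form, a caller that wants the sentinel can still write `deleteHorizonMsOptional(...).orElse(NO_TIMESTAMP)`, so the choice is mostly about where that fallback lives.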







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709503140



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -622,26 +628,35 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Unit = {
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch)
+          discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
+        else
+          discardBatchRecords = canDiscardBatch

Review comment:
       Makes sense. I've removed that comment on line 1136, since the case is already mentioned in `isBatchLastRecordOfProducer`.







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668304063



##########
File path: checkstyle/suppressions.xml
##########
@@ -57,7 +57,7 @@
     <suppress checks="ParameterNumber"
               files="DefaultRecordBatch.java"/>
     <suppress checks="ParameterNumber"
-              files="Sender.java"/>

Review comment:
       The same suppression is already on line 54.







[GitHub] [kafka] vincent81jiang commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
vincent81jiang commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r679346836



##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -239,9 +249,68 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
         return filterResult;
     }
 
+    private static BatchFilterResult filterBatch(RecordBatch batch,
+                                                 BufferSupplier decompressionBufferSupplier,
+                                                 FilterResult filterResult,
+                                                 RecordFilter filter,
+                                                 byte batchMagic,
+                                                 boolean writeOriginalBatch,
+                                                 long maxOffset,

Review comment:
       Looks like maxOffset is always initialized to -1?  If so, we can remove it from input parameters.







[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r704755926



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
       We could store some additional stats related to tombstones in the log cleaner checkpoint file. It seems that, to support downgrade, we can't change the version number, since the existing code expects the version in the file to match that in the code.







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -246,6 +265,19 @@ public boolean isTransactional() {
         return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
     }
 
+    @Override
+    public boolean hasDeleteHorizonMs() {
+        return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+    }
+
+    @Override
+    public long deleteHorizonMs() {

Review comment:
       I've removed the `hasDeleteHorizonMs` function from the interface. `DefaultRecordBatch` still has the function as a private helper, but the logic overall is pretty much unchanged.
   
   I've also made the change to return `OptionalLong` for `deleteHorizonMs`, trying to keep the overall logic the same.







[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r701240936



##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -37,6 +40,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;

Review comment:
       Sorry, I meant adding a description regarding tombstones to the comment of LogCleaner.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
       Related to this, I am a bit concerned about the extra cleaning this causes. If we have just one tombstone record, this can force a round of cleaning on idle partitions. An alternative is to track the number of total surviving records and tombstone records during cleaning, and only trigger a cleaning if #tombstone/#totalRecords > minCleanableRatio. @hachikuji What do you think?
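
The ratio-based trigger proposed above can be sketched in a few lines of Java. This is only an illustration of the idea as stated in the comment: the class, method, and parameter names are hypothetical, and how the two counts would be collected and persisted between cleaning rounds is left open.

```java
// Hypothetical sketch of the proposed trigger; not part of the PR.
final class TombstoneRatioSketch {
    static boolean shouldTriggerCleaning(long tombstoneCount,
                                         long totalRecordCount,
                                         double minCleanableRatio) {
        if (totalRecordCount <= 0) {
            // An empty log has nothing to clean and would otherwise divide by zero.
            return false;
        }
        double tombstoneRatio = (double) tombstoneCount / totalRecordCount;
        return tombstoneRatio > minCleanableRatio;
    }
}
```

With a `minCleanableRatio` of 0.5, a single tombstone among 1000 surviving records would not trigger a round of cleaning on an idle partition, which is the case the comment is worried about.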







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r705721022



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -163,17 +168,18 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     * Choose the log to clean next and add it to the in-progress set. We recompute this
     * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
     * the log manager maintains.
+    * Returns a tuple of an Option of the log selected to be cleaned and the reason it was selected.
     */
-  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
+  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): (Option[LogToClean], LogCleaningReason) = {

Review comment:
       I've moved this into the LogToClean class as a var. We now set this var in `grabFilthiestCompactedLog`, and I've expanded some of the existing tests in `LogCleanerManagerTest` to check that the reason is correct when using `grabFilthiestCompactedLog`.







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682018753



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           @volatile private var _topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+          val keepPartitionMetadataFile: Boolean,
+          @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {

Review comment:
       Hmm, it seems like we only use it in a test. That goes with the return value that was added to the `cleanInto` method in the LogCleaner. I'm going to remove these and see if I can take another approach in the testing.










[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r666420857



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -167,21 +167,9 @@ public void ensureValid() {
      *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
      */
     public long baseTimestamp() {
-        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
-    }
-
-    /**
-     * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
-     * timestamp type of the batch is log append time.
-     * 
-     * @return The first timestamp if a record has been appended, unless the delete horizon has been set
-     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty or if the delete horizon is set
-     */
-    public long firstTimestamp() {
-        final long baseTimestamp = baseTimestamp();
         if (hasDeleteHorizonMs())
             return RecordBatch.NO_TIMESTAMP;

Review comment:
       Don't we want to remove this? We still need to be able to access the base timestamp in order to compute the successive timestamps in the record iterator, right?
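
To make the concern concrete: in the v2 batch format each record stores its timestamp as a delta from the batch's base timestamp, so the record iterator needs the raw base value. A tiny Java sketch of that reconstruction (the class and method names are hypothetical, and `NO_TIMESTAMP` mirrors the value of `RecordBatch.NO_TIMESTAMP`):

```java
// Hypothetical sketch of per-record timestamp reconstruction; not Kafka code.
final class TimestampDeltaSketch {
    static final long NO_TIMESTAMP = -1L; // same value as RecordBatch.NO_TIMESTAMP

    static long recordTimestamp(long baseTimestamp, long timestampDelta) {
        // Each record's timestamp is the batch base plus the record's stored delta.
        return baseTimestamp + timestampDelta;
    }
}
```

If `baseTimestamp()` returned `NO_TIMESTAMP` whenever the delete horizon flag is set, a record with delta 5 in a batch whose base is 1000 would be read back as 4 instead of 1005.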

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -167,21 +167,9 @@ public void ensureValid() {
      *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
      */
     public long baseTimestamp() {
-        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
-    }
-
-    /**
-     * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
-     * timestamp type of the batch is log append time.
-     * 
-     * @return The first timestamp if a record has been appended, unless the delete horizon has been set
-     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty or if the delete horizon is set
-     */
-    public long firstTimestamp() {
-        final long baseTimestamp = baseTimestamp();
         if (hasDeleteHorizonMs())
             return RecordBatch.NO_TIMESTAMP;
-        return baseTimestamp;
+        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);

Review comment:
       Can we rename `FIRST_TIMESTAMP_OFFSET` to `BASE_TIMESTAMP_OFFSET`?







[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r701454478



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
       @junrao Yeah, that's an interesting idea. Do you think it would be possible to make it a size-based comparison?







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r700664177



##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -37,6 +40,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;

Review comment:
       Sorry, I'm not sure what you mean. Do you think we should describe the new removal logic in this test class? Also, by "the comment of the class", do you mean a comment block above the class declaration?







[GitHub] [kafka] lbradstreet commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r702503726



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -163,17 +168,18 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     * Choose the log to clean next and add it to the in-progress set. We recompute this
     * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
     * the log manager maintains.
+    * Returns a tuple of an Option of the log selected to be cleaned and the reason it was selected.
     */
-  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
+  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): (Option[LogToClean], LogCleaningReason) = {

Review comment:
       Could LogCleaningReason be included as a field in LogToClean?
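
A minimal sketch of what this suggestion could look like, written in Java for illustration and under stated assumptions: the types below are hypothetical stand-ins rather than Kafka's real classes, and the two reason values are invented for the example. The idea is that the selected log carries its own reason, so the method can return a single optional value instead of a pair.

```java
import java.util.Optional;

// Hypothetical sketch: none of these types are Kafka's real classes.
final class CleaningReasonSketch {
    // Illustrative reasons only; the PR's actual values are not shown in this thread.
    enum LogCleaningReason { DIRTY_RECORDS, EXPIRED_TOMBSTONES }

    // The selected log carries the reason it was selected.
    static final class LogToClean {
        final String logName;
        final LogCleaningReason reason;

        LogToClean(String logName, LogCleaningReason reason) {
            this.logName = logName;
            this.reason = reason;
        }
    }

    // Returns a single optional value instead of a (log, reason) pair.
    static Optional<LogToClean> grabFilthiestCompactedLog(String logName,
                                                          boolean aboveMinCleanableRatio,
                                                          boolean tombstonesExpired) {
        if (aboveMinCleanableRatio)
            return Optional.of(new LogToClean(logName, LogCleaningReason.DIRTY_RECORDS));
        if (tombstonesExpired)
            return Optional.of(new LogToClean(logName, LogCleaningReason.EXPIRED_TOMBSTONES));
        return Optional.empty();
    }

    public static void main(String[] args) {
        grabFilthiestCompactedLog("topic-0", false, true)
            .ifPresent(ltc -> System.out.println(ltc.logName + " selected for " + ltc.reason));
    }
}
```

With this shape, callers that only log or branch on the reason read it from the returned object, and there is no reason value to interpret when nothing was selected.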







[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r702018820



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
       Yes, ideally we want a size-based estimate. I'm just not sure how accurately we can estimate size given batching and compression.







[GitHub] [kafka] mattwong949 commented on pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#issuecomment-865452418


   @junrao @hachikuji Could you help with a review pass? I know Jun has reviewed this before, but since we've rebased several times, I think it would be helpful to look it over again.





[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r699714060



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +162,13 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
-     * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.

Review comment:
       nit: the base timestamp is used to calculate the record timestamps from the deltas
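
To illustrate the wording in this nit: in the v2 batch format each record stores a timestamp delta, and a reader recovers the record timestamp by adding that delta to the batch's base timestamp. The sketch below is a hypothetical stand-alone class (`BaseTimestampSketch` and `recordTimestamp` are illustrative names, not Kafka APIs), and it ignores log-append-time handling.

```java
// Hypothetical sketch, not Kafka's DefaultRecordBatch: it only shows that a
// record timestamp is derived from the batch base timestamp plus a delta.
final class BaseTimestampSketch {
    static long recordTimestamp(long baseTimestamp, long timestampDelta) {
        return baseTimestamp + timestampDelta;
    }

    public static void main(String[] args) {
        long baseTimestamp = 1_000L;      // stored once in the batch header
        long[] deltas = {0L, 5L, 12L};    // stored per record
        for (long delta : deltas)
            System.out.println(recordTimestamp(baseTimestamp, delta));
    }
}
```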

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -500,14 +500,15 @@ private[log] class Cleaner(val id: Int,
         case None => 0L
         case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
     }
-
-    doClean(cleanable, deleteHorizonMs)
+    doClean(cleanable, time.milliseconds(), legacyDeleteHorizonMs = deleteHorizonMs)
   }
 
-  private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long, legacyDeleteHorizonMs: Long = -1L): (Long, CleanerStats) = {

Review comment:
       Do we need `legacyDeleteHorizonMs` as a parameter? As far as I can tell, there are no cases in the tests which override it. Maybe we could just compute it here instead of in `clean`?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -622,26 +628,38 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
+   *
+   * @return the latestDeleteHorizon that is found from the FilterResult of the cleaning
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch) {
+            discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime

Review comment:
       nit: misaligned

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -658,23 +676,28 @@ private[log] class Cleaner(val id: Int,
           }
         }
 
-        if (batch.hasProducerId && isBatchLastRecordOfProducer)
-          BatchRetention.RETAIN_EMPTY
-        else if (discardBatchRecords)
-          BatchRetention.DELETE
-        else
-          BatchRetention.DELETE_EMPTY
+        val batchRetention: BatchRetention =
+          if (batch.hasProducerId && isBatchLastRecordOfProducer)
+            BatchRetention.RETAIN_EMPTY
+          else if (discardBatchRecords)
+            BatchRetention.DELETE
+          else
+            BatchRetention.DELETE_EMPTY
+        new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)
       }
 
       override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
+        var isRecordRetained: Boolean = true

Review comment:
       Why do we need this `var`?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -776,7 +801,13 @@ private[log] class Cleaner(val id: Int,
        *   2) The message doesn't has value but it can't be deleted now.
        */
       val latestOffsetForKey = record.offset() >= foundOffset
-      val isRetainedValue = record.hasValue || retainDeletes
+      val supportDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2
+      val shouldRetainDeletes =

Review comment:
       nit: maybe turn this into a `def` since we don't even use the computed value unless `record.hasValue` is false.
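
The point of this nit is to defer the computation until it is needed. A rough Java analogue, under the assumption that the retain-deletes check is passed in as a `BooleanSupplier` (the class and method names are hypothetical, not Kafka code): the supplier only runs when the record has no value, because `||` short-circuits.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: defers the "should retain deletes" check so that it is
// evaluated only for tombstones (records without a value).
final class DeferredRetainSketch {
    static boolean isRetainedValue(boolean hasValue, BooleanSupplier shouldRetainDeletes) {
        // The supplier is not invoked when hasValue is true.
        return hasValue || shouldRetainDeletes.getAsBoolean();
    }

    public static void main(String[] args) {
        int[] evaluations = {0};
        BooleanSupplier check = () -> {
            evaluations[0]++;
            return false;
        };
        System.out.println(isRetainedValue(true, check));   // record with a value
        System.out.println(isRetainedValue(false, check));  // tombstone
        System.out.println("check evaluated " + evaluations[0] + " time(s)");
    }
}
```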

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
       When the broker is initialized, `log.latestDeleteHorizon` will be `NO_TIMESTAMP`. We need at least one run to trigger before we can initialize the value. Is there another condition we can rely on in order to ensure that the cleaning still occurs?

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 

Review comment:
       nit: no need for `case`. Usually we write this as
   ```scala
   dirtyLogs.filter { ltc =>
   ...
   }
   ```

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -522,13 +523,13 @@ private[log] class Cleaner(val id: Int,
     val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
 
     // group the segments and clean the groups
-    info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
+    info("Cleaning log %s (cleaning prior to %s, discarding legacy tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs)))

Review comment:
       Might not be very clear what a "legacy tombstone" means. Would it be fair to call this an upper bound on the deletion horizon? 

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +162,13 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
-     * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty

Review comment:
       I think we can leave off the comment about the batch being empty since we're not using this for the first timestamp anymore.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -544,17 +545,19 @@ private[log] class Cleaner(val id: Int,
    * @param log The log being cleaned
    * @param segments The group of segments being cleaned
    * @param map The offset map to use for cleaning segments
-   * @param deleteHorizonMs The time to retain delete tombstones
+   * @param currentTime The current time in milliseconds
    * @param stats Collector for cleaning statistics
    * @param transactionMetadata State of ongoing transactions which is carried between the cleaning
    *                            of the grouped segments
+   * @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2
    */
   private[log] def cleanSegments(log: UnifiedLog,
                                  segments: Seq[LogSegment],
                                  map: OffsetMap,
-                                 deleteHorizonMs: Long,
+                                 currentTime: Long,
                                  stats: CleanerStats,
-                                 transactionMetadata: CleanedTransactionMetadata): Unit = {
+                                 transactionMetadata: CleanedTransactionMetadata,
+                                 legacyDeleteHorizonMs: Long = -1L): Unit = {

Review comment:
       Can we make this a required parameter? We try to avoid optional parameters because it is easy to miss them.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -574,14 +577,17 @@ private[log] class Cleaner(val id: Int,
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
         transactionMetadata.addAbortedTransactions(abortedTransactions)
 
-        val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
+        val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified > legacyDeleteHorizonMs
         info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
-          s"with deletion horizon $deleteHorizonMs, " +
-          s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
+          s"with legacy deletion horizon $legacyDeleteHorizonMs, " +
+          s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")

Review comment:
       This log message becomes confusing after this change. How about something like this?
   ```scala
   s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
       s"with an upper bound deletion horizon $legacyDeleteHorizonMs computed from " +
       s"the segment last modified time of ${currentSegment.lastModified}"
   ```

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()
+        }
+        if (!logsWithTombstonesExpired.isEmpty) {

Review comment:
       nit: use `nonEmpty`







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709510271



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -658,20 +673,22 @@ private[log] class Cleaner(val id: Int,
           }
         }
 
-        if (batch.hasProducerId && isBatchLastRecordOfProducer)
-          BatchRetention.RETAIN_EMPTY
-        else if (discardBatchRecords)
-          BatchRetention.DELETE
-        else
-          BatchRetention.DELETE_EMPTY
+        val batchRetention: BatchRetention =
+          if (batch.hasProducerId && isBatchLastRecordOfProducer)
+            BatchRetention.RETAIN_EMPTY
+          else if (discardBatchRecords)
+            BatchRetention.DELETE
+          else
+            BatchRetention.DELETE_EMPTY
+        new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)

Review comment:
       Hmm, yeah, I think you're right. I'll change it to `canDiscardBatch && batch.isControlBatch`.







[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r699482697



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -574,14 +577,17 @@ private[log] class Cleaner(val id: Int,
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
         transactionMetadata.addAbortedTransactions(abortedTransactions)
 
-        val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
+        val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified > legacyDeleteHorizonMs
         info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
-          s"with deletion horizon $deleteHorizonMs, " +
-          s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
+          s"with legacy deletion horizon $legacyDeleteHorizonMs, " +
+          s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
 
         try {
-          cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,
-            transactionMetadata, lastOffsetOfActiveProducers, stats)
+          val latestDeleteHorizon: Long = cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config.deleteRetentionMs,
+            log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, stats, currentTime = currentTime)
+          if (log.latestDeleteHorizon < latestDeleteHorizon) {

Review comment:
       The existing code follows the convention of not using {} for single-line statements. Could we follow that convention here? Ditto in a few other places in this file.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -171,38 +177,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
             // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
             // recopy the messages to the destination buffer.
-
             byte batchMagic = batch.magic();
-            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
-                while (iterator.hasNext()) {
-                    Record record = iterator.next();
-                    filterResult.messagesRead += 1;
-
-                    if (filter.shouldRetainRecord(batch, record)) {
-                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                        // the corrupted batch with correct data.
-                        if (!record.hasMagic(batchMagic))
-                            writeOriginalBatch = false;
-
-                        if (record.offset() > maxOffset)
-                            maxOffset = record.offset();
-
-                        retainedRecords.add(record);
-                    } else {
-                        writeOriginalBatch = false;
-                    }
-                }
-            }
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+                    batchMagic, true, retainedRecords);
+            boolean containsTombstones = iterationResult.containsTombstones;
+            boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
+            long maxOffset = iterationResult.maxOffset;
 
             if (!retainedRecords.isEmpty()) {
-                if (writeOriginalBatch) {
+                // we check if the delete horizon should be set to a new value
+                // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
+                // if the batch does not contain tombstones, then we don't need to overwrite batch
+                boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
+                    && !batch.deleteHorizonMs().isPresent();
+                if (writeOriginalBatch && !needToSetDeleteHorizon) {
+                    if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())
+                        filterResult.updateLatestDeleteHorizon(batch.deleteHorizonMs().getAsLong());
                     batch.writeTo(bufferOutputStream);
                     filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
                 } else {
-                    MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+                    final MemoryRecordsBuilder builder;
+                    if (needToSetDeleteHorizon) {
+                        long deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
+                        if (deleteHorizonMs > filterResult.latestDeleteHorizon()) {
+                            filterResult.updateLatestDeleteHorizon(deleteHorizonMs);
+                        }
+                    } else {
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP));
+                        if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())
+                            filterResult.updateLatestDeleteHorizon(batch.deleteHorizonMs().getAsLong());
+                    }

Review comment:
       We want to be consistent about avoiding {} for single-line statements.

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -37,6 +40,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;

Review comment:
       Could we add a description of the new tombstone and control batch removal logic to the class comment?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -171,38 +177,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
             // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
             // recopy the messages to the destination buffer.
-
             byte batchMagic = batch.magic();
-            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
-                while (iterator.hasNext()) {
-                    Record record = iterator.next();
-                    filterResult.messagesRead += 1;
-
-                    if (filter.shouldRetainRecord(batch, record)) {
-                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                        // the corrupted batch with correct data.
-                        if (!record.hasMagic(batchMagic))
-                            writeOriginalBatch = false;
-
-                        if (record.offset() > maxOffset)
-                            maxOffset = record.offset();
-
-                        retainedRecords.add(record);
-                    } else {
-                        writeOriginalBatch = false;
-                    }
-                }
-            }
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+                    batchMagic, true, retainedRecords);
+            boolean containsTombstones = iterationResult.containsTombstones;
+            boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
+            long maxOffset = iterationResult.maxOffset;
 
             if (!retainedRecords.isEmpty()) {
-                if (writeOriginalBatch) {
+                // we check if the delete horizon should be set to a new value
+                // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
+                // if the batch does not contain tombstones, then we don't need to overwrite batch
+                boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
+                    && !batch.deleteHorizonMs().isPresent();
+                if (writeOriginalBatch && !needToSetDeleteHorizon) {
+                    if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())
+                        filterResult.updateLatestDeleteHorizon(batch.deleteHorizonMs().getAsLong());
                     batch.writeTo(bufferOutputStream);
                     filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
                 } else {
-                    MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+                    final MemoryRecordsBuilder builder;
+                    if (needToSetDeleteHorizon) {
+                        long deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
+                        if (deleteHorizonMs > filterResult.latestDeleteHorizon()) {
+                            filterResult.updateLatestDeleteHorizon(deleteHorizonMs);
+                        }
+                    } else {
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP));
+                        if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())

Review comment:
       Perhaps we could set deleteHorizonMs inside else and then call updateLatestDeleteHorizon() only once outside if/else?
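
A minimal sketch of the restructuring suggested here, under stated assumptions: `HorizonTracker` is a hypothetical stand-in for the `FilterResult` bookkeeping in the diff, `resolveDeleteHorizon` and `track` are illustrative helper names, and `NO_TIMESTAMP` mirrors `RecordBatch.NO_TIMESTAMP` (-1). Both branches only pick the horizon value, and the comparison against the latest horizon happens once afterwards.

```java
import java.util.OptionalLong;

// Hypothetical sketch, not the actual MemoryRecords.filterTo code.
final class DeleteHorizonSketch {
    static final long NO_TIMESTAMP = -1L; // mirrors RecordBatch.NO_TIMESTAMP

    // Stand-in for the latest-delete-horizon tracking on FilterResult.
    static final class HorizonTracker {
        private long latestDeleteHorizon = NO_TIMESTAMP;

        long latestDeleteHorizon() {
            return latestDeleteHorizon;
        }

        void updateLatestDeleteHorizon(long deleteHorizonMs) {
            latestDeleteHorizon = deleteHorizonMs;
        }
    }

    // Each branch only decides which horizon applies to the rebuilt batch.
    static long resolveDeleteHorizon(boolean needToSetDeleteHorizon,
                                     OptionalLong batchDeleteHorizonMs,
                                     long currentTime,
                                     long deleteRetentionMs) {
        final long deleteHorizonMs;
        if (needToSetDeleteHorizon)
            deleteHorizonMs = currentTime + deleteRetentionMs;
        else
            deleteHorizonMs = batchDeleteHorizonMs.orElse(NO_TIMESTAMP);
        return deleteHorizonMs;
    }

    // The update is written once, after the if/else.
    static void track(HorizonTracker tracker, long deleteHorizonMs) {
        if (deleteHorizonMs > tracker.latestDeleteHorizon())
            tracker.updateLatestDeleteHorizon(deleteHorizonMs);
    }

    public static void main(String[] args) {
        HorizonTracker tracker = new HorizonTracker();
        track(tracker, resolveDeleteHorizon(true, OptionalLong.empty(), 100L, 50L));
        track(tracker, resolveDeleteHorizon(false, OptionalLong.of(70L), 100L, 50L));
        System.out.println(tracker.latestDeleteHorizon());
    }
}
```

Because `NO_TIMESTAMP` is negative, a batch without a delete horizon never replaces a horizon that has already been recorded.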

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -776,7 +801,13 @@ private[log] class Cleaner(val id: Int,
        *   2) The message doesn't has value but it can't be deleted now.
        */
       val latestOffsetForKey = record.offset() >= foundOffset
-      val isRetainedValue = record.hasValue || retainDeletes
+      val supportDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2

Review comment:
       supportDeleteHorizon may not be very clear, since the delete horizon is referred to in both the old and the new logic. Perhaps we can have a boolean called legacyRecord?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -500,14 +500,15 @@ private[log] class Cleaner(val id: Int,
         case None => 0L
         case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
     }
-
-    doClean(cleanable, deleteHorizonMs)
+    doClean(cleanable, time.milliseconds(), legacyDeleteHorizonMs = deleteHorizonMs)
   }
 
-  private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long, legacyDeleteHorizonMs: Long = -1L): (Long, CleanerStats) = {

Review comment:
       Now that we have two reasons for cleaning (reflecting new dirty records, removing tombstones), it would be useful to pass down the reason and include it in the logging that follows.
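
One possible way to carry the reason through to the log line, sketched in Java. The enum constants, their descriptions and `beginCleaningMessage` are hypothetical; only the "Beginning cleaning of log" prefix comes from the existing logging:

```java
final class CleaningReasonSketch {
    // Hypothetical reason names; the PR may well choose different ones.
    enum LogCleaningReason {
        DIRTY_RECORDS("new dirty records need compacting"),
        EXPIRED_TOMBSTONES("tombstones are past their delete horizon");

        private final String description;

        LogCleaningReason(String description) {
            this.description = description;
        }

        String description() {
            return description;
        }
    }

    // Builds the "Beginning cleaning" message with the reason appended.
    static String beginCleaningMessage(String logName, LogCleaningReason reason) {
        return String.format("Beginning cleaning of log %s (reason: %s)", logName, reason.description());
    }
}
```

Keeping the description on the enum means every call site logs the reason the same way.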

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -374,17 +376,17 @@ class LogCleanerTest {
     log.roll()
 
     // the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
     assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // delete horizon forced to 0 to verify marker is not removed early
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
     assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // clean again with large delete horizon and verify the marker is removed
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1

Review comment:
       The comment above needs to be adjusted since we are no longer setting delete horizon.

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -257,7 +257,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
                  val producerStateManager: ProducerStateManager,
                  @volatile private var _topicId: Option[Uuid],
-                 val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+                 val keepPartitionMetadataFile: Boolean,
+                 @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {

Review comment:
       Since latestDeleteHorizon is never passed in from the constructor, it probably could just be a public field? Could we also add a description for this new field?

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -374,17 +376,17 @@ class LogCleanerTest {
     log.roll()
 
     // the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
     assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // delete horizon forced to 0 to verify marker is not removed early
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1

Review comment:
       The comment above needs to be adjusted since we are no longer setting delete horizon.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -562,12 +569,12 @@ class LogCleanerTest {
     // Both the batch and the marker should remain after cleaning. The batch is retained
     // because it is the last entry for this producerId. The marker is retained because
     // there are still batches remaining from this transaction.
-    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
     assertEquals(List(1), offsetsInLog(log))
     assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
 
     // The empty batch and the marker is still retained after a second cleaning.
-    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue - 1)

Review comment:
       Is there a reason to use Long.MaxValue - 1 instead of Long.MaxValue like in other places?

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +499,53 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(V2MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+        builder.append(5L, "0".getBytes(), "0".getBytes());
+        builder.append(10L, "1".getBytes(), null);
+        builder.append(15L, "2".getBytes(), "2".getBytes());
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        final long deleteHorizon = Integer.MAX_VALUE / 2;
+        final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {
+            @Override
+            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                return true;
+            }
+
+            @Override
+            protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+                return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, true);

Review comment:
       It's a bit weird to return true for containsMarkerForEmptyTxn since there is no txn record in this test. It seems the test can work even with containsMarkerForEmptyTxn set to false since it has a tombstone, which will trigger the setting of deleteHorizon?
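
To make the point concrete, here is a small Java sketch of the decision as I read it. It assumes a delete horizon gets set when a v2 batch holds a tombstone or a marker for an empty transaction and has no horizon yet, and that the horizon is `currentTime + deleteRetentionMs`, as the `RecordFilter(deleteHorizon - 1, 1)` arguments in this test suggest. `horizonToSet` is a hypothetical helper, not a method in the PR:

```java
import java.util.OptionalLong;

final class DeleteHorizonDecisionSketch {
    // Mirrors RecordBatch.MAGIC_VALUE_V2.
    static final byte MAGIC_VALUE_V2 = 2;

    // Returns the horizon to stamp on the batch, or empty if none should be set.
    static OptionalLong horizonToSet(byte magic,
                                     OptionalLong existingDeleteHorizonMs,
                                     boolean containsTombstones,
                                     boolean containsMarkerForEmptyTxn,
                                     long currentTime,
                                     long deleteRetentionMs) {
        boolean needsHorizon = magic >= MAGIC_VALUE_V2
                && (containsTombstones || containsMarkerForEmptyTxn)
                && !existingDeleteHorizonMs.isPresent();
        return needsHorizon
                ? OptionalLong.of(currentTime + deleteRetentionMs)
                : OptionalLong.empty();
    }
}
```

Under that assumption, a batch with a tombstone still gets its horizon when `containsMarkerForEmptyTxn` is false, so the test would not need to return true for it.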

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -1854,6 +1863,18 @@ class LogCleanerTest {
   private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): UnifiedLog = {
     LogTestUtils.recoverAndCheck(dir, config, expectedKeys, new BrokerTopicStats(), time, time.scheduler)
   }
+
+  /**
+   * We need to run a two pass clean to perform the following steps to stimulate a proper clean:
+   *  1. On the first run, set the delete horizon in the batches with tombstone or markers with empty txn records.
+   *  2. For the second pass, we will advance the current time by tombstoneRetentionMs, which will cause the
+   *     tombstones to expire, leading to their prompt removal from the log.

Review comment:
       Could we add a description for the return value?

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -21,8 +21,11 @@
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetentionResult;

Review comment:
       unused import.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
##########
@@ -211,6 +212,12 @@
      */
     boolean isTransactional();
 
+    /**
+     * Get the delete horizon, returns None if the first timestamp is not the delete horizon

Review comment:
       We are returning empty, not None.







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -246,6 +265,19 @@ public boolean isTransactional() {
         return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
     }
 
+    @Override
+    public boolean hasDeleteHorizonMs() {
+        return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+    }
+
+    @Override
+    public long deleteHorizonMs() {

Review comment:
       I've removed the `hasDeleteHorizonMs` function from the interface. DefaultRecordBatch still has it as a private helper, but the overall logic is pretty much unchanged.
   
   I've also changed `deleteHorizonMs` to return OptionalLong to see how it looks. In the current callers, whenever this returns `OptionalLong.empty()` we just fall back to `RecordBatch.NO_TIMESTAMP`, so I am thinking it may be cleaner to keep the return type as `long` and have the method itself return either the deleteHorizonMs value or `RecordBatch.NO_TIMESTAMP`.
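
For comparison, a minimal Java sketch of the two shapes side by side. `BatchHeaderSketch` is a hypothetical stand-in for the batch header, the `0x40` flag bit is only an illustrative value assumed for this sketch, and `deleteHorizonMsOrNoTimestamp` is an invented name for the `long` variant:

```java
import java.util.OptionalLong;

final class BatchHeaderSketch {
    static final long NO_TIMESTAMP = -1L;             // mirrors RecordBatch.NO_TIMESTAMP
    static final int DELETE_HORIZON_FLAG_MASK = 0x40; // illustrative bit, assumed for this sketch

    private final int attributes;
    private final long baseTimestamp;

    BatchHeaderSketch(int attributes, long baseTimestamp) {
        this.attributes = attributes;
        this.baseTimestamp = baseTimestamp;
    }

    // Private helper, no longer part of the interface.
    private boolean hasDeleteHorizonMs() {
        return (attributes & DELETE_HORIZON_FLAG_MASK) > 0;
    }

    // Variant 1: absence is explicit and the caller chooses the fallback.
    OptionalLong deleteHorizonMs() {
        return hasDeleteHorizonMs() ? OptionalLong.of(baseTimestamp) : OptionalLong.empty();
    }

    // Variant 2: the NO_TIMESTAMP fallback lives inside the method.
    long deleteHorizonMsOrNoTimestamp() {
        return hasDeleteHorizonMs() ? baseTimestamp : NO_TIMESTAMP;
    }
}
```

Callers of the first variant end up writing `deleteHorizonMs().orElse(NO_TIMESTAMP)`, which is exactly what the second variant returns, so the choice is mostly about whether absence should be visible in the type.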







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682018753



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           @volatile private var _topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+          val keepPartitionMetadataFile: Boolean,
+          @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {

Review comment:
       ~~Hmm, it seems like we only use it in a test. That goes with the return value that was added to the `cleanInto` method in the LogCleaner. I'm going to remove these and see if I can take another approach in the testing.~~
   
   ~~Edit: going through the build errors, I see other usages. I will go through it more thoroughly.~~
   
   Sorry, I jumped the gun a bit with the earlier changes.
   
   There is an addition to the LogCleanerManager that lets the cleaner pick up logs whose tombstones are past their delete horizon. The logic can be paraphrased as: "if there are no eligible cleanable logs, check each Log's latestDeleteHorizon to see whether any log has tombstones that can be deleted, and enqueue those logs for cleaning."
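
The paraphrased logic could be sketched in Java roughly as follows. `LogView` is a hypothetical, minimal view of a log (the real selection works on `LogToClean`), `logsToEnqueue` is an invented name, and which of the eligible logs actually gets picked is left out:

```java
import java.util.ArrayList;
import java.util.List;

final class TombstoneCleaningFallbackSketch {
    // Mirrors RecordBatch.NO_TIMESTAMP (-1L).
    static final long NO_TIMESTAMP = -1L;

    // Hypothetical, minimal view of a log for this sketch.
    static final class LogView {
        final String name;
        final boolean cleanable;         // passes the usual dirty-ratio checks
        final long latestDeleteHorizon;  // NO_TIMESTAMP if no horizon is tracked

        LogView(String name, boolean cleanable, long latestDeleteHorizon) {
            this.name = name;
            this.cleanable = cleanable;
            this.latestDeleteHorizon = latestDeleteHorizon;
        }
    }

    // Prefer normally cleanable logs; otherwise fall back to logs with expired tombstones.
    static List<String> logsToEnqueue(List<LogView> dirtyLogs, long nowMs) {
        List<String> cleanable = new ArrayList<>();
        for (LogView log : dirtyLogs) {
            if (log.cleanable)
                cleanable.add(log.name);
        }
        if (!cleanable.isEmpty())
            return cleanable;

        List<String> expired = new ArrayList<>();
        for (LogView log : dirtyLogs) {
            if (log.latestDeleteHorizon != NO_TIMESTAMP && log.latestDeleteHorizon <= nowMs)
                expired.add(log.name);
        }
        return expired;
    }
}
```

The fallback only runs when nothing is cleanable by the normal criteria, which is the low-throughput case the PR description is concerned with.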








[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668303344



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -167,21 +167,9 @@ public void ensureValid() {
      *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
      */
     public long baseTimestamp() {
-        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
-    }
-
-    /**
-     * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
-     * timestamp type of the batch is log append time.
-     * 
-     * @return The first timestamp if a record has been appended, unless the delete horizon has been set
-     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty or if the delete horizon is set
-     */
-    public long firstTimestamp() {
-        final long baseTimestamp = baseTimestamp();
         if (hasDeleteHorizonMs())
             return RecordBatch.NO_TIMESTAMP;

Review comment:
       Yes, you are right. Sorry for the mistake. It should be fixed now.







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r706478220



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
       @junrao @hachikuji @lbradstreet I've removed the logic for tracking the latestDeleteHorizon and the deleteHorizon-triggered cleaning in grabFilthiestCompactedLog since this part of the PR is not a part of KIP-534







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r705721022



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -163,17 +168,18 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     * Choose the log to clean next and add it to the in-progress set. We recompute this
     * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
     * the log manager maintains.
+    * Returns a tuple of an Option of the log selected to be cleaned and the reason it was selected.
     */
-  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
+  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): (Option[LogToClean], LogCleaningReason) = {

Review comment:
       I've moved this into the LogToClean class as a var in the helper. We now set this var in `grabFilthiestCompactedLog` and I've expanded on some of the current testing in `LogCleanerManagerTest` to check that the reason is correct when using `grabFilthiestCompactedLog`







[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709536010



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -493,19 +496,20 @@ private[log] class Cleaner(val id: Int,
    * @return The first offset not cleaned and the statistics for this round of cleaning
    */
   private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+    doClean(cleanable, time.milliseconds())
+  }
+
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
+    info("Beginning cleaning of log %s".format(cleanable.log.name))
+
     // figure out the timestamp below which it is safe to remove delete tombstones
     // this position is defined to be a configurable time beneath the last modified time of the last clean segment
-    val deleteHorizonMs =
+    // this timestamp is only used on the older message formats newer than MAGIC_VALUE_V2

Review comment:
       newer => older ?







[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r681961452



##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
##########
@@ -80,6 +80,7 @@ public void write(int b) {
     private int numRecords = 0;
     private float actualCompressionRatio = 1;
     private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
+    private long deleteHorizonMs;

Review comment:
       Can we rename `firstTimestamp` to `baseTimestamp` here as well?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated

Review comment:
       Can we document the return type?

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
+            ByteBuffer buffer = ByteBuffer.allocate(2048);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                    0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+            builder.append(10L, "1".getBytes(), null);
+
+            ByteBuffer filtered = ByteBuffer.allocate(2048);
+            final long deleteHorizon = Integer.MAX_VALUE / 2;
+            final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {

Review comment:
       I think this test could be a little simpler. Rather than going through `filterTo`, we can use `MemoryRecordsBuilder` and set the delete horizon directly. Maybe it is useful to have both. Perhaps we could add a more direct test in `MemoryRecordsBuilderTest` or something like that?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP
+
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch) {
+          if (batch.magic() < RecordBatch.MAGIC_VALUE_V2) {

Review comment:
       This check doesn't make sense since control records only exist for v2.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -171,38 +177,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
             // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
             // recopy the messages to the destination buffer.
-
             byte batchMagic = batch.magic();
-            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
-                while (iterator.hasNext()) {
-                    Record record = iterator.next();
-                    filterResult.messagesRead += 1;
-
-                    if (filter.shouldRetainRecord(batch, record)) {
-                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                        // the corrupted batch with correct data.
-                        if (!record.hasMagic(batchMagic))
-                            writeOriginalBatch = false;
-
-                        if (record.offset() > maxOffset)
-                            maxOffset = record.offset();
-
-                        retainedRecords.add(record);
-                    } else {
-                        writeOriginalBatch = false;
-                    }
-                }
-            }
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+                                                                          batchMagic, true, retainedRecords);

Review comment:
       nit: fix alignment

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
+            ByteBuffer buffer = ByteBuffer.allocate(2048);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                    0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+            builder.append(10L, "1".getBytes(), null);

Review comment:
       Maybe we can add a few more records here to make the test more interesting

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)

Review comment:
       nit: since we are filtering magic < 2 below, maybe we could add another provider

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           @volatile private var _topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+          val keepPartitionMetadataFile: Boolean,
+          @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {

Review comment:
       Can you help me understand why we need to track this here?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
##########
@@ -125,6 +127,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
         this.baseSequence = baseSequence;
         this.isTransactional = isTransactional;
         this.isControlBatch = isControlBatch;
+        this.deleteHorizonMs = deleteHorizonMs;

Review comment:
       Should we validate that no delete horizon has been set if the magic is less than 2?
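
Something along these lines, sketched in Java. `validateDeleteHorizon` is a hypothetical helper, the exception type and message are assumptions, and the two constants mirror `RecordBatch.MAGIC_VALUE_V2` and `RecordBatch.NO_TIMESTAMP`:

```java
final class DeleteHorizonValidationSketch {
    static final byte MAGIC_VALUE_V2 = 2;   // mirrors RecordBatch.MAGIC_VALUE_V2
    static final long NO_TIMESTAMP = -1L;   // mirrors RecordBatch.NO_TIMESTAMP

    // Rejects a delete horizon on message formats that cannot store one.
    static void validateDeleteHorizon(byte magic, long deleteHorizonMs) {
        if (magic < MAGIC_VALUE_V2 && deleteHorizonMs != NO_TIMESTAMP)
            throw new IllegalArgumentException("Delete horizon " + deleteHorizonMs
                    + " is not supported for magic " + magic);
    }
}
```

Failing fast in the constructor would surface a misuse at build time rather than as a silently dropped horizon.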

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -239,9 +247,68 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
         return filterResult;
     }
 
+    private static BatchFilterResult filterBatch(RecordBatch batch,
+                                                 BufferSupplier decompressionBufferSupplier,
+                                                 FilterResult filterResult,
+                                                 RecordFilter filter,
+                                                 byte batchMagic,
+                                                 boolean writeOriginalBatch,
+                                                 List<Record> retainedRecords) {
+        long maxOffset = -1;
+        boolean containsTombstones = false;
+        try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
+            while (iterator.hasNext()) {
+                Record record = iterator.next();
+                filterResult.messagesRead += 1;
+
+                if (filter.shouldRetainRecord(batch, record)) {
+                    // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+                    // the corrupted batch with correct data.
+                    if (!record.hasMagic(batchMagic))
+                        writeOriginalBatch = false;
+
+                    if (record.offset() > maxOffset)
+                        maxOffset = record.offset();
+
+                    retainedRecords.add(record);
+
+                    if (!record.hasValue()) {
+                        containsTombstones = true;
+                    }
+                } else {
+                    writeOriginalBatch = false;
+                }
+            }
+            return new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset);
+        }
+    }
+
+    private static class BatchFilterResult {
+        private final boolean writeOriginalBatch;
+        private final boolean containsTombstones;
+        private final long maxOffset;
+        public BatchFilterResult(final boolean writeOriginalBatch,
+                                 final boolean containsTombstones,
+                                 final long maxOffset) {
+            this.writeOriginalBatch = writeOriginalBatch;
+            this.containsTombstones = containsTombstones;
+            this.maxOffset = maxOffset;
+        }
+        public boolean shouldWriteOriginalBatch() {

Review comment:
       nit: since this is a private class anyway, maybe we can leave off these accessors
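
A minimal sketch of what the nit suggests: because the holder is a private nested class, the enclosing class can read its final fields directly, so getters add little. `FilterSketch` and `describe` are hypothetical names used only for this illustration.

```java
// Sketch only: FilterSketch and describe are hypothetical names.
final class FilterSketch {
    // Private nested holder with final fields and no accessor methods.
    private static final class BatchFilterResult {
        final boolean writeOriginalBatch;
        final boolean containsTombstones;
        final long maxOffset;

        BatchFilterResult(boolean writeOriginalBatch, boolean containsTombstones, long maxOffset) {
            this.writeOriginalBatch = writeOriginalBatch;
            this.containsTombstones = containsTombstones;
            this.maxOffset = maxOffset;
        }
    }

    // The enclosing class reads the fields directly.
    static String describe(boolean writeOriginalBatch, boolean containsTombstones, long maxOffset) {
        BatchFilterResult result = new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset);
        return result.writeOriginalBatch + "/" + result.containsTombstones + "/" + result.maxOffset;
    }
}
```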

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
+            ByteBuffer buffer = ByteBuffer.allocate(2048);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                    0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+            builder.append(10L, "1".getBytes(), null);
+
+            ByteBuffer filtered = ByteBuffer.allocate(2048);
+            final long deleteHorizon = Integer.MAX_VALUE / 2;
+            final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {
+                @Override
+                protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                    return true;
+                }
+
+                @Override
+                protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+                    return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, true);
+                }
+            };
+            builder.build().filterTo(new TopicPartition("random", 0), recordFilter, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+            filtered.flip();
+            MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+            List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+            assertEquals(1, batches.size());
+            assertEquals(deleteHorizon, batches.get(0).deleteHorizonMs().getAsLong());

Review comment:
       nit: how about using:
   ```java
   assertEquals(OptionalLong.of(deleteHorizon), batches.get(0).deleteHorizonMs());
   ```
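
One likely motivation for this suggestion, shown as a standalone sketch: comparing `OptionalLong` values handles the empty case through `equals`, whereas `getAsLong()` on an empty value throws `NoSuchElementException` before the assertion can report anything useful. `OptionalLongAssertSketch` and its methods are hypothetical names.

```java
import java.util.NoSuchElementException;
import java.util.OptionalLong;

// Sketch only: the class and method names are hypothetical.
final class OptionalLongAssertSketch {
    // True when the optional is present and holds the expected value.
    static boolean matches(long expected, OptionalLong actual) {
        return OptionalLong.of(expected).equals(actual);
    }

    // Demonstrates that getAsLong() throws on an empty OptionalLong.
    static boolean getAsLongThrowsOnEmpty() {
        try {
            OptionalLong.empty().getAsLong();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }
}
```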







[GitHub] [kafka] lbradstreet commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r702508503



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
       Whether we track the delete horizon or the number of tombstones, it seems we will need to checkpoint some state. Otherwise we will be forced to perform a pass after every broker restart. Could we track the delete horizon upon each log append, when we clean the log, and when we have to recover the log?
   
   I'm not sure where a checkpoint should be stored given our current checkpoint file formats and the need to support downgrades.







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668300841



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +161,27 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     */
+    public long baseTimestamp() {

Review comment:
       I've updated RecordIterator. I also expanded on `MemoryRecordsTest.testBaseTimestampToDeleteHorizonConversion` to check the record timestamp to verify it is the correct value.







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682020580



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated

Review comment:
       ~~It seems like we don't need the return value at all, since we would only be using it to track the latestDeleteHorizon in the Log, but it doesn't seem like we need that either. I'm going to remove it~~
   
   ~~edit: going through the build errors and I see other usages. I will go through it more thoroughly~~

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           @volatile private var _topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+          val keepPartitionMetadataFile: Boolean,
+          @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {

Review comment:
       ~~hmm it seems like we only use it in a test. That goes with the return value that was added into the `cleanInto` method in the LogCleaner. I'm going to remove these and see if I can take another approach in the testing~~
   
   ~~edit: going through the build errors and I see other usages. I will go through it more thoroughly~~

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           @volatile private var _topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+          val keepPartitionMetadataFile: Boolean,
+          @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {

Review comment:
       sorry I jumped the gun a bit w/ the earlier changes
   
   There is an addition to the LogCleanerManager that lets the cleaner pick up logs that have tombstones past the deleteHorizon. The logic in the LogCleanerManager can be paraphrased as: "if there are no eligible cleanable logs, we check the Log's latestDeleteHorizon to see whether any logs have tombstones that can be deleted, and enqueue those logs for cleaning."
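
The fallback described above can be sketched roughly as follows. This is a simplified Java rendering, not the Scala code from the PR: `Candidate` is a hypothetical stand-in for `LogToClean`, its `cleanable` flag stands for the outcome of the usual dirty-ratio check, and `NO_TIMESTAMP` mirrors `RecordBatch.NO_TIMESTAMP`.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: Candidate and select are hypothetical names.
final class TombstoneFallbackSketch {
    static final long NO_TIMESTAMP = -1L; // mirrors RecordBatch.NO_TIMESTAMP

    static final class Candidate {
        final String name;
        final boolean cleanable;        // outcome of the normal cleanable-ratio check
        final long latestDeleteHorizon; // NO_TIMESTAMP when no horizon has been recorded

        Candidate(String name, boolean cleanable, long latestDeleteHorizon) {
            this.name = name;
            this.cleanable = cleanable;
            this.latestDeleteHorizon = latestDeleteHorizon;
        }
    }

    // Prefer normally cleanable logs; otherwise fall back to logs whose
    // latest delete horizon has already passed.
    static List<Candidate> select(List<Candidate> dirtyLogs, long nowMs) {
        List<Candidate> cleanable = new ArrayList<>();
        for (Candidate c : dirtyLogs) {
            if (c.cleanable) cleanable.add(c);
        }
        if (!cleanable.isEmpty()) return cleanable;

        List<Candidate> expired = new ArrayList<>();
        for (Candidate c : dirtyLogs) {
            if (c.latestDeleteHorizon != NO_TIMESTAMP && c.latestDeleteHorizon <= nowMs) {
                expired.add(c);
            }
        }
        return expired;
    }
}
```

The fallback only runs when nothing else is cleanable, which matches the low-throughput case the PR description targets.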







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r679386530



##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -239,9 +249,68 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
         return filterResult;
     }
 
+    private static BatchFilterResult filterBatch(RecordBatch batch,
+                                                 BufferSupplier decompressionBufferSupplier,
+                                                 FilterResult filterResult,
+                                                 RecordFilter filter,
+                                                 byte batchMagic,
+                                                 boolean writeOriginalBatch,
+                                                 long maxOffset,

Review comment:
       thanks for the catch! I've moved it out of the input parameters to clean it up







[GitHub] [kafka] vincent81jiang commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
vincent81jiang commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r679343797



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP
+
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch) {
+          if (batch.magic() < 2) {

Review comment:
       RecordBatch.MAGIC_VALUE_V2







[GitHub] [kafka] vincent81jiang commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
vincent81jiang commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r679342991



##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -171,38 +179,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
             // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
             // recopy the messages to the destination buffer.
-
             byte batchMagic = batch.magic();
-            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
-                while (iterator.hasNext()) {
-                    Record record = iterator.next();
-                    filterResult.messagesRead += 1;
-
-                    if (filter.shouldRetainRecord(batch, record)) {
-                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                        // the corrupted batch with correct data.
-                        if (!record.hasMagic(batchMagic))
-                            writeOriginalBatch = false;
-
-                        if (record.offset() > maxOffset)
-                            maxOffset = record.offset();
-
-                        retainedRecords.add(record);
-                    } else {
-                        writeOriginalBatch = false;
-                    }
-                }
-            }
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+                                                                          batchMagic, true, maxOffset, retainedRecords);
+            boolean containsTombstones = iterationResult.containsTombstones();
+            boolean writeOriginalBatch = iterationResult.shouldWriteOriginalBatch();
+            maxOffset = iterationResult.maxOffset();
 
             if (!retainedRecords.isEmpty()) {
-                if (writeOriginalBatch) {
+                // we check if the delete horizon should be set to a new value
+                // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
+                // if the batch does not contain tombstones, then we don't need to overwrite batch
+                boolean needToSetDeleteHorizon = batch.magic() >= 2 && (containsTombstones || containsMarkerForEmptyTxn)

Review comment:
       RecordBatch.MAGIC_VALUE_V2?
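
A small sketch of the condition with the named constant in place of the literal `2`. The local `MAGIC_VALUE_V2` only mirrors `RecordBatch.MAGIC_VALUE_V2` so the example compiles on its own, and `MagicCheckSketch` is a hypothetical name.

```java
// Sketch only: MagicCheckSketch is a hypothetical name.
final class MagicCheckSketch {
    static final byte MAGIC_VALUE_V2 = 2; // mirrors RecordBatch.MAGIC_VALUE_V2

    // Same predicate as in the hunk above, with the magic literal replaced by the constant.
    static boolean needToSetDeleteHorizon(byte magic,
                                          boolean containsTombstones,
                                          boolean containsMarkerForEmptyTxn) {
        return magic >= MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn);
    }
}
```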







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682020580



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated

Review comment:
       It seems like we don't need the return value at all, since we would only be using it to track the latestDeleteHorizon in the Log, but it doesn't seem like we need that either. I'm going to remove it
   
   edit: going through the build errors and I see other usages. I will go through it more thoroughly







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682020580



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated

Review comment:
       It seems like we don't need the return value at all, since we would only be using it to track the latestDeleteHorizon in the Log, but it doesn't seem like we need that either. I'm removing it

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated

Review comment:
       It seems like we don't need the return value at all, since we would only be using it to track the latestDeleteHorizon in the Log, but it doesn't seem like we need that either. I'm going to remove it







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r666406571



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +161,27 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     */
+    public long baseTimestamp() {
+        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
+    }
+
+    /**
+     * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
      * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     * 
+     * @return The first timestamp if a record has been appended, unless the delete horizon has been set
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty or if the delete horizon is set
      */
     public long firstTimestamp() {

Review comment:
       I've removed the firstTimestamp() method; doing so involved some modifications to the baseTimestamp() method and the related test changes.







[GitHub] [kafka] junrao merged pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
junrao merged pull request #10914:
URL: https://github.com/apache/kafka/pull/10914


   





[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r665735408



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +161,27 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     */
+    public long baseTimestamp() {

Review comment:
       I think we need to update `DefaultRecordBatch.RecordIterator` to use `baseTimestamp()` instead of `firstTimestamp()`. We should also make sure we have a test case to ensure that the record timestamps remain correct even when the delete horizon has been set.
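
To make the concern concrete, here is a worked sketch of the arithmetic. It assumes, as the comments in the `filterTo` hunk suggest, that once a delete horizon is set it is stored in the base timestamp field and each record's delta is rewritten relative to it. `TimestampDeltaSketch` and its methods are hypothetical names; `NO_TIMESTAMP` mirrors `RecordBatch.NO_TIMESTAMP`.

```java
// Sketch only: the class and method names are hypothetical.
final class TimestampDeltaSketch {
    static final long NO_TIMESTAMP = -1L; // mirrors RecordBatch.NO_TIMESTAMP

    // A record's timestamp is reconstructed from the batch base plus its stored delta.
    static long recordTimestamp(long baseTimestamp, long timestampDelta) {
        return baseTimestamp + timestampDelta;
    }

    // Delta to store once the base timestamp field holds the delete horizon (assumed behavior).
    static long rebasedDelta(long originalTimestamp, long deleteHorizonMs) {
        return originalTimestamp - deleteHorizonMs;
    }
}
```

With an original record timestamp of 10 and a delete horizon of 1000, the rebased delta is -990, and reading it back against the base timestamp (1000) yields 10 again. Reading it against a first timestamp that reports `NO_TIMESTAMP` when the horizon is set would give -991 instead, which is the kind of error the requested test case would catch.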







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682018753



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           @volatile private var _topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+          val keepPartitionMetadataFile: Boolean,
+          @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {

Review comment:
       hmm it seems like we only use it in a test. That goes with the return value that was added into the `cleanInto` method in the LogCleaner. I'm going to remove these and see if I can take another approach in the testing 
   
   edit: going through the build errors and I see other usages. I will go through it more thoroughly







[GitHub] [kafka] lbradstreet commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r702503828



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,13 +204,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+

Review comment:
       Preexisting nit: missing whitespace between `if` and `(` in `if(cleanableLogs`.







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709539669



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -493,19 +496,20 @@ private[log] class Cleaner(val id: Int,
    * @return The first offset not cleaned and the statistics for this round of cleaning
    */
   private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+    doClean(cleanable, time.milliseconds())
+  }
+
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
+    info("Beginning cleaning of log %s".format(cleanable.log.name))
+
     // figure out the timestamp below which it is safe to remove delete tombstones
     // this position is defined to be a configurable time beneath the last modified time of the last clean segment
-    val deleteHorizonMs =
+    // this timestamp is only used on the older message formats newer than MAGIC_VALUE_V2

Review comment:
       ah thanks for the catch :/ I've been getting mixed up in my head







[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r699482697



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -574,14 +577,17 @@ private[log] class Cleaner(val id: Int,
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
         transactionMetadata.addAbortedTransactions(abortedTransactions)
 
-        val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
+        val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified > legacyDeleteHorizonMs
         info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
-          s"with deletion horizon $deleteHorizonMs, " +
-          s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
+          s"with legacy deletion horizon $legacyDeleteHorizonMs, " +
+          s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
 
         try {
-          cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,
-            transactionMetadata, lastOffsetOfActiveProducers, stats)
+          val latestDeleteHorizon: Long = cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config.deleteRetentionMs,
+            log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, stats, currentTime = currentTime)
+          if (log.latestDeleteHorizon < latestDeleteHorizon) {

Review comment:
       The existing code follows the convention of omitting {} for single-line statements. Could we follow that convention here? Ditto in a few other places in this file.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -171,38 +177,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
             // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
             // recopy the messages to the destination buffer.
-
             byte batchMagic = batch.magic();
-            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
-                while (iterator.hasNext()) {
-                    Record record = iterator.next();
-                    filterResult.messagesRead += 1;
-
-                    if (filter.shouldRetainRecord(batch, record)) {
-                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                        // the corrupted batch with correct data.
-                        if (!record.hasMagic(batchMagic))
-                            writeOriginalBatch = false;
-
-                        if (record.offset() > maxOffset)
-                            maxOffset = record.offset();
-
-                        retainedRecords.add(record);
-                    } else {
-                        writeOriginalBatch = false;
-                    }
-                }
-            }
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+                    batchMagic, true, retainedRecords);
+            boolean containsTombstones = iterationResult.containsTombstones;
+            boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
+            long maxOffset = iterationResult.maxOffset;
 
             if (!retainedRecords.isEmpty()) {
-                if (writeOriginalBatch) {
+                // we check if the delete horizon should be set to a new value
+                // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
+                // if the batch does not contain tombstones, then we don't need to overwrite batch
+                boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
+                    && !batch.deleteHorizonMs().isPresent();
+                if (writeOriginalBatch && !needToSetDeleteHorizon) {
+                    if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())
+                        filterResult.updateLatestDeleteHorizon(batch.deleteHorizonMs().getAsLong());
                     batch.writeTo(bufferOutputStream);
                     filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
                 } else {
-                    MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+                    final MemoryRecordsBuilder builder;
+                    if (needToSetDeleteHorizon) {
+                        long deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
+                        if (deleteHorizonMs > filterResult.latestDeleteHorizon()) {
+                            filterResult.updateLatestDeleteHorizon(deleteHorizonMs);
+                        }
+                    } else {
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP));
+                        if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())
+                            filterResult.updateLatestDeleteHorizon(batch.deleteHorizonMs().getAsLong());
+                    }

Review comment:
       We want to be consistent on avoiding {} for single line statements.

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -37,6 +40,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;

Review comment:
       Could we add a description of the new tombstone and control batch removal logic to the class comment?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -171,38 +177,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
             // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
             // recopy the messages to the destination buffer.
-
             byte batchMagic = batch.magic();
-            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
-                while (iterator.hasNext()) {
-                    Record record = iterator.next();
-                    filterResult.messagesRead += 1;
-
-                    if (filter.shouldRetainRecord(batch, record)) {
-                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                        // the corrupted batch with correct data.
-                        if (!record.hasMagic(batchMagic))
-                            writeOriginalBatch = false;
-
-                        if (record.offset() > maxOffset)
-                            maxOffset = record.offset();
-
-                        retainedRecords.add(record);
-                    } else {
-                        writeOriginalBatch = false;
-                    }
-                }
-            }
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+                    batchMagic, true, retainedRecords);
+            boolean containsTombstones = iterationResult.containsTombstones;
+            boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
+            long maxOffset = iterationResult.maxOffset;
 
             if (!retainedRecords.isEmpty()) {
-                if (writeOriginalBatch) {
+                // we check if the delete horizon should be set to a new value
+                // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
+                // if the batch does not contain tombstones, then we don't need to overwrite batch
+                boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
+                    && !batch.deleteHorizonMs().isPresent();
+                if (writeOriginalBatch && !needToSetDeleteHorizon) {
+                    if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())
+                        filterResult.updateLatestDeleteHorizon(batch.deleteHorizonMs().getAsLong());
                     batch.writeTo(bufferOutputStream);
                     filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
                 } else {
-                    MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+                    final MemoryRecordsBuilder builder;
+                    if (needToSetDeleteHorizon) {
+                        long deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
+                        if (deleteHorizonMs > filterResult.latestDeleteHorizon()) {
+                            filterResult.updateLatestDeleteHorizon(deleteHorizonMs);
+                        }
+                    } else {
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP));
+                        if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())

Review comment:
       Perhaps we could set deleteHorizonMs inside else and then call updateLatestDeleteHorizon() only once outside if/else?
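
       A minimal sketch of this suggestion, assuming a simplified stand-in for the FilterResult bookkeeping (the class `DeleteHorizonUpdateSketch` and the method `recordBatch` are hypothetical; only the names quoted in the diff come from the PR). The horizon is chosen inside the if/else and the comparison and update happen once afterwards:

```java
import java.util.OptionalLong;

// Hypothetical, simplified stand-in for the delete-horizon bookkeeping in
// MemoryRecords.filterTo; this is an illustration, not the PR's code.
final class DeleteHorizonUpdateSketch {
    // Same value as RecordBatch.NO_TIMESTAMP.
    static final long NO_TIMESTAMP = -1L;

    // Stand-in for the value tracked by FilterResult in the diff.
    private long latestDeleteHorizon = NO_TIMESTAMP;

    long latestDeleteHorizon() {
        return latestDeleteHorizon;
    }

    // Pick the horizon in the if/else, then compare and update exactly once.
    void recordBatch(boolean needToSetDeleteHorizon, OptionalLong batchDeleteHorizonMs,
                     long currentTime, long deleteRetentionMs) {
        final long deleteHorizonMs;
        if (needToSetDeleteHorizon)
            deleteHorizonMs = currentTime + deleteRetentionMs;
        else
            deleteHorizonMs = batchDeleteHorizonMs.orElse(NO_TIMESTAMP);

        if (deleteHorizonMs > latestDeleteHorizon)
            latestDeleteHorizon = deleteHorizonMs;
    }
}
```

       Because NO_TIMESTAMP is -1, a batch without a delete horizon can never raise the tracked value, so the single comparison covers both branches.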

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -776,7 +801,13 @@ private[log] class Cleaner(val id: Int,
        *   2) The message doesn't has value but it can't be deleted now.
        */
       val latestOffsetForKey = record.offset() >= foundOffset
-      val isRetainedValue = record.hasValue || retainDeletes
+      val supportDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2

Review comment:
       supportDeleteHorizon may not be very clear since delete horizon is referred to in both the old and the new logic. Perhaps we can have a boolean called legacyRecord?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -500,14 +500,15 @@ private[log] class Cleaner(val id: Int,
         case None => 0L
         case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
     }
-
-    doClean(cleanable, deleteHorizonMs)
+    doClean(cleanable, time.milliseconds(), legacyDeleteHorizonMs = deleteHorizonMs)
   }
 
-  private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long, legacyDeleteHorizonMs: Long = -1L): (Long, CleanerStats) = {

Review comment:
       Now that we have two reasons for cleaning (reflecting new dirty records, removing tombstones), it would be useful to pass down the reason and include it in the following log message.
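
       One way this could look, sketched in Java for illustration. Every name here (`CleaningReasonSketch`, `CleaningReason`, `beginCleaningMessage`) is hypothetical and not part of the PR; only the "Beginning cleaning of log" message text is taken from the diff:

```java
// Hypothetical names throughout: the PR does not define a CleaningReason type.
final class CleaningReasonSketch {
    enum CleaningReason {
        DIRTY_RECORDS("new dirty records"),
        EXPIRED_TOMBSTONES("tombstones past their delete horizon");

        final String description;

        CleaningReason(String description) {
            this.description = description;
        }
    }

    // Builds the "Beginning cleaning" message with the reason appended.
    static String beginCleaningMessage(String logName, CleaningReason reason) {
        return String.format("Beginning cleaning of log %s (reason: %s)", logName, reason.description);
    }
}
```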

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -374,17 +376,17 @@ class LogCleanerTest {
     log.roll()
 
     // the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
     assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // delete horizon forced to 0 to verify marker is not removed early
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
     assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // clean again with large delete horizon and verify the marker is removed
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1

Review comment:
       The comment above needs to be adjusted since we are no longer setting delete horizon.

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -257,7 +257,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
                  val producerStateManager: ProducerStateManager,
                  @volatile private var _topicId: Option[Uuid],
-                 val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+                 val keepPartitionMetadataFile: Boolean,
+                 @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {

Review comment:
       Since latestDeleteHorizon is never passed in from the constructor, it probably could just be a public field? Could we also add a description for this new field?

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -374,17 +376,17 @@ class LogCleanerTest {
     log.roll()
 
     // the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
     assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // delete horizon forced to 0 to verify marker is not removed early
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1

Review comment:
       The comment above needs to be adjusted since we are no longer setting delete horizon.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -562,12 +569,12 @@ class LogCleanerTest {
     // Both the batch and the marker should remain after cleaning. The batch is retained
     // because it is the last entry for this producerId. The marker is retained because
     // there are still batches remaining from this transaction.
-    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
     assertEquals(List(1), offsetsInLog(log))
     assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
 
     // The empty batch and the marker is still retained after a second cleaning.
-    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue - 1)

Review comment:
       Is there a reason to use Long.MaxValue - 1 instead of Long.MaxValue like in other places?

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +499,53 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(V2MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+        builder.append(5L, "0".getBytes(), "0".getBytes());
+        builder.append(10L, "1".getBytes(), null);
+        builder.append(15L, "2".getBytes(), "2".getBytes());
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        final long deleteHorizon = Integer.MAX_VALUE / 2;
+        final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {
+            @Override
+            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                return true;
+            }
+
+            @Override
+            protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+                return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, true);

Review comment:
       It's a bit weird to return true for containsMarkerForEmptyTxn since there is no txn record in this test. It seems the test can work even with containsMarkerForEmptyTxn set to false since it has a tombstone, which will trigger the setting of deleteHorizon?
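
       For reference, the condition from the MemoryRecords.filterTo diff can be isolated as a small predicate. This is a sketch: the class `NeedToSetDeleteHorizonSketch` is a hypothetical wrapper and the arguments stand in for values read from the batch. It shows that a tombstone alone is enough to trigger setting the delete horizon:

```java
import java.util.OptionalLong;

// Hypothetical wrapper around the condition shown in the MemoryRecords diff.
final class NeedToSetDeleteHorizonSketch {
    // Same value as RecordBatch.MAGIC_VALUE_V2.
    static final byte MAGIC_VALUE_V2 = 2;

    // True when a v2+ batch holds a tombstone or an empty-txn marker and has
    // not yet had a delete horizon written into it.
    static boolean needToSetDeleteHorizon(byte magic,
                                          boolean containsTombstones,
                                          boolean containsMarkerForEmptyTxn,
                                          OptionalLong existingDeleteHorizonMs) {
        return magic >= MAGIC_VALUE_V2
            && (containsTombstones || containsMarkerForEmptyTxn)
            && !existingDeleteHorizonMs.isPresent();
    }
}
```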

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -1854,6 +1863,18 @@ class LogCleanerTest {
   private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): UnifiedLog = {
     LogTestUtils.recoverAndCheck(dir, config, expectedKeys, new BrokerTopicStats(), time, time.scheduler)
   }
+
+  /**
+   * We need to run a two pass clean to perform the following steps to stimulate a proper clean:
+   *  1. On the first run, set the delete horizon in the batches with tombstone or markers with empty txn records.
+   *  2. For the second pass, we will advance the current time by tombstoneRetentionMs, which will cause the
+   *     tombstones to expire, leading to their prompt removal from the log.

Review comment:
       Could we add a description for the return value?

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -21,8 +21,11 @@
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetentionResult;

Review comment:
       unused import.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
##########
@@ -211,6 +212,12 @@
      */
     boolean isTransactional();
 
+    /**
+     * Get the delete horizon, returns None if the first timestamp is not the delete horizon

Review comment:
       We are returning empty, not None.
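
       To illustrate the wording: in Java the accessor returns `OptionalLong.empty()`, and there is no `None`. A minimal sketch, where the class `DeleteHorizonAccessorSketch` and the boolean flag are hypothetical stand-ins for however the batch records that its first timestamp is a delete horizon:

```java
import java.util.OptionalLong;

// Hypothetical batch-like holder; not the PR's DefaultRecordBatch.
final class DeleteHorizonAccessorSketch {
    private final long baseTimestamp;
    // Assumed flag: whether the first timestamp has been rewritten to a delete horizon.
    private final boolean baseTimestampIsDeleteHorizon;

    DeleteHorizonAccessorSketch(long baseTimestamp, boolean baseTimestampIsDeleteHorizon) {
        this.baseTimestamp = baseTimestamp;
        this.baseTimestampIsDeleteHorizon = baseTimestampIsDeleteHorizon;
    }

    // Returns the delete horizon, or an empty OptionalLong if none is set.
    OptionalLong deleteHorizonMs() {
        return baseTimestampIsDeleteHorizon ? OptionalLong.of(baseTimestamp) : OptionalLong.empty();
    }
}
```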







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r700658878



##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -562,12 +569,12 @@ class LogCleanerTest {
     // Both the batch and the marker should remain after cleaning. The batch is retained
     // because it is the last entry for this producerId. The marker is retained because
     // there are still batches remaining from this transaction.
-    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
     assertEquals(List(1), offsetsInLog(log))
     assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
 
     // The empty batch and the marker is still retained after a second cleaning.
-    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue - 1)

Review comment:
       must have been a typo. I will change it to Long.MaxValue to be similar to other uses







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682012434



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP
+
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch) {
+          if (batch.magic() < RecordBatch.MAGIC_VALUE_V2) {

Review comment:
       ah right, I didn't catch this. Seems like we don't need this block then; if it's a control batch, we can go straight to this check:
   ```
   discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
   ```
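
       The same check, restated as a self-contained Java predicate for illustration. The class `ControlBatchDiscardSketch` and method name are hypothetical, and the arguments stand in for `canDiscardBatch`, `batch.deleteHorizonMs()` and `currentTime` from the snippet above:

```java
import java.util.OptionalLong;

// Hypothetical wrapper around the control-batch check quoted above.
final class ControlBatchDiscardSketch {
    // A control batch is discarded only if it is otherwise discardable, a
    // delete horizon has been set, and that horizon has already passed.
    static boolean shouldDiscardControlBatch(boolean canDiscardBatch,
                                             OptionalLong deleteHorizonMs,
                                             long currentTime) {
        return canDiscardBatch
            && deleteHorizonMs.isPresent()
            && deleteHorizonMs.getAsLong() <= currentTime;
    }
}
```

       Under this sketch, a marker whose horizon has not been set yet survives the first pass, which is why a second cleaning pass is needed before it can be removed.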







[GitHub] [kafka] mattwong949 commented on pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#issuecomment-865452418


   @junrao @hachikuji Could you take a review pass? I know Jun has reviewed before, but since we've rebased several times I think it would be helpful to look it over again.





[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r700663048



##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +499,53 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(V2MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+        builder.append(5L, "0".getBytes(), "0".getBytes());
+        builder.append(10L, "1".getBytes(), null);
+        builder.append(15L, "2".getBytes(), "2".getBytes());
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        final long deleteHorizon = Integer.MAX_VALUE / 2;
+        final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {
+            @Override
+            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                return true;
+            }
+
+            @Override
+            protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+                return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, true);

Review comment:
       makes sense, I can change this to return `false`







[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r699714060



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +162,13 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
-     * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.

Review comment:
       nit: the base timestamp is used to calculate the record timestamps from the deltas
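
       A small sketch of the relationship being described, with hypothetical names (`TimestampDeltaSketch`, `recordTimestamp`, `rebaseDeltas`): each record timestamp is the base timestamp plus that record's delta, so if the base timestamp is replaced (for example by a delete horizon), the deltas have to be recomputed to keep the record timestamps unchanged.

```java
// Hypothetical helper illustrating base timestamp + delta arithmetic.
final class TimestampDeltaSketch {
    // Record timestamp is derived from the batch base timestamp and the delta.
    static long recordTimestamp(long baseTimestamp, long timestampDelta) {
        return baseTimestamp + timestampDelta;
    }

    // Recompute deltas against a new base so every record timestamp is preserved.
    static long[] rebaseDeltas(long oldBase, long[] oldDeltas, long newBase) {
        long[] newDeltas = new long[oldDeltas.length];
        for (int i = 0; i < oldDeltas.length; i++)
            newDeltas[i] = recordTimestamp(oldBase, oldDeltas[i]) - newBase;
        return newDeltas;
    }
}
```

       With record timestamps 5, 10 and 15 and a new base larger than all of them, the recomputed deltas are negative, but adding them back to the new base still yields 5, 10 and 15.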

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -500,14 +500,15 @@ private[log] class Cleaner(val id: Int,
         case None => 0L
         case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
     }
-
-    doClean(cleanable, deleteHorizonMs)
+    doClean(cleanable, time.milliseconds(), legacyDeleteHorizonMs = deleteHorizonMs)
   }
 
-  private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long, legacyDeleteHorizonMs: Long = -1L): (Long, CleanerStats) = {

Review comment:
       Do we need `legacyDeleteHorizonMs` as a parameter? As far as I can tell, there are no cases in the tests which override it. Maybe we could just compute it here instead of in `clean`?
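
       For illustration, the computation that would move out of `clean` is small. A Java rendering of the Scala match shown in the hunk above, where the class `LegacyDeleteHorizonSketch` is hypothetical and the `OptionalLong` argument stands in for the last clean segment's `lastModified` (empty when there is no such segment):

```java
import java.util.OptionalLong;

// Hypothetical helper mirroring the None/Some match in the diff.
final class LegacyDeleteHorizonSketch {
    // No clean segment: horizon is 0. Otherwise: lastModified minus the
    // configured delete retention.
    static long legacyDeleteHorizonMs(OptionalLong lastCleanSegmentLastModifiedMs,
                                      long deleteRetentionMs) {
        if (!lastCleanSegmentLastModifiedMs.isPresent())
            return 0L;
        return lastCleanSegmentLastModifiedMs.getAsLong() - deleteRetentionMs;
    }
}
```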

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -622,26 +628,38 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
+   *
+   * @return the latestDeleteHorizon that is found from the FilterResult of the cleaning
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch) {
+            discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime

Review comment:
       nit: misaligned

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -658,23 +676,28 @@ private[log] class Cleaner(val id: Int,
           }
         }
 
-        if (batch.hasProducerId && isBatchLastRecordOfProducer)
-          BatchRetention.RETAIN_EMPTY
-        else if (discardBatchRecords)
-          BatchRetention.DELETE
-        else
-          BatchRetention.DELETE_EMPTY
+        val batchRetention: BatchRetention =
+          if (batch.hasProducerId && isBatchLastRecordOfProducer)
+            BatchRetention.RETAIN_EMPTY
+          else if (discardBatchRecords)
+            BatchRetention.DELETE
+          else
+            BatchRetention.DELETE_EMPTY
+        new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)
       }
 
       override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
+        var isRecordRetained: Boolean = true

Review comment:
       Why do we need this `var`?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -776,7 +801,13 @@ private[log] class Cleaner(val id: Int,
        *   2) The message doesn't has value but it can't be deleted now.
        */
       val latestOffsetForKey = record.offset() >= foundOffset
-      val isRetainedValue = record.hasValue || retainDeletes
+      val supportDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2
+      val shouldRetainDeletes =

Review comment:
       nit: maybe turn this into a `def` since we don't even use the computed value unless `record.hasValue` is false.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
       When the broker is initialized, `log.latestDeleteHorizon` will be `NO_TIMESTAMP`. We need at least one run to trigger before we can initialize the value. Is there another condition we can rely on in order to ensure that the cleaning still occurs?
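
The concern can be made concrete with a small Java sketch of the predicate in the hunk above. The class and method names are hypothetical; only the `NO_TIMESTAMP` sentinel (`-1`) and the comparison itself come from the diff. If `latestDeleteHorizon` is only populated by a cleaning pass, a freshly started broker never satisfies the check, however old its tombstones are:

```java
final class TombstoneExpiryCheck {
    // Same sentinel value as RecordBatch.NO_TIMESTAMP.
    static final long NO_TIMESTAMP = -1L;

    // Hypothetical stand-in for the filter predicate in LogCleanerManager.
    static boolean hasExpiredTombstones(long latestDeleteHorizon, long nowMs) {
        return latestDeleteHorizon != NO_TIMESTAMP && latestDeleteHorizon <= nowMs;
    }

    public static void main(String[] args) {
        long nowMs = 10_000L;
        // Horizon recorded by an earlier cleaning pass and already in the past: eligible.
        System.out.println(hasExpiredTombstones(9_000L, nowMs));        // true
        // Horizon still in the future: not eligible yet.
        System.out.println(hasExpiredTombstones(11_000L, nowMs));       // false
        // Fresh broker: the horizon was never initialized, so the log is skipped.
        System.out.println(hasExpiredTombstones(NO_TIMESTAMP, nowMs));  // false
    }
}
```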

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 

Review comment:
       nit: no need for `case`. Usually we write this as
   ```scala
   dirtyLogs.filter { ltc =>
   ...
   }
   ```

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -522,13 +523,13 @@ private[log] class Cleaner(val id: Int,
     val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
 
     // group the segments and clean the groups
-    info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
+    info("Cleaning log %s (cleaning prior to %s, discarding legacy tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs)))

Review comment:
       It might not be clear what a "legacy tombstone" means. Would it be fair to call this an upper bound on the deletion horizon?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +162,13 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
-     * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty

Review comment:
       I think we can leave off the comment about the batch being empty since we're not using this for the first timestamp anymore.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -544,17 +545,19 @@ private[log] class Cleaner(val id: Int,
    * @param log The log being cleaned
    * @param segments The group of segments being cleaned
    * @param map The offset map to use for cleaning segments
-   * @param deleteHorizonMs The time to retain delete tombstones
+   * @param currentTime The current time in milliseconds
    * @param stats Collector for cleaning statistics
    * @param transactionMetadata State of ongoing transactions which is carried between the cleaning
    *                            of the grouped segments
+   * @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2
    */
   private[log] def cleanSegments(log: UnifiedLog,
                                  segments: Seq[LogSegment],
                                  map: OffsetMap,
-                                 deleteHorizonMs: Long,
+                                 currentTime: Long,
                                  stats: CleanerStats,
-                                 transactionMetadata: CleanedTransactionMetadata): Unit = {
+                                 transactionMetadata: CleanedTransactionMetadata,
+                                 legacyDeleteHorizonMs: Long = -1L): Unit = {

Review comment:
       Can we make this a required parameter? We try to avoid optional parameters because it is easy to miss them.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -574,14 +577,17 @@ private[log] class Cleaner(val id: Int,
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
         transactionMetadata.addAbortedTransactions(abortedTransactions)
 
-        val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
+        val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified > legacyDeleteHorizonMs
         info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
-          s"with deletion horizon $deleteHorizonMs, " +
-          s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
+          s"with legacy deletion horizon $legacyDeleteHorizonMs, " +
+          s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")

Review comment:
       This log message becomes confusing after this change. How about something like this?
   ```
   s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
       s"with an upper bound deletion horizon $legacyDeleteHorizonMs computed from " +
       s"the segment last modified time of ${currentSegment.lastModified}"
   ```
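
For context, the arithmetic behind the horizon value in this message can be sketched in Java as follows. This is a minimal sketch based only on the two expressions visible in the diffs (`seg.lastModified - deleteRetentionMs`, falling back to `0L`, and `currentSegment.lastModified > legacyDeleteHorizonMs`). The class and method names are hypothetical, and which segment supplies the reference time is not shown in the hunk.

```java
import java.util.OptionalLong;

final class LegacyDeleteHorizonSketch {
    // Horizon = reference segment's last-modified time minus deleteRetentionMs,
    // or 0 when there is no reference segment.
    static long legacyDeleteHorizonMs(OptionalLong referenceLastModifiedMs, long deleteRetentionMs) {
        return referenceLastModifiedMs.isPresent()
            ? referenceLastModifiedMs.getAsLong() - deleteRetentionMs
            : 0L;
    }

    // Legacy tombstones are retained while the segment is newer than the horizon.
    static boolean retainLegacyDeletes(long segmentLastModifiedMs, long legacyDeleteHorizonMs) {
        return segmentLastModifiedMs > legacyDeleteHorizonMs;
    }

    public static void main(String[] args) {
        long horizon = legacyDeleteHorizonMs(OptionalLong.of(100_000L), 30_000L);
        System.out.println(horizon);                                // 70000
        System.out.println(retainLegacyDeletes(80_000L, horizon));  // true
        System.out.println(retainLegacyDeletes(60_000L, horizon));  // false
    }
}
```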

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove tombstones if we can
+            // under the condition that the log's latest delete horizon is less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()
+        }
+        if (!logsWithTombstonesExpired.isEmpty) {

Review comment:
       nit: use `nonEmpty`







[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r665730032



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -90,11 +90,15 @@
  * by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
  * the previous value prior to becoming empty.
  *
+ * The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to
+ * the time for which tombstones / transaction markers needs to be removed. If it is true, then the first timestamp is

Review comment:
       nit: needs -> need?

##########
File path: checkstyle/checkstyle.xml
##########
@@ -133,7 +133,7 @@
     </module>
     <module name="CyclomaticComplexity">
       <!-- default is 10-->
-      <property name="max" value="16"/>
+      <property name="max" value="18"/>

Review comment:
       nit: usually we prefer to add exclusions rather than change the limit for everything

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +161,27 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     */
+    public long baseTimestamp() {
+        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
+    }
+
+    /**
+     * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
      * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     * 
+     * @return The first timestamp if a record has been appended, unless the delete horizon has been set
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty or if the delete horizon is set
      */
     public long firstTimestamp() {

Review comment:
       As far as I can tell, the only use of this method is in tests (after we fix `DefaultRecordBatch.RecordIterator`). Maybe we can remove it or give it default access? Otherwise, the implementation is a little dangerous because it does not accurately return the first timestamp in all cases.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +161,27 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     */
+    public long baseTimestamp() {

Review comment:
       I think we need to update `DefaultRecordBatch.RecordIterator` to use `baseTimestamp()` instead of `firstTimestamp()`. We should also make sure we have a test case to ensure that the record timestamps remain valid even when the delete horizon has been set.
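
A test along those lines could look like the Java sketch below. It does not use the real `DefaultRecordBatch`: `Batch` is a hypothetical stand-in, and it assumes that setting the delete horizon overwrites the base timestamp field and re-encodes the deltas against the new base, which this thread does not confirm. It shows why an iterator built on `firstTimestamp()`, which returns `NO_TIMESTAMP` once the horizon is set, would corrupt record timestamps, while one built on `baseTimestamp()` would not:

```java
import java.util.Arrays;

final class RecordTimestampSketch {
    static final long NO_TIMESTAMP = -1L;

    // Hypothetical, simplified batch: one base timestamp field plus per-record deltas.
    static final class Batch {
        final long baseTimestampField;
        final boolean deleteHorizonSet;
        final long[] deltas;

        Batch(long baseTimestampField, boolean deleteHorizonSet, long[] deltas) {
            this.baseTimestampField = baseTimestampField;
            this.deleteHorizonSet = deleteHorizonSet;
            this.deltas = deltas;
        }

        // Raw field, regardless of what it currently encodes.
        long baseTimestamp() { return baseTimestampField; }

        // Mirrors the javadoc in the diff: NO_TIMESTAMP once the delete horizon is set.
        long firstTimestamp() { return deleteHorizonSet ? NO_TIMESTAMP : baseTimestampField; }

        // Assumed rewrite: store the horizon in the base field and re-encode the deltas.
        Batch withDeleteHorizon(long deleteHorizonMs) {
            long[] newDeltas = new long[deltas.length];
            for (int i = 0; i < deltas.length; i++)
                newDeltas[i] = (baseTimestampField + deltas[i]) - deleteHorizonMs;
            return new Batch(deleteHorizonMs, true, newDeltas);
        }
    }

    // Record timestamp = base + delta, for every record in the batch.
    static long[] decode(long base, long[] deltas) {
        long[] out = new long[deltas.length];
        for (int i = 0; i < deltas.length; i++)
            out[i] = base + deltas[i];
        return out;
    }

    public static void main(String[] args) {
        Batch original = new Batch(100L, false, new long[] {0L, 5L});
        Batch rewritten = original.withDeleteHorizon(5_000L);
        // Decoding from the raw base field recovers the original timestamps.
        System.out.println(Arrays.toString(decode(rewritten.baseTimestamp(), rewritten.deltas)));   // [100, 105]
        // Decoding from firstTimestamp() uses NO_TIMESTAMP as the base and is wrong.
        System.out.println(Arrays.toString(decode(rewritten.firstTimestamp(), rewritten.deltas)));  // [-4901, -4896]
    }
}
```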

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -246,6 +265,19 @@ public boolean isTransactional() {
         return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
     }
 
+    @Override
+    public boolean hasDeleteHorizonMs() {
+        return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+    }
+
+    @Override
+    public long deleteHorizonMs() {

Review comment:
       Maybe we could return `OptionalLong` and get rid of `hasDeleteHorizonMs`?
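
One way to read this suggestion, as a Java sketch: fold the flag check into the accessor so callers cannot read a horizon that was never set. The class below is a hypothetical stand-in for the batch, and the `0x40` mask is an assumption inferred from the "sixth bit" wording in the class comment rather than taken from the diff.

```java
import java.util.OptionalLong;

final class DeleteHorizonAccessorSketch {
    // Assumed mask value; the real constant is defined in DefaultRecordBatch.
    static final int DELETE_HORIZON_FLAG_MASK = 0x40;

    private final int attributes;
    private final long baseTimestamp;

    DeleteHorizonAccessorSketch(int attributes, long baseTimestamp) {
        this.attributes = attributes;
        this.baseTimestamp = baseTimestamp;
    }

    // Empty when the flag is clear; otherwise the base timestamp field holds the horizon.
    OptionalLong deleteHorizonMs() {
        if ((attributes & DELETE_HORIZON_FLAG_MASK) > 0)
            return OptionalLong.of(baseTimestamp);
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long currentTime = 2_000L;
        DeleteHorizonAccessorSketch withHorizon =
            new DeleteHorizonAccessorSketch(DELETE_HORIZON_FLAG_MASK, 1_500L);
        DeleteHorizonAccessorSketch withoutHorizon =
            new DeleteHorizonAccessorSketch(0, 1_500L);

        OptionalLong horizon = withHorizon.deleteHorizonMs();
        System.out.println(horizon.isPresent() && horizon.getAsLong() <= currentTime);  // true
        System.out.println(withoutHorizon.deleteHorizonMs().isPresent());               // false
    }
}
```

A caller can then write a single check such as `horizon.isPresent() && horizon.getAsLong() <= currentTime` instead of pairing `hasDeleteHorizonMs()` with a raw `long`.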







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r700684311



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -500,14 +500,15 @@ private[log] class Cleaner(val id: Int,
         case None => 0L
         case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
     }
-
-    doClean(cleanable, deleteHorizonMs)
+    doClean(cleanable, time.milliseconds(), legacyDeleteHorizonMs = deleteHorizonMs)
   }
 
-  private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long, legacyDeleteHorizonMs: Long = -1L): (Long, CleanerStats) = {

Review comment:
       I've updated the code to record the reason a log was chosen in `LogCleanerManager.grabFilthiestCompactedLog` and to pass that reason through `cleanLog` -> `clean` -> `doClean` in the LogCleaner code.







[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

Posted by GitBox <gi...@apache.org>.
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r705721987



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -658,23 +676,28 @@ private[log] class Cleaner(val id: Int,
           }
         }
 
-        if (batch.hasProducerId && isBatchLastRecordOfProducer)
-          BatchRetention.RETAIN_EMPTY
-        else if (discardBatchRecords)
-          BatchRetention.DELETE
-        else
-          BatchRetention.DELETE_EMPTY
+        val batchRetention: BatchRetention =
+          if (batch.hasProducerId && isBatchLastRecordOfProducer)
+            BatchRetention.RETAIN_EMPTY
+          else if (discardBatchRecords)
+            BatchRetention.DELETE
+          else
+            BatchRetention.DELETE_EMPTY
+        new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)
       }
 
       override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
+        var isRecordRetained: Boolean = true

Review comment:
       Not sure anymore. I've removed it, and we now return the value directly.



