Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/01 09:11:21 UTC

[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r699482697



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -574,14 +577,17 @@ private[log] class Cleaner(val id: Int,
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
         transactionMetadata.addAbortedTransactions(abortedTransactions)
 
-        val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
+        val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified > legacyDeleteHorizonMs
         info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
-          s"with deletion horizon $deleteHorizonMs, " +
-          s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
+          s"with legacy deletion horizon $legacyDeleteHorizonMs, " +
+          s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
 
         try {
-          cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,
-            transactionMetadata, lastOffsetOfActiveProducers, stats)
+          val latestDeleteHorizon: Long = cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config.deleteRetentionMs,
+            log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, stats, currentTime = currentTime)
+          if (log.latestDeleteHorizon < latestDeleteHorizon) {

Review comment:
       The existing code follows the convention of not using {} for single-line statements. Could we follow that convention? Ditto in a few other places in this file.
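
       For illustration, a minimal sketch of the braceless style applied to the guard in the hunk above (the hunk truncates before the body, so the assignment is a hypothetical stand-in):

           if (log.latestDeleteHorizon < latestDeleteHorizon)
             log.latestDeleteHorizon = latestDeleteHorizon  // hypothetical body; not shown in the hunk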

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -171,38 +177,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
             // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
             // recopy the messages to the destination buffer.
-
             byte batchMagic = batch.magic();
-            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
-                while (iterator.hasNext()) {
-                    Record record = iterator.next();
-                    filterResult.messagesRead += 1;
-
-                    if (filter.shouldRetainRecord(batch, record)) {
-                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                        // the corrupted batch with correct data.
-                        if (!record.hasMagic(batchMagic))
-                            writeOriginalBatch = false;
-
-                        if (record.offset() > maxOffset)
-                            maxOffset = record.offset();
-
-                        retainedRecords.add(record);
-                    } else {
-                        writeOriginalBatch = false;
-                    }
-                }
-            }
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+                    batchMagic, true, retainedRecords);
+            boolean containsTombstones = iterationResult.containsTombstones;
+            boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
+            long maxOffset = iterationResult.maxOffset;
 
             if (!retainedRecords.isEmpty()) {
-                if (writeOriginalBatch) {
+                // we check if the delete horizon should be set to a new value
+                // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
+                // if the batch does not contain tombstones, then we don't need to overwrite batch
+                boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
+                    && !batch.deleteHorizonMs().isPresent();
+                if (writeOriginalBatch && !needToSetDeleteHorizon) {
+                    if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())
+                        filterResult.updateLatestDeleteHorizon(batch.deleteHorizonMs().getAsLong());
                     batch.writeTo(bufferOutputStream);
                     filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
                 } else {
-                    MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+                    final MemoryRecordsBuilder builder;
+                    if (needToSetDeleteHorizon) {
+                        long deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
+                        if (deleteHorizonMs > filterResult.latestDeleteHorizon()) {
+                            filterResult.updateLatestDeleteHorizon(deleteHorizonMs);
+                        }
+                    } else {
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP));
+                        if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())
+                            filterResult.updateLatestDeleteHorizon(batch.deleteHorizonMs().getAsLong());
+                    }

Review comment:
       We want to be consistent about avoiding {} for single-line statements.
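
       For example, the braced guard in the hunk above could be written braceless, matching the surrounding code:

           if (deleteHorizonMs > filterResult.latestDeleteHorizon())
               filterResult.updateLatestDeleteHorizon(deleteHorizonMs);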

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -37,6 +40,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;

Review comment:
       Could we add a description of the new tombstone and control batch removal logic to the class-level comment?
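
       A possible wording for that class comment, based on the two-pass behavior described elsewhere in this PR (suggested phrasing only, not the PR's actual text):

           /**
            * Covers the new tombstone / control batch removal logic: for v2 batches, the first
            * cleaning pass that encounters a tombstone (or a marker for an empty transaction)
            * stamps a delete horizon (currentTime + deleteRetentionMs) on the batch, and a
            * later pass may remove those records once the current time passes that horizon.
            */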

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -171,38 +177,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
             // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
             // recopy the messages to the destination buffer.
-
             byte batchMagic = batch.magic();
-            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
-                while (iterator.hasNext()) {
-                    Record record = iterator.next();
-                    filterResult.messagesRead += 1;
-
-                    if (filter.shouldRetainRecord(batch, record)) {
-                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                        // the corrupted batch with correct data.
-                        if (!record.hasMagic(batchMagic))
-                            writeOriginalBatch = false;
-
-                        if (record.offset() > maxOffset)
-                            maxOffset = record.offset();
-
-                        retainedRecords.add(record);
-                    } else {
-                        writeOriginalBatch = false;
-                    }
-                }
-            }
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+                    batchMagic, true, retainedRecords);
+            boolean containsTombstones = iterationResult.containsTombstones;
+            boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
+            long maxOffset = iterationResult.maxOffset;
 
             if (!retainedRecords.isEmpty()) {
-                if (writeOriginalBatch) {
+                // we check if the delete horizon should be set to a new value
+                // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
+                // if the batch does not contain tombstones, then we don't need to overwrite batch
+                boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
+                    && !batch.deleteHorizonMs().isPresent();
+                if (writeOriginalBatch && !needToSetDeleteHorizon) {
+                    if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())
+                        filterResult.updateLatestDeleteHorizon(batch.deleteHorizonMs().getAsLong());
                     batch.writeTo(bufferOutputStream);
                     filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
                 } else {
-                    MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+                    final MemoryRecordsBuilder builder;
+                    if (needToSetDeleteHorizon) {
+                        long deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
+                        if (deleteHorizonMs > filterResult.latestDeleteHorizon()) {
+                            filterResult.updateLatestDeleteHorizon(deleteHorizonMs);
+                        }
+                    } else {
+                        builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP));
+                        if (batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP) > filterResult.latestDeleteHorizon())

Review comment:
       Perhaps we could set deleteHorizonMs inside the else branch and then call updateLatestDeleteHorizon() only once, outside the if/else?
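
       A minimal sketch of that restructuring, which should preserve the behavior of the hunk above (assuming latestDeleteHorizon never drops below RecordBatch.NO_TIMESTAMP, the single guard covers both branches):

           final long deleteHorizonMs;
           if (needToSetDeleteHorizon)
               deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
           else
               deleteHorizonMs = batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP);
           final MemoryRecordsBuilder builder =
               buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
           if (deleteHorizonMs > filterResult.latestDeleteHorizon())
               filterResult.updateLatestDeleteHorizon(deleteHorizonMs);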

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -776,7 +801,13 @@ private[log] class Cleaner(val id: Int,
        *   2) The message doesn't has value but it can't be deleted now.
        */
       val latestOffsetForKey = record.offset() >= foundOffset
-      val isRetainedValue = record.hasValue || retainDeletes
+      val supportDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2

Review comment:
       supportDeleteHorizon may not be very clear, since a delete horizon is referred to in both the old and the new logic. Perhaps we could have a boolean called legacyRecord?
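
       A one-line sketch of the suggested rename, assuming v2 remains the cutoff magic value:

           val legacyRecord = batch.magic() < RecordBatch.MAGIC_VALUE_V2  // negation of the check above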

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -500,14 +500,15 @@ private[log] class Cleaner(val id: Int,
         case None => 0L
         case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
     }
-
-    doClean(cleanable, deleteHorizonMs)
+    doClean(cleanable, time.milliseconds(), legacyDeleteHorizonMs = deleteHorizonMs)
   }
 
-  private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long, legacyDeleteHorizonMs: Long = -1L): (Long, CleanerStats) = {

Review comment:
       Now that we have two reasons for cleaning (reflecting new dirty records, removing tombstones), it would be useful to pass the reason down and include it in the logging below.
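
       A hypothetical sketch of threading a reason through doClean (the names are illustrative, not from the PR; the method body is elided):

           sealed trait CleaningReason
           case object ReflectNewDirtyRecords extends CleaningReason
           case object RemoveTombstones extends CleaningReason

           private[log] def doClean(cleanable: LogToClean, currentTime: Long,
                                    legacyDeleteHorizonMs: Long = -1L,
                                    reason: CleaningReason = ReflectNewDirtyRecords): (Long, CleanerStats) = {
             info(s"Cleaning log ${cleanable.log.name} (reason: $reason, current time: $currentTime)")
             // ... existing cleaning logic unchanged ...
           }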

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -374,17 +376,17 @@ class LogCleanerTest {
     log.roll()
 
     // the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
     assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // delete horizon forced to 0 to verify marker is not removed early
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
     assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // clean again with large delete horizon and verify the marker is removed
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1

Review comment:
       The comment above needs to be adjusted, since we are no longer setting the delete horizon directly.
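
       For example, the "delete horizon forced to 0" comment could become something like (suggested wording only):

           // keep currentTime at largeTimestamp so the delete horizon stamped by the first
           // cleaning has not yet passed, verifying the marker is not removed early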

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -257,7 +257,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
                  val producerStateManager: ProducerStateManager,
                  @volatile private var _topicId: Option[Uuid],
-                 val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+                 val keepPartitionMetadataFile: Boolean,
+                 @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {

Review comment:
       Since latestDeleteHorizon is never passed in through the constructor, it could probably just be a public field? Could we also add a description for this new field?
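
       A sketch of moving it into the class body with a description (the doc wording is a suggestion):

           /** The latest delete horizon (in milliseconds) that the cleaner has stamped on any
             * batch in this log; RecordBatch.NO_TIMESTAMP means none has been set yet. */
           @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP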

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -374,17 +376,17 @@ class LogCleanerTest {
     log.roll()
 
     // the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
     assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // delete horizon forced to 0 to verify marker is not removed early
-    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1

Review comment:
       The comment above needs to be adjusted, since we are no longer setting the delete horizon directly.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -562,12 +569,12 @@ class LogCleanerTest {
     // Both the batch and the marker should remain after cleaning. The batch is retained
     // because it is the last entry for this producerId. The marker is retained because
     // there are still batches remaining from this transaction.
-    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
     assertEquals(List(1), offsetsInLog(log))
     assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
 
     // The empty batch and the marker is still retained after a second cleaning.
-    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+    cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue - 1)

Review comment:
       Is there a reason to use Long.MaxValue - 1 instead of Long.MaxValue like in other places?

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +499,53 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(V2MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+        builder.append(5L, "0".getBytes(), "0".getBytes());
+        builder.append(10L, "1".getBytes(), null);
+        builder.append(15L, "2".getBytes(), "2".getBytes());
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        final long deleteHorizon = Integer.MAX_VALUE / 2;
+        final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {
+            @Override
+            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                return true;
+            }
+
+            @Override
+            protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+                return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, true);

Review comment:
       It's a bit weird to return true for containsMarkerForEmptyTxn, since there is no transactional record in this test. It seems the test would work even with containsMarkerForEmptyTxn set to false, since it has a tombstone, which will trigger the setting of the delete horizon?
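
       A sketch of the suggested change in the test's filter (the tombstone appended above, key "1" with a null value, should already trigger the delete-horizon path):

           @Override
           protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
               // no transactional records in this test, so don't claim an empty-txn marker;
               // the tombstone alone triggers the delete-horizon stamping
               return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false);
           }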

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -1854,6 +1863,18 @@ class LogCleanerTest {
   private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): UnifiedLog = {
     LogTestUtils.recoverAndCheck(dir, config, expectedKeys, new BrokerTopicStats(), time, time.scheduler)
   }
+
+  /**
+   * We need to run a two-pass clean to perform the following steps to simulate a proper clean:
+   *  1. On the first run, set the delete horizon in the batches with tombstone or markers with empty txn records.
+   *  2. For the second pass, we will advance the current time by tombstoneRetentionMs, which will cause the
+   *     tombstones to expire, leading to their prompt removal from the log.

Review comment:
       Could we add a description for the return value?
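
       For instance, a @return tag appended to the doc comment quoted above (hypothetical wording; the helper's actual return value is not shown in this hunk):

           * @return the (dirty offset, CleanerStats) result of the second cleaning pass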

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -21,8 +21,11 @@
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetentionResult;

Review comment:
       unused import.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
##########
@@ -211,6 +212,12 @@
      */
     boolean isTransactional();
 
+    /**
+     * Get the delete horizon, returns None if the first timestamp is not the delete horizon

Review comment:
       We are returning empty, not None.
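
       A sketch of the corrected doc, given that the accessor appears to return an OptionalLong (per the deleteHorizonMs().isPresent() / getAsLong() usages in the other hunks):

           /**
            * Get the delete horizon; returns OptionalLong.empty() if the first timestamp is not
            * the delete horizon.
            */
           OptionalLong deleteHorizonMs();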




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org