Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/29 17:18:28 UTC

[GitHub] [kafka] vincent81jiang commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

vincent81jiang commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r679342991



##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -171,38 +179,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
             // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
             // recopy the messages to the destination buffer.
-
             byte batchMagic = batch.magic();
-            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
-                while (iterator.hasNext()) {
-                    Record record = iterator.next();
-                    filterResult.messagesRead += 1;
-
-                    if (filter.shouldRetainRecord(batch, record)) {
-                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                        // the corrupted batch with correct data.
-                        if (!record.hasMagic(batchMagic))
-                            writeOriginalBatch = false;
-
-                        if (record.offset() > maxOffset)
-                            maxOffset = record.offset();
-
-                        retainedRecords.add(record);
-                    } else {
-                        writeOriginalBatch = false;
-                    }
-                }
-            }
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+                                                                          batchMagic, true, maxOffset, retainedRecords);
+            boolean containsTombstones = iterationResult.containsTombstones();
+            boolean writeOriginalBatch = iterationResult.shouldWriteOriginalBatch();
+            maxOffset = iterationResult.maxOffset();
 
             if (!retainedRecords.isEmpty()) {
-                if (writeOriginalBatch) {
+                // we check if the delete horizon should be set to a new value
+                // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
+                // if the batch does not contain tombstones, then we don't need to overwrite batch
+                boolean needToSetDeleteHorizon = batch.magic() >= 2 && (containsTombstones || containsMarkerForEmptyTxn)

Review comment:
       Could this use RecordBatch.MAGIC_VALUE_V2 instead of the literal 2?
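A minimal sketch of the reviewer's suggestion, applied to the condition as far as it is visible in the hunk (the quoted line ends mid-expression, so the full condition in the PR may differ). MAGIC_VALUE_V2 is redefined locally with the same value (2) as RecordBatch.MAGIC_VALUE_V2 so the snippet compiles without the Kafka clients jar; the classes MagicValues and DeleteHorizonCheck and the method needToSetDeleteHorizon are hypothetical. The sketch also takes batchMagic, which the hunk already computes from batch.magic().

```java
// Hypothetical local copy of the magic constants; in Kafka these live on
// org.apache.kafka.common.record.RecordBatch.
final class MagicValues {
    static final byte MAGIC_VALUE_V1 = 1;
    static final byte MAGIC_VALUE_V2 = 2;

    private MagicValues() {}
}

final class DeleteHorizonCheck {
    private DeleteHorizonCheck() {}

    // Hypothetical stand-in for the needToSetDeleteHorizon expression in
    // MemoryRecords.filterTo, with the literal 2 replaced by the named constant.
    static boolean needToSetDeleteHorizon(byte batchMagic,
                                          boolean containsTombstones,
                                          boolean containsMarkerForEmptyTxn) {
        return batchMagic >= MagicValues.MAGIC_VALUE_V2
                && (containsTombstones || containsMarkerForEmptyTxn);
    }

    public static void main(String[] args) {
        // v2 batch with a tombstone: horizon needs to be set
        System.out.println(needToSetDeleteHorizon(MagicValues.MAGIC_VALUE_V2, true, false));
        // pre-v2 batch: never set, whatever it contains
        System.out.println(needToSetDeleteHorizon(MagicValues.MAGIC_VALUE_V1, true, true));
        // v2 batch with nothing to clean up: not set
        System.out.println(needToSetDeleteHorizon(MagicValues.MAGIC_VALUE_V2, false, false));
    }
}
```

Using the named constant makes it explicit that the check is about the v2 batch format rather than an arbitrary number.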

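For readers following the refactoring, the loop removed from filterTo maps onto the extracted helper roughly as follows. This is a simplified sketch, not the PR's actual filterBatch: SimpleRecord, SketchRecord, FilterStats and BatchFilter are hypothetical stand-ins for Kafka's Record, FilterResult and RecordFilter types; BatchFilterResult's shape is inferred only from the accessors used in the diff; the literal true argument is assumed to be the initial writeOriginalBatch value; and treating a retained record without a value as a tombstone is an assumption about how containsTombstones is derived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for org.apache.kafka.common.record.Record.
interface SimpleRecord {
    long offset();
    boolean hasValue();            // false => tombstone (assumption)
    boolean hasMagic(byte magic);  // used for the KAFKA-4298 corruption check
}

// Hypothetical concrete record used only to exercise the sketch.
final class SketchRecord implements SimpleRecord {
    private final long offset;
    private final byte magic;
    private final String value;    // null => tombstone

    SketchRecord(long offset, byte magic, String value) {
        this.offset = offset;
        this.magic = magic;
        this.value = value;
    }

    public long offset() { return offset; }
    public boolean hasValue() { return value != null; }
    public boolean hasMagic(byte m) { return magic == m; }
}

// Hypothetical stand-in for the messagesRead counter on FilterResult.
final class FilterStats {
    int messagesRead;
}

// Assumed shape of the per-batch result, modelled on the accessors in the diff.
final class BatchFilterResult {
    private final boolean writeOriginalBatch;
    private final boolean containsTombstones;
    private final long maxOffset;

    BatchFilterResult(boolean writeOriginalBatch, boolean containsTombstones, long maxOffset) {
        this.writeOriginalBatch = writeOriginalBatch;
        this.containsTombstones = containsTombstones;
        this.maxOffset = maxOffset;
    }

    boolean shouldWriteOriginalBatch() { return writeOriginalBatch; }
    boolean containsTombstones() { return containsTombstones; }
    long maxOffset() { return maxOffset; }
}

final class BatchFilter {
    private BatchFilter() {}

    static BatchFilterResult filterBatch(Iterable<? extends SimpleRecord> records,
                                         FilterStats stats,
                                         Predicate<SimpleRecord> shouldRetain,
                                         byte batchMagic,
                                         boolean writeOriginalBatch,
                                         long maxOffset,
                                         List<SimpleRecord> retainedRecords) {
        boolean containsTombstones = false;
        for (SimpleRecord record : records) {
            stats.messagesRead += 1;
            if (shouldRetain.test(record)) {
                // A record whose magic differs from its batch signals KAFKA-4298
                // corruption, so the batch must be rewritten rather than copied.
                if (!record.hasMagic(batchMagic))
                    writeOriginalBatch = false;
                if (record.offset() > maxOffset)
                    maxOffset = record.offset();
                if (!record.hasValue())
                    containsTombstones = true;
                retainedRecords.add(record);
            } else {
                // Dropping any record means the original batch cannot be reused as-is.
                writeOriginalBatch = false;
            }
        }
        return new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset);
    }
}
```

Returning a small result object instead of mutating several locals is what lets the caller read containsTombstones, shouldWriteOriginalBatch and maxOffset in one place after the loop, as the new lines in the hunk do.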



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
