You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/03/15 20:23:28 UTC

[kafka] branch 2.8 updated: KAFKA-13727; Preserve txn markers after partial segment cleaning (#11891)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 18cb82a  KAFKA-13727; Preserve txn markers after partial segment cleaning (#11891)
18cb82a is described below

commit 18cb82a3d50064252b5b036d8f7f70635bcacc74
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Mar 15 12:26:23 2022 -0700

    KAFKA-13727; Preserve txn markers after partial segment cleaning (#11891)
    
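    It is possible to clean a segment partially if the offset map is filled before reaching the end of the segment. The highest offset that is reached becomes the new dirty offset after the cleaning completes. The data above this offset is nevertheless copied over to the new partially cleaned segment. Hence we need to ensure that the transaction index reflects aborted transactions from both the cleaned and uncleaned portion of the segment. Prior to this patch, this was not the case: for the last segment of a cleaning round, aborted transactions were only collected up to `map.latestOffset + 1` (the end of the cleaned portion), so aborts in the uncleaned remainder could be missing from the rebuilt transaction index. [The rest of the original commit message is truncated in this archive.]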
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../kafka/common/record/RecordBatchIterator.java   |   4 +
 core/src/main/scala/kafka/log/LogCleaner.scala     |   4 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 109 +++++++++++++++++++++
 3 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java
index 88af039..967cff8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.utils.AbstractIterator;
 
+import java.io.EOFException;
 import java.io.IOException;
 
 class RecordBatchIterator<T extends RecordBatch> extends AbstractIterator<T> {
@@ -36,6 +38,8 @@ class RecordBatchIterator<T extends RecordBatch> extends AbstractIterator<T> {
             if (batch == null)
                 return allDone();
             return batch;
+        } catch (EOFException e) {
+            throw new CorruptRecordException("Unexpected EOF while attempting to read the next batch", e);
         } catch (IOException e) {
             throw new KafkaException(e);
         }
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index df9722c..7a8a13c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -576,8 +576,10 @@ private[log] class Cleaner(val id: Int,
         val currentSegment = currentSegmentOpt.get
         val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None
 
+        // Note that it is important to collect aborted transactions from the full log segment
+        // range since we need to rebuild the full transaction index for the new segment.
         val startOffset = currentSegment.baseOffset
-        val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
+        val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(currentSegment.readNextOffset)
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
         transactionMetadata.addAbortedTransactions(abortedTransactions)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index dc51ce6..43bc3b9 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -248,6 +248,110 @@ class LogCleanerTest {
     assertEquals(2L, logAppendInfo.lastOffset)
   }
 
+  /**
+   * Asserts that the aborted transactions collected from offset 0 up to
+   * `log.logEndOffset` are exactly `expectedAbortedTxns`.
+   */
+  private def assertAllAbortedTxns(expectedAbortedTxns: List[AbortedTxn], log: Log): Unit =
+    assertEquals(expectedAbortedTxns,
+      log.collectAbortedTransactions(startOffset = 0L, upperBoundOffset = log.logEndOffset))
+
+  /** Asserts every producer in `log.activeProducers` reports `currentTxnStartOffset() == -1` (no txn in progress). */
+  private def assertAllTransactionsComplete(log: Log): Unit =
+    assertTrue(!log.activeProducers.exists(_.currentTxnStartOffset() != -1))
+
+  @Test
+  def testMultiPassSegmentCleaningWithAbortedTransactions(): Unit = {
+    // Verify that the log cleaner preserves aborted transaction state (including the index)
+    // even if the cleaner cannot clean the whole segment in one pass.
+
+    val deleteRetentionMs = 50000
+    val offsetMapSlots = 4  // deliberately tiny so that each pass only cleans part of the segment
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.DeleteRetentionMsProp, deleteRetentionMs.toString)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+    val producerId1 = 1
+    val producerId2 = 2
+
+    val appendProducer1 = appendTransactionalAsLeader(log, producerId1, producerEpoch)
+    val appendProducer2 = appendTransactionalAsLeader(log, producerId2, producerEpoch)
+
+    def abort(producerId: Long): Unit = {
+      log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Replication)
+    }
+
+    def commit(producerId: Long): Unit = {
+      log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Replication)
+    }
+
+    // Append transactional data and markers; each trailing comment is the inclusive offset range [first, last]
+    appendProducer1(Seq(1, 2))  // [0, 1]
+    appendProducer2(Seq(2, 3))  // [2, 3]
+    appendProducer1(Seq(3, 4))  // [4, 5]
+    commit(producerId1)         // [6, 6]
+    commit(producerId2)         // [7, 7]
+    appendProducer1(Seq(2, 3))  // [8, 9]
+    abort(producerId1)          // [10, 10]
+    appendProducer2(Seq(4, 5))  // [11, 12]
+    appendProducer1(Seq(5, 6))  // [13, 14]
+    commit(producerId1)         // [15, 15]
+    abort(producerId2)          // [16, 16]
+    appendProducer2(Seq(6, 7))  // [17, 18]
+    commit(producerId2)         // [19, 19]
+
+    log.roll()
+    assertEquals(20L, log.logEndOffset)
+
+    val expectedAbortedTxns = List(
+      new AbortedTxn(producerId=producerId1, firstOffset=8, lastOffset=10, lastStableOffset=11),
+      new AbortedTxn(producerId=producerId2, firstOffset=11, lastOffset=16, lastStableOffset=17)
+    )
+
+    assertAllTransactionsComplete(log)
+    assertAllAbortedTxns(expectedAbortedTxns, log)
+
+    var dirtyOffset = 0L  // first offset not yet covered by an offset map; advanced after each pass
+    def cleanSegments(deleteHorizonMs: Long): Unit = {
+      val offsetMap = new FakeOffsetMap(slots = offsetMapSlots)
+      val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
+      val stats = new CleanerStats(time)
+      cleaner.buildOffsetMap(log, dirtyOffset, log.activeSegment.baseOffset, offsetMap, stats)
+      cleaner.cleanSegments(log, segments, offsetMap, deleteHorizonMs, stats, new CleanedTransactionMetadata)
+      dirtyOffset = offsetMap.latestOffset + 1  // the next pass resumes after the last mapped offset
+    }
+
+    // On the first pass, we should see the data from the aborted transactions deleted,
+    // but the markers should remain until the deletion retention time has passed.
+    cleanSegments(deleteHorizonMs = 0L)
+    assertEquals(4L, dirtyOffset)
+    assertEquals(List(0, 2, 4, 6, 7, 10, 13, 15, 16, 17, 19), batchBaseOffsetsInLog(log))
+    assertEquals(List(0, 2, 3, 4, 5, 6, 7, 10, 13, 14, 15, 16, 17, 18, 19), offsetsInLog(log))
+    assertAllTransactionsComplete(log)
+    assertAllAbortedTxns(expectedAbortedTxns, log)
+
+    // On the second pass, no data from the aborted transactions remains. The markers
+    // still cannot be removed from the log due to the retention time, but we do not
+    // need to record them in the transaction index since they are empty.
+    cleanSegments(deleteHorizonMs = 0L)
+    assertEquals(14L, dirtyOffset)
+    assertEquals(List(0, 2, 4, 6, 7, 10, 13, 15, 16, 17, 19), batchBaseOffsetsInLog(log))
+    assertEquals(List(0, 2, 4, 5, 6, 7, 10, 13, 14, 15, 16, 17, 18, 19), offsetsInLog(log))
+    assertAllTransactionsComplete(log)
+    assertAllAbortedTxns(List(), log)
+
+    // On the last pass, wait for the retention time to expire. The abort markers
+    // (offsets 10 and 16) should be deleted.
+    cleanSegments(deleteHorizonMs = Long.MaxValue)
+    assertEquals(20L, dirtyOffset)
+    assertEquals(List(0, 2, 4, 6, 7, 13, 15, 17, 19), batchBaseOffsetsInLog(log))
+    assertEquals(List(0, 2, 4, 5, 6, 7, 13, 15, 17, 18, 19), offsetsInLog(log))
+    assertAllTransactionsComplete(log)
+    assertAllAbortedTxns(List(), log)
+  }
+
   @Test
   def testBasicTransactionAwareCleaning(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
@@ -1052,6 +1156,11 @@ class LogCleanerTest {
     assertEquals(numInvalidMessages, stats.invalidMessagesRead, "Cleaner should have seen %d invalid messages.")
   }
 
+  /** Base offset of every record batch, visiting each segment returned by `log.logSegments`. */
+  private def batchBaseOffsetsInLog(log: Log): Iterable[Long] =
+    log.logSegments.flatMap(segment =>
+      segment.log.batches.asScala.map(batch => batch.baseOffset))
+
   def lastOffsetsPerBatchInLog(log: Log): Iterable[Long] = {
     for (segment <- log.logSegments; batch <- segment.log.batches.asScala)
       yield batch.lastOffset