Posted to commits@kafka.apache.org by jg...@apache.org on 2022/03/15 19:36:58 UTC

[kafka] branch 3.1 updated: KAFKA-13727; Preserve txn markers after partial segment cleaning (#11891)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 648de32  KAFKA-13727; Preserve txn markers after partial segment cleaning (#11891)
648de32 is described below

commit 648de325d08e8876724168a30dc045bcce5de4bd
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Mar 15 12:26:23 2022 -0700

    KAFKA-13727; Preserve txn markers after partial segment cleaning (#11891)
    
    It is possible to clean a segment partially if the offset map is filled before reaching the end of the segment. The highest offset that is reached becomes the new dirty offset after the cleaning completes. The data above this offset is nevertheless copied over to the new partially cleaned segment. Hence we need to ensure that the transaction index reflects aborted transactions from both the cleaned and uncleaned portion of the segment. Prior to this patch, this was not the case: we only collected aborted transactions up to the dirty offset, so the rebuilt index could miss markers covering the uncleaned data that was copied over.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../kafka/common/record/RecordBatchIterator.java   |   4 +
 core/src/main/scala/kafka/log/LogCleaner.scala     |   4 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 110 +++++++++++++++++++++
 3 files changed, 117 insertions(+), 1 deletion(-)
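
As a minimal standalone sketch of the arithmetic behind the fix (all offsets
are invented for illustration; segmentNextOffset stands in for
currentSegment.readNextOffset): when the offset map fills early, the dirty
offset stops inside the segment, but the data above it is still copied into
the new segment, so the aborted-transaction scan must cover the whole segment
range rather than stop at the dirty offset.

    // Toy illustration of the bounds change; all values are made up.
    object PartialCleanBounds {
      def main(args: Array[String]): Unit = {
        val segmentBaseOffset = 0L   // currentSegment.baseOffset
        val segmentNextOffset = 20L  // currentSegment.readNextOffset (one past the last offset)
        val mapLatestOffset   = 3L   // offset map filled part-way through the segment

        // Before the fix: the scan stopped at the dirty offset and could miss
        // aborted transactions in the uncleaned-but-copied tail [4, 20).
        val oldUpperBound = mapLatestOffset + 1
        // After the fix: the scan covers the full segment range.
        val newUpperBound = segmentNextOffset

        println(s"old scan range: [$segmentBaseOffset, $oldUpperBound)")
        println(s"new scan range: [$segmentBaseOffset, $newUpperBound)")
      }
    }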

diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java
index 88af039..967cff8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.utils.AbstractIterator;
 
+import java.io.EOFException;
 import java.io.IOException;
 
 class RecordBatchIterator<T extends RecordBatch> extends AbstractIterator<T> {
@@ -36,6 +38,8 @@ class RecordBatchIterator<T extends RecordBatch> extends AbstractIterator<T> {
             if (batch == null)
                 return allDone();
             return batch;
+        } catch (EOFException e) {
+            throw new CorruptRecordException("Unexpected EOF while attempting to read the next batch", e);
         } catch (IOException e) {
             throw new KafkaException(e);
         }
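
With the iterator change above, a truncated or corrupt batch now surfaces as a
CorruptRecordException rather than a generic KafkaException wrapping an
EOFException. A hedged sketch of what a caller might do with this (assumes
kafka-clients on the classpath; the file path argument is hypothetical):

    import org.apache.kafka.common.errors.CorruptRecordException
    import org.apache.kafka.common.record.FileRecords

    import java.io.File

    object TruncatedReadExample {
      def main(args: Array[String]): Unit = {
        // Open an on-disk log segment (path supplied by the caller).
        val records = FileRecords.open(new File(args(0)))
        try {
          records.batches.forEach(batch => println(s"batch at ${batch.baseOffset}"))
        } catch {
          case e: CorruptRecordException =>
            // Unexpected EOF mid-batch: the segment is truncated or corrupt.
            System.err.println(s"corrupt tail: ${e.getMessage}")
        } finally {
          records.close()
        }
      }
    }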
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 0d4cab9..55d7952 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -577,8 +577,10 @@ private[log] class Cleaner(val id: Int,
         val currentSegment = currentSegmentOpt.get
         val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None
 
+        // Note that it is important to collect aborted transactions from the full log segment
+        // range since we need to rebuild the full transaction index for the new segment.
         val startOffset = currentSegment.baseOffset
-        val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
+        val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(currentSegment.readNextOffset)
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
         transactionMetadata.addAbortedTransactions(abortedTransactions)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 8f1d241..0b00414 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -268,6 +268,111 @@ class LogCleanerTest {
     assertEquals(2L, logAppendInfo.lastOffset)
   }
 
+  private def assertAllAbortedTxns(
+    expectedAbortedTxns: List[AbortedTxn],
+    log: UnifiedLog
+  ): Unit = {
+    val abortedTxns = log.collectAbortedTransactions(startOffset = 0L, upperBoundOffset = log.logEndOffset)
+    assertEquals(expectedAbortedTxns, abortedTxns)
+  }
+
+  private def assertAllTransactionsComplete(log: UnifiedLog): Unit = {
+    assertTrue(log.activeProducers.forall(_.currentTxnStartOffset() == -1))
+  }
+
+  @Test
+  def testMultiPassSegmentCleaningWithAbortedTransactions(): Unit = {
+    // Verify that the log cleaner preserves aborted transaction state (including the index)
+    // even if the cleaner cannot clean the whole segment in one pass.
+
+    val deleteRetentionMs = 50000
+    val offsetMapSlots = 4
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.DeleteRetentionMsProp, deleteRetentionMs.toString)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+    val producerId1 = 1
+    val producerId2 = 2
+
+    val appendProducer1 = appendTransactionalAsLeader(log, producerId1, producerEpoch)
+    val appendProducer2 = appendTransactionalAsLeader(log, producerId2, producerEpoch)
+
+    def abort(producerId: Long): Unit = {
+      log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Replication)
+    }
+
+    def commit(producerId: Long): Unit = {
+      log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Replication)
+    }
+
+    // Append some transaction data (offset range in parenthesis)
+    appendProducer1(Seq(1, 2))  // [0, 1]
+    appendProducer2(Seq(2, 3))  // [2, 3]
+    appendProducer1(Seq(3, 4))  // [4, 5]
+    commit(producerId1)         // [6, 6]
+    commit(producerId2)         // [7, 7]
+    appendProducer1(Seq(2, 3))  // [8, 9]
+    abort(producerId1)          // [10, 10]
+    appendProducer2(Seq(4, 5))  // [11, 12]
+    appendProducer1(Seq(5, 6))  // [13, 14]
+    commit(producerId1)         // [15, 15]
+    abort(producerId2)          // [16, 16]
+    appendProducer2(Seq(6, 7))  // [17, 18]
+    commit(producerId2)         // [19, 19]
+
+    log.roll()
+    assertEquals(20L, log.logEndOffset)
+
+    val expectedAbortedTxns = List(
+      new AbortedTxn(producerId=producerId1, firstOffset=8, lastOffset=10, lastStableOffset=11),
+      new AbortedTxn(producerId=producerId2, firstOffset=11, lastOffset=16, lastStableOffset=17)
+    )
+
+    assertAllTransactionsComplete(log)
+    assertAllAbortedTxns(expectedAbortedTxns, log)
+
+    var dirtyOffset = 0L
+    def cleanSegments(): Unit = {
+      val offsetMap = new FakeOffsetMap(slots = offsetMapSlots)
+      val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
+      val stats = new CleanerStats(time)
+      cleaner.buildOffsetMap(log, dirtyOffset, log.activeSegment.baseOffset, offsetMap, stats)
+      cleaner.cleanSegments(log, segments, offsetMap, time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue)
+      dirtyOffset = offsetMap.latestOffset + 1
+    }
+
+    // On the first pass, we should see the data from the aborted transactions deleted,
+    // but the markers should remain until the deletion retention time has passed.
+    cleanSegments()
+    assertEquals(4L, dirtyOffset)
+    assertEquals(List(0, 2, 4, 6, 7, 10, 13, 15, 16, 17, 19), batchBaseOffsetsInLog(log))
+    assertEquals(List(0, 2, 3, 4, 5, 6, 7, 10, 13, 14, 15, 16, 17, 18, 19), offsetsInLog(log))
+    assertAllTransactionsComplete(log)
+    assertAllAbortedTxns(expectedAbortedTxns, log)
+
+    // On the second pass, no data from the aborted transactions remains. The markers
+    // still cannot be removed from the log due to the retention time, but we do not
+    // need to record them in the transaction index since they are empty.
+    cleanSegments()
+    assertEquals(14L, dirtyOffset)
+    assertEquals(List(0, 2, 4, 6, 7, 10, 13, 15, 16, 17, 19), batchBaseOffsetsInLog(log))
+    assertEquals(List(0, 2, 4, 5, 6, 7, 10, 13, 14, 15, 16, 17, 18, 19), offsetsInLog(log))
+    assertAllTransactionsComplete(log)
+    assertAllAbortedTxns(List(), log)
+
+    // On the last pass, wait for the retention time to expire. The abort markers
+    // (offsets 10 and 16) should be deleted.
+    time.sleep(deleteRetentionMs)
+    cleanSegments()
+    assertEquals(20L, dirtyOffset)
+    assertEquals(List(0, 2, 4, 6, 7, 13, 15, 17, 19), batchBaseOffsetsInLog(log))
+    assertEquals(List(0, 2, 4, 5, 6, 7, 13, 15, 17, 18, 19), offsetsInLog(log))
+    assertAllTransactionsComplete(log)
+    assertAllAbortedTxns(List(), log)
+  }
+
   @Test
   def testBasicTransactionAwareCleaning(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
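
For readers tracing the three cleaning passes asserted in the new test above,
here is a toy model of how the dirty offset advances when each pass can only
fill the offset map part-way. The numbers mirror the test's asserted dirty
offsets of 4, 14, and 20; the per-pass map offsets are inferred from those
assertions, not taken from the patch.

    // Toy model of multi-pass cleaning; values mirror the test's assertions.
    object DirtyOffsetPasses {
      def main(args: Array[String]): Unit = {
        val logEndOffset = 20L
        var dirtyOffset = 0L
        // Latest offset the map reached on each pass (inferred: 3, 13, 19).
        val mapLatestOffsetPerPass = Seq(3L, 13L, 19L)
        for (latest <- mapLatestOffsetPerPass) {
          dirtyOffset = latest + 1
          println(s"pass cleaned up to $dirtyOffset of $logEndOffset")
        }
      }
    }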
@@ -1078,6 +1183,11 @@ class LogCleanerTest {
     assertEquals(numInvalidMessages, stats.invalidMessagesRead, "Cleaner should have seen %d invalid messages.")
   }
 
+  private def batchBaseOffsetsInLog(log: UnifiedLog): Iterable[Long] = {
+    for (segment <- log.logSegments; batch <- segment.log.batches.asScala)
+      yield batch.baseOffset
+  }
+
   def lastOffsetsPerBatchInLog(log: UnifiedLog): Iterable[Long] = {
     for (segment <- log.logSegments; batch <- segment.log.batches.asScala)
       yield batch.lastOffset
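
The new batchBaseOffsetsInLog helper complements offsetsInLog: the cleaner can
remove individual records while retaining the containing batch (and hence its
base offset), so the two views diverge. A standalone sketch of that
distinction with toy data (Batch here is a stand-in, not a Kafka type):

    object BatchVsRecordOffsets {
      final case class Batch(baseOffset: Long, recordOffsets: Seq[Long])

      def main(args: Array[String]): Unit = {
        // Suppose record 1 was compacted away but its batch (base offset 0) remains.
        val batches = Seq(Batch(0L, Seq(0L)), Batch(2L, Seq(2L, 3L)))
        println(s"batch base offsets: ${batches.map(_.baseOffset)}")        // List(0, 2)
        println(s"record offsets: ${batches.flatMap(_.recordOffsets)}")     // List(0, 2, 3)
      }
    }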