Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/10 17:35:47 UTC

[kafka] 02/02: KAFKA-14055; Txn markers should not be removed by matching records in the offset map (#12390)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 77230b567ab51726302466058ca5f5e734e81664
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sun Jul 10 10:16:39 2022 -0700

    KAFKA-14055; Txn markers should not be removed by matching records in the offset map (#12390)
    
    When cleaning a topic with transactional data, if the keys used in the user data happen to conflict with the keys in the transaction markers, it is possible for the markers to get removed before the corresponding data from the transaction is removed. This results in a hanging transaction or the loss of the transaction's atomicity since it would effectively get bundled into the next transaction in the log. Currently control records are excluded when building the offset map, but not when cleaning. This patch fixes the problem by unconditionally retaining the records in control batches during the per-record retention check; transaction markers are still removed, but only through the existing batch-level logic once the cleaner determines the control batch itself can be discarded.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  2 +
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 79 ++++++++++++++++++----
 2 files changed, 69 insertions(+), 12 deletions(-)
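
A note on the fix before the diff: the change is two lines in Cleaner.cleanInto. A minimal standalone sketch of the per-record retention decision after this patch is shown below. The signature and parameter names here are hypothetical stand-ins chosen for illustration, not the actual Kafka code; the real method delegates to shouldRetainRecord and the offset map, as the hunk below shows.

    // Simplified, hypothetical model of the cleaner's per-record retention decision.
    // All parameters stand in for state held by the real Cleaner instance.
    def shouldRetain(
      discardBatchRecords: Boolean, // batch kept only to preserve producer sequence info
      isControlBatch: Boolean,      // batch holds a transaction marker (COMMIT/ABORT)
      recordOffset: Long,
      latestOffsetForKey: Long      // from the offset map, built over user records only
    ): Boolean = {
      if (discardBatchRecords)
        false // records in a discardable batch are always dropped
      else if (isControlBatch)
        true  // new: a marker is never dropped because a user key shadows it in the map
      else
        recordOffset >= latestOffsetForKey // normal key-based compaction rule
    }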

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 12099f0f2c1..97de1db57c2 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -678,6 +678,8 @@ private[log] class Cleaner(val id: Int,
         if (discardBatchRecords)
           // The batch is only retained to preserve producer sequence information; the records can be removed
           false
+        else if (batch.isControlBatch)
+          true
         else
           Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats)
       }
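
The new test added below shows how the key conflict can arise in the first place: a transaction marker's key is just the serialized control record key struct (a version field and a type field), so a client can legitimately produce a user record whose key bytes are identical. Roughly, mirroring the test's commitRecordKey helper (imports assumed; this is an illustration, not part of the commit):

    import java.nio.ByteBuffer
    import org.apache.kafka.common.record.ControlRecordType

    // Reconstruct the exact key bytes carried by a COMMIT transaction marker.
    val commitMarkerKey: ByteBuffer = {
      val keyStruct = ControlRecordType.COMMIT.recordKey()
      val buf = ByteBuffer.allocate(keyStruct.sizeOf())
      keyStruct.writeTo(buf)
      buf.flip()
      buf
    }
    // Before this patch, a user record produced with these bytes as its key could
    // shadow the COMMIT marker in the offset map, so the marker was removed while
    // the transaction's data records survived, breaking the transaction's atomicity.
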
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 5b942342193..253bf5490c5 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1027,6 +1027,50 @@ class LogCleanerTest {
     assertEquals(List(3, 4, 5), offsetsInLog(log))
   }
 
+
+  @Test
+  def testCleaningWithKeysConflictingWithTxnMarkerKeys(): Unit = {
+    val cleaner = makeCleaner(10)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    val leaderEpoch = 5
+    val producerEpoch = 0.toShort
+
+    // First we append one committed transaction
+    val producerId1 = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId1, producerEpoch, leaderEpoch)
+    appendProducer(Seq(1))
+    log.appendAsLeader(commitMarker(producerId1, producerEpoch), leaderEpoch, origin = AppendOrigin.Coordinator)
+
+    // Now we append one transaction with a key which conflicts with the COMMIT marker appended above
+    def commitRecordKey(): ByteBuffer = {
+      val keySize = ControlRecordType.COMMIT.recordKey().sizeOf()
+      val key = ByteBuffer.allocate(keySize)
+      ControlRecordType.COMMIT.recordKey().writeTo(key)
+      key.flip()
+      key
+    }
+
+    val producerId2 = 2L
+    val records = MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId2,
+      producerEpoch,
+      0,
+      new SimpleRecord(time.milliseconds(), commitRecordKey(), ByteBuffer.wrap("foo".getBytes))
+    )
+    log.appendAsLeader(records, leaderEpoch, origin = AppendOrigin.Client)
+    log.appendAsLeader(commitMarker(producerId2, producerEpoch), leaderEpoch, origin = AppendOrigin.Coordinator)
+    log.roll()
+    assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
+
+    // After cleaning, the marker should not be removed
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+    assertEquals(List(0, 1, 2, 3), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
+  }
+
   @Test
   def testPartialSegmentClean(): Unit = {
     // because loadFactor is 0.75, this means we can fit 1 message in the map
@@ -1917,20 +1961,31 @@ class LogCleanerTest {
       partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  private def appendTransactionalAsLeader(log: Log,
-                                          producerId: Long,
-                                          producerEpoch: Short,
-                                          leaderEpoch: Int = 0,
-                                          origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
-    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, origin = origin)
+  private def appendTransactionalAsLeader(
+    log: Log,
+    producerId: Long,
+    producerEpoch: Short,
+    leaderEpoch: Int = 0,
+    origin: AppendOrigin = AppendOrigin.Client
+  ): Seq[Int] => LogAppendInfo = {
+    appendIdempotentAsLeader(
+      log,
+      producerId,
+      producerEpoch,
+      isTransactional = true,
+      leaderEpoch = leaderEpoch,
+      origin = origin
+    )
   }
 
-  private def appendIdempotentAsLeader(log: Log,
-                                       producerId: Long,
-                                       producerEpoch: Short,
-                                       isTransactional: Boolean = false,
-                                       leaderEpoch: Int = 0,
-                                       origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
+  private def appendIdempotentAsLeader(
+    log: Log,
+    producerId: Long,
+    producerEpoch: Short,
+    isTransactional: Boolean = false,
+    leaderEpoch: Int = 0,
+    origin: AppendOrigin = AppendOrigin.Client
+  ): Seq[Int] => LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {
       val simpleRecords = keys.map { key =>