Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/10 17:17:02 UTC

[kafka] branch trunk updated: KAFKA-14055; Txn markers should not be removed by matching records in the offset map (#12390)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0bc8da7aec8 KAFKA-14055; Txn markers should not be removed by matching records in the offset map (#12390)
0bc8da7aec8 is described below

commit 0bc8da7aec8dfb7c61321b22e717a94565f4855d
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sun Jul 10 10:16:39 2022 -0700

    KAFKA-14055; Txn markers should not be removed by matching records in the offset map (#12390)
    
    When cleaning a topic with transactional data, if the keys used in the user data happen to conflict with the keys in the transaction markers, it is possible for the markers to get removed before the corresponding data from the transaction is removed. This results in a hanging transaction or the loss of the transaction's atomicity, since it would effectively get bundled into the next transaction in the log. Currently, control records are excluded when building the offset map, but not when cleaning. This patch fixes the issue by unconditionally retaining the records in control batches during cleaning.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  2 +
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 78 +++++++++++++++++++---
 2 files changed, 69 insertions(+), 11 deletions(-)
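
A minimal sketch (not part of the commit, built on the public ControlRecordType API that the new test below also uses) of how such a key collision arises. Control record keys serialize to a small fixed byte layout, so nothing prevents a producer from writing an ordinary record whose key bytes match a COMMIT marker key:

    import java.nio.ByteBuffer
    import org.apache.kafka.common.record.ControlRecordType

    object MarkerKeyCollision {
      // Serialize the key the broker writes for a given control record type.
      def markerKeyBytes(controlType: ControlRecordType): ByteBuffer = {
        val keyStruct = controlType.recordKey()
        val buffer = ByteBuffer.allocate(keyStruct.sizeOf())
        keyStruct.writeTo(buffer)
        buffer.flip()
        buffer
      }

      // A user record keyed with these bytes lands on the same offset map
      // entry as the COMMIT marker, which is what allowed markers to be
      // removed by the cleaner before this fix.
      val conflictingKey: ByteBuffer = markerKeyBytes(ControlRecordType.COMMIT)
    }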

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index bf55f10a400..4ad0ea2d853 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -701,6 +701,8 @@ private[log] class Cleaner(val id: Int,
         if (discardBatchRecords)
           // The batch is only retained to preserve producer sequence information; the records can be removed
           false
+        else if (batch.isControlBatch)
+          true
         else
           Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers, batch, record, stats, currentTime = currentTime)
       }
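
For readers skimming the hunk above, a condensed sketch of the retention decision after this change (surrounding code abbreviated; see LogCleaner.scala for the full context):

    // Condensed view of the per-record retention callback. Control batches
    // are now retained unconditionally at this point; markers are retired
    // later by the cleaner's transaction bookkeeping, not by matching user
    // keys against the offset map.
    if (discardBatchRecords)
      false // batch header is kept only to preserve producer sequence info
    else if (batch.isControlBatch)
      true  // never drop a txn marker because a user key collided with it
    else
      Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers,
        batch, record, stats, currentTime = currentTime)
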
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 0dd8a2cfb04..949e0c59df5 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1030,6 +1030,50 @@ class LogCleanerTest {
     assertEquals(List(3, 4, 5), offsetsInLog(log))
   }
 
+
+  @Test
+  def testCleaningWithKeysConflictingWithTxnMarkerKeys(): Unit = {
+    val cleaner = makeCleaner(10)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    val leaderEpoch = 5
+    val producerEpoch = 0.toShort
+
+    // First we append one committed transaction
+    val producerId1 = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId1, producerEpoch, leaderEpoch)
+    appendProducer(Seq(1))
+    log.appendAsLeader(commitMarker(producerId1, producerEpoch), leaderEpoch, origin = AppendOrigin.Coordinator)
+
+    // Now we append one transaction with a key which conflicts with the COMMIT marker appended above
+    def commitRecordKey(): ByteBuffer = {
+      val keySize = ControlRecordType.COMMIT.recordKey().sizeOf()
+      val key = ByteBuffer.allocate(keySize)
+      ControlRecordType.COMMIT.recordKey().writeTo(key)
+      key.flip()
+      key
+    }
+
+    val producerId2 = 2L
+    val records = MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId2,
+      producerEpoch,
+      0,
+      new SimpleRecord(time.milliseconds(), commitRecordKey(), ByteBuffer.wrap("foo".getBytes))
+    )
+    log.appendAsLeader(records, leaderEpoch, origin = AppendOrigin.Client)
+    log.appendAsLeader(commitMarker(producerId2, producerEpoch), leaderEpoch, origin = AppendOrigin.Coordinator)
+    log.roll()
+    assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
+
+    // After cleaning, the marker should not be removed
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+    assertEquals(List(0, 1, 2, 3), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
+  }
+
   @Test
   def testPartialSegmentClean(): Unit = {
     // because loadFactor is 0.75, this means we can fit 1 message in the map
@@ -1962,19 +2006,31 @@ class LogCleanerTest {
       partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  private def appendTransactionalAsLeader(log: UnifiedLog,
-                                          producerId: Long,
-                                          producerEpoch: Short,
-                                          origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
-    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, origin = origin)
+  private def appendTransactionalAsLeader(
+    log: UnifiedLog,
+    producerId: Long,
+    producerEpoch: Short,
+    leaderEpoch: Int = 0,
+    origin: AppendOrigin = AppendOrigin.Client
+  ): Seq[Int] => LogAppendInfo = {
+    appendIdempotentAsLeader(
+      log,
+      producerId,
+      producerEpoch,
+      isTransactional = true,
+      leaderEpoch = leaderEpoch,
+      origin = origin
+    )
   }
 
-  private def appendIdempotentAsLeader(log: UnifiedLog,
-                                       producerId: Long,
-                                       producerEpoch: Short,
-                                       isTransactional: Boolean = false,
-                                       leaderEpoch: Int = 0,
-                                       origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
+  private def appendIdempotentAsLeader(
+    log: UnifiedLog,
+    producerId: Long,
+    producerEpoch: Short,
+    isTransactional: Boolean = false,
+    leaderEpoch: Int = 0,
+    origin: AppendOrigin = AppendOrigin.Client
+  ): Seq[Int] => LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {
       val simpleRecords = keys.map { key =>