You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/25 06:18:14 UTC

[kafka] branch 2.3 updated: KAFKA-8351; Cleaner should handle transactions spanning multiple segments (#6722)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 6243db2  KAFKA-8351; Cleaner should handle transactions spanning multiple segments (#6722)
6243db2 is described below

commit 6243db2c095a7aebb72b359ea05c18e5987ae780
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri May 24 23:10:56 2019 -0700

    KAFKA-8351; Cleaner should handle transactions spanning multiple segments (#6722)
    
    When cleaning transactional data, we need to keep track of which transactions still have data associated with them so that we do not remove the markers. We had logic to do this, but the state was not being carried over when beginning cleaning for a new set of segments. This could cause the cleaner to incorrectly believe a transaction marker was no longer needed. The fix here carries the transactional state between groups of segments to be cleaned.
    
    Reviewers: Dhruvil Shah <dh...@confluent.io>, Viktor Somogyi <vi...@gmail.com>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  70 ++++++------
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 117 ++++++++++++++++-----
 2 files changed, 129 insertions(+), 58 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index b972388..3180f3d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -330,7 +330,7 @@ class LogCleaner(initialConfig: CleanerConfig,
         }
         val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
         try {
-          deletable.foreach { case (topicPartition, log) =>
+          deletable.foreach { case (_, log) =>
             currentLog = Some(log)
             log.deleteOldSegments()
           }
@@ -520,8 +520,12 @@ private[log] class Cleaner(val id: Int,
 
     // group the segments and clean the groups
     info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
-    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset))
-      cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)
+    val transactionMetadata = new CleanedTransactionMetadata
+
+    val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
+      log.config.maxIndexSize, cleanable.firstUncleanableOffset)
+    for (group <- groupedSegments)
+      cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)
 
     // record buffer utilization
     stats.bufferUtilization = offsetMap.utilization
@@ -539,14 +543,18 @@ private[log] class Cleaner(val id: Int,
    * @param map The offset map to use for cleaning segments
    * @param deleteHorizonMs The time to retain delete tombstones
    * @param stats Collector for cleaning statistics
+   * @param transactionMetadata State of ongoing transactions which is carried between the cleaning
+   *                            of the grouped segments
    */
   private[log] def cleanSegments(log: Log,
                                  segments: Seq[LogSegment],
                                  map: OffsetMap,
                                  deleteHorizonMs: Long,
-                                 stats: CleanerStats) {
+                                 stats: CleanerStats,
+                                 transactionMetadata: CleanedTransactionMetadata): Unit = {
     // create a new segment with a suffix appended to the name of the log and indexes
     val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset)
+    transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
 
     try {
       // clean segments into the new destination segment
@@ -561,7 +569,7 @@ private[log] class Cleaner(val id: Int,
         val startOffset = currentSegment.baseOffset
         val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
-        val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(cleaned.txnIndex))
+        transactionMetadata.addAbortedTransactions(abortedTransactions)
 
         val retainDeletes = currentSegment.lastModified > deleteHorizonMs
         info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " +
@@ -870,8 +878,9 @@ private[log] class Cleaner(val id: Int,
     val dirty = log.logSegments(start, end).toBuffer
     info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
 
+    val transactionMetadata = new CleanedTransactionMetadata
     val abortedTransactions = log.collectAbortedTransactions(start, end)
-    val transactionMetadata = CleanedTransactionMetadata(abortedTransactions)
+    transactionMetadata.addAbortedTransactions(abortedTransactions)
 
     // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
@@ -1042,29 +1051,26 @@ private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDir
   override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
 }
 
-private[log] object CleanedTransactionMetadata {
-  def apply(abortedTransactions: List[AbortedTxn],
-            transactionIndex: Option[TransactionIndex] = None): CleanedTransactionMetadata = {
-    val queue = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
-      override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
-    }.reverse)
-    queue ++= abortedTransactions
-    new CleanedTransactionMetadata(queue, transactionIndex)
-  }
-
-  val Empty = CleanedTransactionMetadata(List.empty[AbortedTxn])
-}
-
 /**
- * This is a helper class to facilitate tracking transaction state while cleaning the log. It is initialized
- * with the aborted transactions from the transaction index and its state is updated as the cleaner iterates through
- * the log during a round of cleaning. This class is responsible for deciding when transaction markers can
- * be removed and is therefore also responsible for updating the cleaned transaction index accordingly.
+ * This is a helper class to facilitate tracking transaction state while cleaning the log. It maintains a set
+ * of the ongoing aborted and committed transactions as the cleaner is working its way through the log. This
+ * class is responsible for deciding when transaction markers can be removed and is therefore also responsible
+ * for updating the cleaned transaction index accordingly.
  */
-private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.PriorityQueue[AbortedTxn],
-                                              val transactionIndex: Option[TransactionIndex] = None) {
-  val ongoingCommittedTxns = mutable.Set.empty[Long]
-  val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
+private[log] class CleanedTransactionMetadata {
+  private val ongoingCommittedTxns = mutable.Set.empty[Long]
+  private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
+  // Min-heap of aborted transactions ordered by each transaction's first offset
+  private var abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
+    override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
+  }.reverse)
+
+  // Output cleaned index to write retained aborted transactions
+  var cleanedIndex: Option[TransactionIndex] = None
+
+  def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = {
+    this.abortedTransactions ++= abortedTransactions
+  }
 
   /**
    * Update the cleaned transaction state with a control batch that has just been traversed by the cleaner.
@@ -1081,9 +1087,11 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
       controlType match {
         case ControlRecordType.ABORT =>
           ongoingAbortedTxns.remove(producerId) match {
-            // Retain the marker until all batches from the transaction have been removed
+            // Retain the marker until all batches from the transaction have been removed.
+            // We may retain a record from an aborted transaction if it is the last entry
+            // written by a given producerId.
             case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
-              transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+              cleanedIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
               false
             case _ => true
           }
@@ -1103,7 +1111,7 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
   private def consumeAbortedTxnsUpTo(offset: Long): Unit = {
     while (abortedTransactions.headOption.exists(_.firstOffset <= offset)) {
       val abortedTxn = abortedTransactions.dequeue()
-      ongoingAbortedTxns += abortedTxn.producerId -> new AbortedTransactionMetadata(abortedTxn)
+      ongoingAbortedTxns.getOrElseUpdate(abortedTxn.producerId, new AbortedTransactionMetadata(abortedTxn))
     }
   }
 
@@ -1131,4 +1139,6 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
 
 private class AbortedTransactionMetadata(val abortedTxn: AbortedTxn) {
   var lastObservedBatchOffset: Option[Long] = None
+
+  override def toString: String = s"(txn: $abortedTxn, lastOffset: $lastObservedBatchOffset)"
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 00c9a18..825fa90 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -84,7 +84,7 @@ class LogCleanerTest {
     val segments = log.logSegments.take(3).toSeq
     val stats = new CleanerStats()
     val expectedBytesRead = segments.map(_.size).sum
-    cleaner.cleanSegments(log, segments, map, 0L, stats)
+    cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata)
     val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
     assertEquals(shouldRemain, LogTest.keysInLog(log))
     assertEquals(expectedBytesRead, stats.bytesRead)
@@ -151,7 +151,7 @@ class LogCleanerTest {
     val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
     val stats = new CleanerStats()
     cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
-    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats)
+    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata)
 
     // Validate based on the file name that log segment file is renamed exactly once for async deletion
     assertEquals(expectedFileName, firstLogFile.file().getPath)
@@ -432,37 +432,37 @@ class LogCleanerTest {
     log.roll()
 
     // first time through the records are removed
-    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}]
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
     var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
-    assertEquals(List(4, 5, 6), offsetsInLog(log))
-    assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
+    assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
 
     // the empty batch remains if cleaned again because it still holds the last sequence
-    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}]
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
-    assertEquals(List(4, 5, 6), offsetsInLog(log))
-    assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
+    assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
 
     // append a new record from the producer to allow cleaning of the empty batch
-    // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}] [{Producer2: 1}, {Producer2: Commit}]
-    //  {1},                     {3},                     {4},                 {5}, {6},  {8},            {9} ==> Offsets
+    // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
+    //  {1},                     {3},                     {4},                 {5}, {6}, {7},                 {8},            {9} ==> Offsets
     producer2(Seq(1)) // offset 8
     log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 9
     log.roll()
 
-    // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}]
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
-    assertEquals(List(4, 5, 6, 8, 9), offsetsInLog(log))
-    assertEquals(List(1, 4, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
+    assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
 
-    // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}]
+    // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
-    assertEquals(List(5, 6, 8, 9), offsetsInLog(log))
-    assertEquals(List(1, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
+    assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
   }
 
   @Test
@@ -497,6 +497,59 @@ class LogCleanerTest {
   }
 
   @Test
+  def testCommittedTransactionSpanningSegments(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 128: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+
+    val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch)
+    appendTransaction(Seq(1))
+    log.roll()
+
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.roll()
+
+    // Both the record and the marker should remain after cleaning
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(0, 1), offsetsInLog(log))
+    assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
+  def testAbortedTransactionSpanningSegments(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 128: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+
+    val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch)
+    appendTransaction(Seq(1))
+    log.roll()
+
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.roll()
+
+    // Both the batch and the marker should remain after cleaning. The batch is retained
+    // because it is the last entry for this producerId. The marker is retained because
+    // there are still batches remaining from this transaction.
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(1), offsetsInLog(log))
+    assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
+
+    // The empty batch and the marker are still retained after a second cleaning.
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(1), offsetsInLog(log))
+    assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
   def testAbortMarkerRemoval(): Unit = {
     val tp = new TopicPartition("test", 0)
     val cleaner = makeCleaner(Int.MaxValue)
@@ -650,7 +703,7 @@ class LogCleanerTest {
 
     // clean the log
     val stats = new CleanerStats()
-    cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats)
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata)
     val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
     assertEquals(shouldRemain, LogTest.keysInLog(log))
   }
@@ -663,7 +716,7 @@ class LogCleanerTest {
     val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
-    cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
     val shouldRemain = LogTest.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
     assertEquals(shouldRemain, LogTest.keysInLog(log))
   }
@@ -682,7 +735,7 @@ class LogCleanerTest {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     intercept[CorruptRecordException] {
-      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
     }
   }
 
@@ -699,7 +752,7 @@ class LogCleanerTest {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     intercept[CorruptRecordException] {
-      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
     }
   }
 
@@ -1030,7 +1083,8 @@ class LogCleanerTest {
     val map = new FakeOffsetMap(Int.MaxValue)
     keys.foreach(k => map.put(key(k), Long.MaxValue))
     intercept[LogCleaningAbortedException] {
-      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats())
+      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats(),
+        new CleanedTransactionMetadata)
     }
   }
 
@@ -1239,7 +1293,8 @@ class LogCleanerTest {
 
     // Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
     assertThrows[LogCleaningAbortedException] {
-      cleaner.cleanSegments(log, List(segmentWithOverflow), offsetMap, 0L, new CleanerStats())
+      cleaner.cleanSegments(log, List(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
+        new CleanedTransactionMetadata)
     }
     assertEquals(numSegmentsInitial + 1, log.logSegments.size)
     assertEquals(allKeys, LogTest.keysInLog(log))
@@ -1247,7 +1302,8 @@ class LogCleanerTest {
 
     // Clean each segment now that split is complete.
     for (segmentToClean <- log.logSegments)
-      cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats())
+      cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
+        new CleanedTransactionMetadata)
     assertEquals(expectedKeysAfterCleaning, LogTest.keysInLog(log))
     assertFalse(LogTest.hasOffsetOverflow(log))
     log.close()
@@ -1287,7 +1343,8 @@ class LogCleanerTest {
       offsetMap.put(key(k), Long.MaxValue)
 
     // clean the log
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     var cleanedKeys = LogTest.keysInLog(log)
@@ -1302,7 +1359,8 @@ class LogCleanerTest {
     log = recoverAndCheck(config, allKeys)
 
     // clean again
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTest.keysInLog(log)
@@ -1323,7 +1381,8 @@ class LogCleanerTest {
     }
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTest.keysInLog(log)
@@ -1340,7 +1399,8 @@ class LogCleanerTest {
     }
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTest.keysInLog(log)
@@ -1568,7 +1628,8 @@ class LogCleanerTest {
     appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, isFromClient = isFromClient)
   }
 
-  private def appendIdempotentAsLeader(log: Log, producerId: Long,
+  private def appendIdempotentAsLeader(log: Log,
+                                       producerId: Long,
                                        producerEpoch: Short,
                                        isTransactional: Boolean = false,
                                        leaderEpoch: Int = 0,