You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/11/30 22:49:33 UTC

[kafka] branch trunk updated: KAFKA-10702; Skip bookkeeping of empty transactions (#9632)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e7de280  KAFKA-10702; Skip bookkeeping of empty transactions (#9632)
e7de280 is described below

commit e7de280b0f1c7a924293dba79be77f56a08d0e15
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Nov 30 14:48:28 2020 -0800

    KAFKA-10702; Skip bookkeeping of empty transactions (#9632)
    
    Compacted topics can accumulate a large number of empty transaction markers as the data from the transactions gets cleaned. For each transaction, there is some bookkeeping that leaders and followers must do to keep the transaction index up to date. The cost of this overhead can degrade performance when a replica needs to catch up if the log has mostly empty or small transactions. This patch reduces that cost by skipping over empty transactions, since these will have no effect on the last stable offset and do not need to be reflected in the transaction index.
    
    Reviewers: Lucas Bradstreet <lu...@confluent.io>, Jun Rao <ju...@gmail.com>
---
 .../scala/kafka/log/ProducerStateManager.scala     | 36 ++++----
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 10 ++-
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 96 +++++++++++++++++-----
 3 files changed, 102 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index d811d12..4a5a64a 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -44,7 +44,11 @@ class CorruptSnapshotException(msg: String) extends KafkaException(msg)
 case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
 
 
-private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffsetMetadata, var lastOffset: Option[Long] = None) {
+private[log] case class TxnMetadata(
+  producerId: Long,
+  firstOffset: LogOffsetMetadata,
+  var lastOffset: Option[Long] = None
+) {
   def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset))
 
   override def toString: String = {
@@ -247,8 +251,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
       if (recordIterator.hasNext) {
         val record = recordIterator.next()
         val endTxnMarker = EndTransactionMarker.deserialize(record)
-        val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
-        Some(completedTxn)
+        appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
       } else {
         // An empty control batch means the entire transaction has been cleaned from the log, so no need to append
         None
@@ -301,18 +304,20 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     }
   }
 
-  def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
-                         producerEpoch: Short,
-                         offset: Long,
-                         timestamp: Long): CompletedTxn = {
+  def appendEndTxnMarker(
+    endTxnMarker: EndTransactionMarker,
+    producerEpoch: Short,
+    offset: Long,
+    timestamp: Long
+  ): Option[CompletedTxn] = {
     checkProducerEpoch(producerEpoch, offset)
     checkCoordinatorEpoch(endTxnMarker, offset)
 
-    val firstOffset = updatedEntry.currentTxnFirstOffset match {
-      case Some(txnFirstOffset) => txnFirstOffset
-      case None =>
-        transactions += new TxnMetadata(producerId, offset)
-        offset
+    // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
+    // without any associated data will not have any impact on the last stable offset
+    // and would not need to be reflected in the transaction index.
+    val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
+      CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
     }
 
     updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
@@ -320,7 +325,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
     updatedEntry.lastTimestamp = timestamp
 
-    CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
+    completedTxn
   }
 
   def toEntry: ProducerStateEntry = updatedEntry
@@ -575,9 +580,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
   /**
    * The first undecided offset is the earliest transactional message which has not yet been committed
-   * or aborted.
+   * or aborted. Unlike [[firstUnstableOffset]], this does not reflect the state of replication (i.e.
+   * whether a completed transaction marker is beyond the high watermark).
    */
-  def firstUndecidedOffset: Option[Long] = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset)
+  private[log] def firstUndecidedOffset: Option[Long] = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset)
 
   /**
    * Returns the last offset of this map
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3759122..a3aea7a 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -4411,9 +4411,11 @@ class LogTest {
     assertEquals(0L, log.lastStableOffset)
 
     // Try the append a second time. The appended offset in the log should still increase.
-    assertThrows[KafkaStorageException] {
-      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
-    }
+    // Note that the second append does not write to the transaction index because the producer
+    // state has already been updated and we do not write index entries for empty transactions.
+    // In the future, we may strengthen the fencing logic so that additional writes to the
+    // log are not possible after an IO error (see KAFKA-10778).
+    appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
     assertEquals(12L, log.logEndOffset)
     assertEquals(0L, log.lastStableOffset)
 
@@ -4425,7 +4427,7 @@ class LogTest {
 
     val reopenedLog = createLog(logDir, logConfig, lastShutdownClean = false)
     assertEquals(12L, reopenedLog.logEndOffset)
-    assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
+    assertEquals(1, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
     reopenedLog.updateHighWatermark(12L)
     assertEquals(None, reopenedLog.firstUnstableOffset)
   }
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index dfebf7e..207e129 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.StandardOpenOption
 import java.util.Collections
+import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
@@ -178,29 +179,24 @@ class ProducerStateManagerTest {
   }
 
   @Test
-  def testControlRecordBumpsEpoch(): Unit = {
-    val epoch = 0.toShort
-    append(stateManager, producerId, epoch, 0, 0L)
+  def testControlRecordBumpsProducerEpoch(): Unit = {
+    val producerEpoch = 0.toShort
+    append(stateManager, producerId, producerEpoch, 0, 0L)
 
-    val bumpedEpoch = 1.toShort
-    val (completedTxn, lastStableOffset) = appendEndTxnMarker(stateManager, producerId, bumpedEpoch, ControlRecordType.ABORT, 1L)
-    assertEquals(1L, completedTxn.firstOffset)
-    assertEquals(1L, completedTxn.lastOffset)
-    assertEquals(2L, lastStableOffset)
-    assertTrue(completedTxn.isAborted)
-    assertEquals(producerId, completedTxn.producerId)
+    val bumpedProducerEpoch = 1.toShort
+    appendEndTxnMarker(stateManager, producerId, bumpedProducerEpoch, ControlRecordType.ABORT, 1L)
 
     val maybeLastEntry = stateManager.lastEntry(producerId)
     assertTrue(maybeLastEntry.isDefined)
 
     val lastEntry = maybeLastEntry.get
-    assertEquals(bumpedEpoch, lastEntry.producerEpoch)
+    assertEquals(bumpedProducerEpoch, lastEntry.producerEpoch)
     assertEquals(None, lastEntry.currentTxnFirstOffset)
     assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.firstSeq)
     assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq)
 
     // should be able to append with the new epoch if we start at sequence 0
-    append(stateManager, producerId, bumpedEpoch, 0, 2L)
+    append(stateManager, producerId, bumpedProducerEpoch, 0, 2L)
     assertEquals(Some(0), stateManager.lastEntry(producerId).map(_.firstSeq))
   }
 
@@ -221,6 +217,64 @@ class ProducerStateManagerTest {
   }
 
   @Test
+  def testSkipEmptyTransactions(): Unit = {
+    val producerEpoch = 0.toShort
+    val coordinatorEpoch = 27
+    val seq = new AtomicInteger(0)
+
+    def appendEndTxn(
+      recordType: ControlRecordType,
+      offset: Long,
+      appendInfo: ProducerAppendInfo
+    ): Option[CompletedTxn] = {
+      appendInfo.appendEndTxnMarker(new EndTransactionMarker(recordType, coordinatorEpoch),
+        producerEpoch, offset, time.milliseconds())
+    }
+
+    def appendData(
+      startOffset: Long,
+      endOffset: Long,
+      appendInfo: ProducerAppendInfo
+    ): Unit = {
+      val count = (endOffset - startOffset).toInt
+      appendInfo.appendDataBatch(producerEpoch, seq.get(), seq.addAndGet(count), time.milliseconds(),
+        LogOffsetMetadata(startOffset), endOffset, isTransactional = true)
+      seq.incrementAndGet()
+    }
+
+    // Start one transaction in a separate append
+    val firstAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
+    appendData(16L, 20L, firstAppend)
+    assertEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.head)
+    stateManager.update(firstAppend)
+    stateManager.onHighWatermarkUpdated(21L)
+    assertEquals(Some(LogOffsetMetadata(16L)), stateManager.firstUnstableOffset)
+
+    // Now do a single append which completes the old transaction, mixes in
+    // some empty transactions, one non-empty complete transaction, and one
+    // incomplete transaction
+    val secondAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
+    val firstCompletedTxn = appendEndTxn(ControlRecordType.COMMIT, 21, secondAppend)
+    assertEquals(Some(CompletedTxn(producerId, 16L, 21, isAborted = false)), firstCompletedTxn)
+    assertEquals(None, appendEndTxn(ControlRecordType.COMMIT, 22, secondAppend))
+    assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 23, secondAppend))
+    appendData(24L, 27L, secondAppend)
+    val secondCompletedTxn = appendEndTxn(ControlRecordType.ABORT, 28L, secondAppend)
+    assertTrue(secondCompletedTxn.isDefined)
+    assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 29L, secondAppend))
+    appendData(30L, 31L, secondAppend)
+
+    assertEquals(2, secondAppend.startedTransactions.size)
+    assertEquals(TxnMetadata(producerId, LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
+    assertEquals(TxnMetadata(producerId, LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
+    stateManager.update(secondAppend)
+    stateManager.completeTxn(firstCompletedTxn.get)
+    stateManager.completeTxn(secondCompletedTxn.get)
+    stateManager.onHighWatermarkUpdated(32L)
+    assertEquals(Some(LogOffsetMetadata(30L)), stateManager.firstUnstableOffset)
+  }
+
+  @Test
   def testLastStableOffsetCompletedTxn(): Unit = {
     val producerEpoch = 0.toShort
     val segmentBaseOffset = 990000L
@@ -333,7 +387,10 @@ class ProducerStateManagerTest {
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
     val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
-    val completedTxn = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds())
+    val completedTxnOpt = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds())
+    assertTrue(completedTxnOpt.isDefined)
+
+    val completedTxn = completedTxnOpt.get
     assertEquals(producerId, completedTxn.producerId)
     assertEquals(16L, completedTxn.firstOffset)
     assertEquals(40L, completedTxn.lastOffset)
@@ -821,7 +878,6 @@ class ProducerStateManagerTest {
   @Test
   def testAppendEmptyControlBatch(): Unit = {
     val producerId = 23423L
-    val producerEpoch = 145.toShort
     val baseOffset = 15
 
     val batch: RecordBatch = EasyMock.createMock(classOf[RecordBatch])
@@ -830,7 +886,7 @@ class ProducerStateManagerTest {
     EasyMock.replay(batch)
 
     // Appending the empty control batch should not throw and a new transaction shouldn't be started
-    append(stateManager, producerId, producerEpoch, baseOffset, batch, origin = AppendOrigin.Client)
+    append(stateManager, producerId, baseOffset, batch, origin = AppendOrigin.Client)
     assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
   }
 
@@ -904,15 +960,14 @@ class ProducerStateManagerTest {
                                  controlType: ControlRecordType,
                                  offset: Long,
                                  coordinatorEpoch: Int = 0,
-                                 timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = {
+                                 timestamp: Long = time.milliseconds()): Option[CompletedTxn] = {
     val producerAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Coordinator)
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
-    val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
+    val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
-    val lastStableOffset = mapping.lastStableOffset(completedTxn)
-    mapping.completeTxn(completedTxn)
+    completedTxnOpt.foreach(mapping.completeTxn)
     mapping.updateMapEndOffset(offset + 1)
-    (completedTxn, lastStableOffset)
+    completedTxnOpt
   }
 
   private def append(stateManager: ProducerStateManager,
@@ -932,7 +987,6 @@ class ProducerStateManagerTest {
 
   private def append(stateManager: ProducerStateManager,
                      producerId: Long,
-                     producerEpoch: Short,
                      offset: Long,
                      batch: RecordBatch,
                      origin: AppendOrigin): Unit = {