Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/18 15:41:44 UTC

[kafka] branch trunk updated: KAFKA-7866; Ensure no duplicate offsets after txn index append failure (#6570)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6964c35  KAFKA-7866; Ensure no duplicate offsets after txn index append failure (#6570)
6964c35 is described below

commit 6964c356aa0acdb96d5ccb689a088ba224cf41f5
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Apr 18 08:41:32 2019 -0700

    KAFKA-7866; Ensure no duplicate offsets after txn index append failure (#6570)
    
    This patch fixes a bug in the append logic which can cause duplicate offsets to be appended to the log when the append to the transaction index fails. Rather than incrementing the log end offset after the index append, we do it immediately after the records are written to the log. If the index append later fails, we do two things:
    
    1) We ensure that the last stable offset cannot advance. This guarantees that the aborted data will not be returned to the user until the transaction index contains the corresponding entry.
    2) We skip updating the end offset of the producer state. During log recovery, the affected records will be reprocessed and the missing index entries written.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
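For readers skimming the diff below, here is a minimal, self-contained Scala sketch of the reordering the commit message describes. All names (AppendPathSketch, OngoingTxn, indexAppend, and so on) are illustrative placeholders rather than the real kafka.log.Log or ProducerStateManager API; the point is only the ordering: advance the log end offset right after the records are written, compute the LSO before touching the index, and mark the transaction complete only after the index append has succeeded.

import scala.collection.mutable

final case class OngoingTxn(producerId: Long, firstOffset: Long)
final case class CompletedTxnSketch(producerId: Long, firstOffset: Long, lastOffset: Long)

// indexAppend stands in for the segment's transaction index write and may throw on I/O errors.
class AppendPathSketch(indexAppend: (CompletedTxnSketch, Long) => Unit) {
  @volatile var logEndOffset: Long = 0L
  private val ongoingTxns = mutable.TreeMap.empty[Long, OngoingTxn] // keyed by first offset
  private var producerStateEndOffset: Long = 0L

  def beginTxn(producerId: Long, firstOffset: Long): Unit =
    ongoingTxns.put(firstOffset, OngoingTxn(producerId, firstOffset))

  // LSO for a transaction that just ended: the first offset of the earliest other
  // still-ongoing transaction, or the offset right after this one if none remain.
  private def lastStableOffset(txn: CompletedTxnSketch): Long =
    ongoingTxns.values.find(_.producerId != txn.producerId)
      .map(_.firstOffset).getOrElse(txn.lastOffset + 1)

  def append(numRecords: Int, completedTxns: Seq[CompletedTxnSketch]): Unit = {
    val lastOffset = logEndOffset + numRecords - 1 // pretend the records land here

    // 1) Advance the log end offset immediately after the records are written, so a
    //    later index-append failure can never cause a retried append to reuse offsets.
    logEndOffset = lastOffset + 1

    for (txn <- completedTxns) {
      // 2) Compute the LSO first, append to the (possibly failing) index, and only then
      //    mark the transaction complete.
      val lso = lastStableOffset(txn)
      indexAppend(txn, lso)               // may throw; if it does, the txn stays ongoing
      ongoingTxns.remove(txn.firstOffset) // the "completeTxn" step in the real code
    }

    // 3) Reached only if every index append succeeded, so a failure leaves the producer
    //    state end offset (and therefore the last stable offset) unchanged.
    producerStateEndOffset = lastOffset + 1
  }
}

With this ordering, a throwing indexAppend leaves logEndOffset advanced but the transaction still ongoing, which mirrors the behavior exercised by testAppendToTransactionIndexFailure below: the log end offset keeps growing across retries while the first unstable offset stays pinned until recovery rebuilds the index entries.
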
 core/src/main/scala/kafka/log/Log.scala            | 14 ++++--
 core/src/main/scala/kafka/log/LogSegment.scala     |  3 +-
 .../scala/kafka/log/ProducerStateManager.scala     | 21 ++++++---
 .../main/scala/kafka/log/TransactionIndex.scala    |  6 +--
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 41 +++++++++++++++++
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 52 +++++++++++++++++++++-
 6 files changed, 122 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5ad3c3e..87179db 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -943,6 +943,14 @@ class Log(@volatile var dir: File,
           shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
           records = validRecords)
 
+        // Increment the log end offset. We do this immediately after the append because a
+        // write to the transaction index below may fail and we want to ensure that the offsets
+        // of future appends still grow monotonically. The resulting transaction index inconsistency
+        // will be cleaned up after the log directory is recovered. Note that the end offset of the
+        // ProducerStateManager will not be updated and the last stable offset will not advance
+        // if the append to the transaction index fails.
+        updateLogEndOffset(appendInfo.lastOffset + 1)
+
         // update the producer state
         for ((_, producerAppendInfo) <- updatedProducers) {
           producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
@@ -952,17 +960,15 @@ class Log(@volatile var dir: File,
         // update the transaction index with the true last stable offset. The last offset visible
         // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
         for (completedTxn <- completedTxns) {
-          val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
           segment.updateTxnIndex(completedTxn, lastStableOffset)
+          producerStateManager.completeTxn(completedTxn)
         }
 
         // always update the last producer id map offset so that the snapshot reflects the current offset
         // even if there isn't any idempotent data being written
         producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
 
-        // increment the log end offset
-        updateLogEndOffset(appendInfo.lastOffset + 1)
-
         // update the first unstable offset (which is used to compute LSO)
         updateFirstUnstableOffset()
 
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 9168ce0..624f3ea 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -247,8 +247,9 @@ class LogSegment private[log] (val log: FileRecords,
       val maybeCompletedTxn = appendInfo.append(batch)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>
-        val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+        val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
         updateTxnIndex(completedTxn, lastStableOffset)
+        producerStateManager.completeTxn(completedTxn)
       }
     }
     producerStateManager.updateMapEndOffset(batch.lastOffset + 1)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 59bc417..3db436e 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.{immutable, mutable}
 
@@ -605,7 +606,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * snapshot in range (if there is one). Note that the log end offset is assumed to be less than
    * or equal to the high watermark.
    */
-  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
+  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
     deleteSnapshotFiles(logDir, { snapOffset =>
       snapOffset > logEndOffset || snapOffset <= logStartOffset
@@ -757,9 +758,20 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   /**
-   * Complete the transaction and return the last stable offset.
+   * Compute the last stable offset of a completed transaction, but do not yet mark the transaction complete.
+   * That will be done in `completeTxn` below. This is used to compute the LSO that will be appended to the
+   * transaction index, but the completion must be done only after successfully appending to the index.
    */
-  def completeTxn(completedTxn: CompletedTxn): Long = {
+  def lastStableOffset(completedTxn: CompletedTxn): Long = {
+    val nextIncompleteTxn = ongoingTxns.values.asScala.find(_.producerId != completedTxn.producerId)
+    nextIncompleteTxn.map(_.firstOffset.messageOffset).getOrElse(completedTxn.lastOffset  + 1)
+  }
+
+  /**
+   * Mark a transaction as completed. We will still await advancement of the high watermark before
+   * advancing the first unstable offset.
+   */
+  def completeTxn(completedTxn: CompletedTxn): Unit = {
     val txnMetadata = ongoingTxns.remove(completedTxn.firstOffset)
     if (txnMetadata == null)
       throw new IllegalArgumentException(s"Attempted to complete transaction $completedTxn on partition $topicPartition " +
@@ -767,9 +779,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
     txnMetadata.lastOffset = Some(completedTxn.lastOffset)
     unreplicatedTxns.put(completedTxn.firstOffset, txnMetadata)
-
-    val lastStableOffset = firstUndecidedOffset.getOrElse(completedTxn.lastOffset + 1)
-    lastStableOffset
   }
 
   @threadsafe
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index e730fdb..696bc3a 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -57,7 +57,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
           s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}")
     }
     lastOffset = Some(abortedTxn.lastOffset)
-    Utils.writeFully(channel, abortedTxn.buffer.duplicate())
+    Utils.writeFully(channel(), abortedTxn.buffer.duplicate())
   }
 
   def flush(): Unit = maybeChannel.foreach(_.force(true))
@@ -74,7 +74,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     Files.deleteIfExists(file.toPath)
   }
 
-  private def channel: FileChannel = {
+  private def channel(): FileChannel = {
     maybeChannel match {
       case Some(channel) => channel
       case None => openChannel()
@@ -114,7 +114,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     var newLastOffset: Option[Long] = None
     for ((abortedTxn, position) <- iterator(() => buffer)) {
       if (abortedTxn.lastOffset >= offset) {
-        channel.truncate(position)
+        channel().truncate(position)
         lastOffset = newLastOffset
         return
       }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index cb2b4b2..2b3e362 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3532,6 +3532,47 @@ class LogTest {
   }
 
   @Test
+  def testAppendToTransactionIndexFailure(): Unit = {
+    val pid = 1L
+    val epoch = 0.toShort
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
+
+    val append = appendTransactionalAsLeader(log, pid, epoch)
+    append(10)
+
+    // Kind of a hack, but renaming the index to a directory ensures that the append
+    // to the index will fail.
+    log.activeSegment.txnIndex.renameTo(log.dir)
+
+    // The append will be written to the log successfully, but the write to the index will fail
+    assertThrows[KafkaStorageException] {
+      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
+    }
+    assertEquals(11L, log.logEndOffset)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    // Try the append a second time. The appended offset in the log should still increase.
+    assertThrows[KafkaStorageException] {
+      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
+    }
+    assertEquals(12L, log.logEndOffset)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    // Even if the high watermark is updated, the first unstable offset does not move
+    log.onHighWatermarkIncremented(12L)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    log.close()
+
+    val reopenedLog = createLog(logDir, logConfig)
+    assertEquals(12L, reopenedLog.logEndOffset)
+    assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
+    reopenedLog.onHighWatermarkIncremented(12L)
+    assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset))
+  }
+
+  @Test
   def testLastStableOffsetWithMixedProducerData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 26067b4..2062fae 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -220,6 +220,55 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   @Test
+  def testLastStableOffsetCompletedTxn(): Unit = {
+    val producerEpoch = 0.toShort
+    val segmentBaseOffset = 990000L
+
+    def beginTxn(producerId: Long, startOffset: Long): Unit = {
+      val relativeOffset = (startOffset - segmentBaseOffset).toInt
+      val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
+      producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(), startOffset, startOffset, isTransactional = true)
+      val logOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
+        relativePositionInSegment = 50 * relativeOffset)
+      producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+      stateManager.update(producerAppendInfo)
+    }
+
+    val producerId1 = producerId
+    val startOffset1 = 992342L
+    beginTxn(producerId1, startOffset1)
+
+    val producerId2 = producerId + 1
+    val startOffset2 = startOffset1 + 25
+    beginTxn(producerId2, startOffset2)
+
+    val producerId3 = producerId + 2
+    val startOffset3 = startOffset1 + 57
+    beginTxn(producerId3, startOffset3)
+
+    val lastOffset1 = startOffset3 + 15
+    val completedTxn1 = CompletedTxn(producerId1, startOffset1, lastOffset1, isAborted = false)
+    assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn1))
+    stateManager.completeTxn(completedTxn1)
+    stateManager.onHighWatermarkUpdated(lastOffset1 + 1)
+    assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
+
+    val lastOffset3 = lastOffset1 + 20
+    val completedTxn3 = CompletedTxn(producerId3, startOffset3, lastOffset3, isAborted = false)
+    assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn3))
+    stateManager.completeTxn(completedTxn3)
+    stateManager.onHighWatermarkUpdated(lastOffset3 + 1)
+    assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
+
+    val lastOffset2 = lastOffset3 + 78
+    val completedTxn2 = CompletedTxn(producerId2, startOffset2, lastOffset2, isAborted = false)
+    assertEquals(lastOffset2 + 1, stateManager.lastStableOffset(completedTxn2))
+    stateManager.completeTxn(completedTxn2)
+    stateManager.onHighWatermarkUpdated(lastOffset2 + 1)
+    assertEquals(None, stateManager.firstUnstableOffset)
+  }
+
+  @Test
   def testNonMatchingTxnFirstOffsetMetadataNotCached(): Unit = {
     val producerEpoch = 0.toShort
     val offset = 992342L
@@ -823,7 +872,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
     val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
-    val lastStableOffset = mapping.completeTxn(completedTxn)
+    val lastStableOffset = mapping.lastStableOffset(completedTxn)
+    mapping.completeTxn(completedTxn)
     mapping.updateMapEndOffset(offset + 1)
     (completedTxn, lastStableOffset)
   }