Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/18 18:02:25 UTC

[kafka] branch 2.1 updated: KAFKA-7866; Ensure no duplicate offsets after txn index append failure (#6570)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new f15d149  KAFKA-7866; Ensure no duplicate offsets after txn index append failure (#6570)
f15d149 is described below

commit f15d149f8ab197b332e496122081b05e266682e3
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Apr 18 08:41:32 2019 -0700

    KAFKA-7866; Ensure no duplicate offsets after txn index append failure (#6570)
    
    This patch fixes a bug in the append logic that can cause duplicate offsets to be appended to the log when the append to the transaction index fails. Rather than incrementing the log end offset after the index append, we now do it immediately after the records are written to the log. If the index append later fails, we do two things (a simplified sketch of the new ordering follows the list):
    
    1) We ensure that the last stable offset cannot advance. This guarantees that the aborted data will not be returned to the user until the transaction index contains the corresponding entry.
    2) We skip updating the end offset of the producer state. When the log is later recovered, these records will be reprocessed and the missing index entries rewritten.
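    
    A rough sketch of the reordered append path (simplified names, error handling
    elided; this is an illustration, not the literal patch):
    
        segment.append(...)                              // write the records to the log
        updateLogEndOffset(appendInfo.lastOffset + 1)    // advance the LEO immediately
        for (completedTxn <- completedTxns) {
          val lso = producerStateManager.lastStableOffset(completedTxn)
          segment.updateTxnIndex(completedTxn, lso)      // may throw; the txn then stays incomplete
          producerStateManager.completeTxn(completedTxn) // runs only after a successful index write
        }
        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)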
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            | 14 ++++--
 core/src/main/scala/kafka/log/LogSegment.scala     |  3 +-
 .../scala/kafka/log/ProducerStateManager.scala     | 21 +++++---
 .../main/scala/kafka/log/TransactionIndex.scala    |  6 +--
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 41 ++++++++++++++++
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 56 +++++++++++++++++++++-
 6 files changed, 126 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b56b26f..af0f343 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -941,6 +941,14 @@ class Log(@volatile var dir: File,
           shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
           records = validRecords)
 
+        // Increment the log end offset. We do this immediately after the append because a
+        // write to the transaction index below may fail and we want to ensure that the offsets
+        // of future appends still grow monotonically. The resulting transaction index inconsistency
+        // will be cleaned up after the log directory is recovered. Note that the end offset of the
+        // ProducerStateManager will not be updated and the last stable offset will not advance
+        // if the append to the transaction index fails.
+        updateLogEndOffset(appendInfo.lastOffset + 1)
+
         // update the producer state
         for ((_, producerAppendInfo) <- updatedProducers) {
           producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
@@ -950,17 +958,15 @@ class Log(@volatile var dir: File,
         // update the transaction index with the true last stable offset. The last offset visible
         // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
         for (completedTxn <- completedTxns) {
-          val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
           segment.updateTxnIndex(completedTxn, lastStableOffset)
+          producerStateManager.completeTxn(completedTxn)
         }
 
         // always update the last producer id map offset so that the snapshot reflects the current offset
         // even if there isn't any idempotent data being written
         producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
 
-        // increment the log end offset
-        updateLogEndOffset(appendInfo.lastOffset + 1)
-
         // update the first unstable offset (which is used to compute LSO)
         updateFirstUnstableOffset()
 
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 5ce9e19..12af8ec 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -229,8 +229,9 @@ class LogSegment private[log] (val log: FileRecords,
       val maybeCompletedTxn = appendInfo.append(batch)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>
-        val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+        val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
         updateTxnIndex(completedTxn, lastStableOffset)
+        producerStateManager.completeTxn(completedTxn)
       }
     }
     producerStateManager.updateMapEndOffset(batch.lastOffset + 1)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 59bc417..3db436e 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.{immutable, mutable}
 
@@ -605,7 +606,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * snapshot in range (if there is one). Note that the log end offset is assumed to be less than
    * or equal to the high watermark.
    */
-  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
+  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
     deleteSnapshotFiles(logDir, { snapOffset =>
       snapOffset > logEndOffset || snapOffset <= logStartOffset
@@ -757,9 +758,20 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   /**
-   * Complete the transaction and return the last stable offset.
+   * Compute the last stable offset of a completed transaction, but do not yet mark the transaction complete.
+   * That will be done in `completeTxn` below. This is used to compute the LSO that will be appended to the
+   * transaction index, but the completion must be done only after successfully appending to the index.
    */
-  def completeTxn(completedTxn: CompletedTxn): Long = {
+  def lastStableOffset(completedTxn: CompletedTxn): Long = {
+    val nextIncompleteTxn = ongoingTxns.values.asScala.find(_.producerId != completedTxn.producerId)
+    nextIncompleteTxn.map(_.firstOffset.messageOffset).getOrElse(completedTxn.lastOffset + 1)
+  }
+
+  /**
+   * Mark a transaction as completed. We will still await advancement of the high watermark before
+   * advancing the first unstable offset.
+   */
+  def completeTxn(completedTxn: CompletedTxn): Unit = {
     val txnMetadata = ongoingTxns.remove(completedTxn.firstOffset)
     if (txnMetadata == null)
       throw new IllegalArgumentException(s"Attempted to complete transaction $completedTxn on partition $topicPartition " +
@@ -767,9 +779,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
     txnMetadata.lastOffset = Some(completedTxn.lastOffset)
     unreplicatedTxns.put(completedTxn.firstOffset, txnMetadata)
-
-    val lastStableOffset = firstUndecidedOffset.getOrElse(completedTxn.lastOffset + 1)
-    lastStableOffset
   }
 
   @threadsafe
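
The split above yields a two-step contract. A minimal sketch of the intended calling
sequence (this is how the Log.scala and LogSegment.scala hunks above use it):

    val lastStableOffset = producerStateManager.lastStableOffset(completedTxn) // pure computation, no state change
    segment.updateTxnIndex(completedTxn, lastStableOffset)                     // may fail with KafkaStorageException
    producerStateManager.completeTxn(completedTxn)                             // reached only if the index write succeeded
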
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index e730fdb..696bc3a 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -57,7 +57,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
           s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}")
     }
     lastOffset = Some(abortedTxn.lastOffset)
-    Utils.writeFully(channel, abortedTxn.buffer.duplicate())
+    Utils.writeFully(channel(), abortedTxn.buffer.duplicate())
   }
 
   def flush(): Unit = maybeChannel.foreach(_.force(true))
@@ -74,7 +74,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     Files.deleteIfExists(file.toPath)
   }
 
-  private def channel: FileChannel = {
+  private def channel(): FileChannel = {
     maybeChannel match {
       case Some(channel) => channel
       case None => openChannel()
@@ -114,7 +114,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     var newLastOffset: Option[Long] = None
     for ((abortedTxn, position) <- iterator(() => buffer)) {
       if (abortedTxn.lastOffset >= offset) {
-        channel.truncate(position)
+        channel().truncate(position)
         lastOffset = newLastOffset
         return
       }
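
The empty parentheses added to channel follow the Scala convention that
side-effecting methods are declared and invoked with parens; here the side effect is
lazily opening the file channel on first access. A minimal sketch of the pattern,
simplified from the class above:

    private var maybeChannel: Option[FileChannel] = None
    private def channel(): FileChannel = maybeChannel.getOrElse(openChannel()) // openChannel() creates and caches the channel
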
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 0a299e8..2af97fa 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3479,6 +3479,47 @@ class LogTest {
   }
 
   @Test
+  def testAppendToTransactionIndexFailure(): Unit = {
+    val pid = 1L
+    val epoch = 0.toShort
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
+
+    val append = appendTransactionalAsLeader(log, pid, epoch)
+    append(10)
+
+    // Kind of a hack, but renaming the index to a directory ensures that the append
+    // to the index will fail.
+    log.activeSegment.txnIndex.renameTo(log.dir)
+
+    // The append will be written to the log successfully, but the write to the index will fail
+    assertThrows[KafkaStorageException] {
+      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
+    }
+    assertEquals(11L, log.logEndOffset)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    // Try the append a second time. The appended offset in the log should still increase.
+    assertThrows[KafkaStorageException] {
+      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
+    }
+    assertEquals(12L, log.logEndOffset)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    // Even if the high watermark is updated, the first unstable offset does not move
+    log.onHighWatermarkIncremented(12L)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    log.close()
+
+    val reopenedLog = createLog(logDir, logConfig)
+    assertEquals(12L, reopenedLog.logEndOffset)
+    assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
+    reopenedLog.onHighWatermarkIncremented(12L)
+    assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset))
+  }
+
+  @Test
   def testLastStableOffsetWithMixedProducerData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 26067b4..1d29566 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -220,6 +220,59 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   @Test
+  def testLastStableOffsetCompletedTxn(): Unit = {
+    val producerEpoch = 0.toShort
+    val segmentBaseOffset = 990000L
+
+    def beginTxn(producerId: Long, startOffset: Long): Unit = {
+      val relativeOffset = (startOffset - segmentBaseOffset).toInt
+      val producerAppendInfo = new ProducerAppendInfo(
+        partition,
+        producerId,
+        ProducerStateEntry.empty(producerId),
+        ValidationType.Full)
+      producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(), startOffset, startOffset, isTransactional = true)
+      val logOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
+        relativePositionInSegment = 50 * relativeOffset)
+      producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+      stateManager.update(producerAppendInfo)
+    }
+
+    val producerId1 = producerId
+    val startOffset1 = 992342L
+    beginTxn(producerId1, startOffset1)
+
+    val producerId2 = producerId + 1
+    val startOffset2 = startOffset1 + 25
+    beginTxn(producerId2, startOffset2)
+
+    val producerId3 = producerId + 2
+    val startOffset3 = startOffset1 + 57
+    beginTxn(producerId3, startOffset3)
+
+    val lastOffset1 = startOffset3 + 15
+    val completedTxn1 = CompletedTxn(producerId1, startOffset1, lastOffset1, isAborted = false)
+    assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn1))
+    stateManager.completeTxn(completedTxn1)
+    stateManager.onHighWatermarkUpdated(lastOffset1 + 1)
+    assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
+
+    val lastOffset3 = lastOffset1 + 20
+    val completedTxn3 = CompletedTxn(producerId3, startOffset3, lastOffset3, isAborted = false)
+    assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn3))
+    stateManager.completeTxn(completedTxn3)
+    stateManager.onHighWatermarkUpdated(lastOffset3 + 1)
+    assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
+
+    val lastOffset2 = lastOffset3 + 78
+    val completedTxn2 = CompletedTxn(producerId2, startOffset2, lastOffset2, isAborted = false)
+    assertEquals(lastOffset2 + 1, stateManager.lastStableOffset(completedTxn2))
+    stateManager.completeTxn(completedTxn2)
+    stateManager.onHighWatermarkUpdated(lastOffset2 + 1)
+    assertEquals(None, stateManager.firstUnstableOffset)
+  }
+
+  @Test
   def testNonMatchingTxnFirstOffsetMetadataNotCached(): Unit = {
     val producerEpoch = 0.toShort
     val offset = 992342L
@@ -823,7 +876,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
     val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
-    val lastStableOffset = mapping.completeTxn(completedTxn)
+    val lastStableOffset = mapping.lastStableOffset(completedTxn)
+    mapping.completeTxn(completedTxn)
     mapping.updateMapEndOffset(offset + 1)
     (completedTxn, lastStableOffset)
   }