Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/29 23:50:10 UTC

[kafka] branch trunk updated: KAFKA-8542; Cache transaction first offset metadata on follower (#6943)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53b837f  KAFKA-8542; Cache transaction first offset metadata on follower (#6943)
53b837f is described below

commit 53b837f9c604cdb8a5a77a9363fc058b9ef9cd7e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sat Jun 29 16:49:42 2019 -0700

    KAFKA-8542; Cache transaction first offset metadata on follower (#6943)
    
    Followers should cache the log offset metadata for the start offset of each transaction so that they can compute the last stable offset without an offset index lookup. This is needed for follower fetching in KIP-392.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            | 72 ++++++++++++++--------
 core/src/main/scala/kafka/log/LogSegment.scala     |  2 +-
 .../scala/kafka/log/ProducerStateManager.scala     | 27 +++-----
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 52 +++++++++++++++-
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 52 ++++++----------
 5 files changed, 127 insertions(+), 78 deletions(-)
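
To illustrate the core idea of the patch before the diff: as analyzeAndValidateProducerState walks the batches of an append, it keeps a running byte position within the segment and, for each transactional batch, records full offset metadata for the transaction's first offset. The following is a minimal, self-contained Scala sketch of that position tracking; OffsetMetadata and Batch are simplified stand-ins for Kafka's LogOffsetMetadata and RecordBatch, not the real classes.

    // Simplified stand-ins for LogOffsetMetadata and RecordBatch (illustration only).
    case class OffsetMetadata(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int)
    case class Batch(baseOffset: Long, sizeInBytes: Int, isTransactional: Boolean)

    // Walk the batches of one append, keeping a running byte position within the
    // segment. Each transactional batch yields full offset metadata for its first
    // offset, so no offset-index lookup is needed to locate it later.
    def txnFirstOffsets(appendStart: OffsetMetadata, batches: Seq[Batch]): Seq[OffsetMetadata] = {
      var pos = appendStart.relativePositionInSegment
      val starts = Seq.newBuilder[OffsetMetadata]
      for (batch <- batches) {
        if (batch.isTransactional)
          starts += OffsetMetadata(batch.baseOffset, appendStart.segmentBaseOffset, pos)
        pos += batch.sizeInBytes
      }
      starts.result()
    }

Because the position is advanced by batch.sizeInBytes as each batch is visited, this works even when one follower append mixes batches from several producers, which is why the leader-only maybeCacheTxnFirstOffsetMetadata shortcut removed below is no longer needed.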

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7ad43b4..b76185e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -782,7 +782,10 @@ class Log(@volatile var dir: File,
     val completedTxns = ListBuffer.empty[CompletedTxn]
     records.batches.asScala.foreach { batch =>
       if (batch.hasProducerId) {
-        val maybeCompletedTxn = updateProducers(batch, loadedProducers, isFromClient = false)
+        val maybeCompletedTxn = updateProducers(batch,
+          loadedProducers,
+          firstOffsetMetadata = None,
+          isFromClient = false)
         maybeCompletedTxn.foreach(completedTxns += _)
       }
     }
@@ -1013,9 +1016,19 @@ class Log(@volatile var dir: File,
             s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
         }
 
+        // maybe roll the log if this segment is full
+        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
+
+        val logOffsetMetadata = LogOffsetMetadata(
+          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
+          segmentBaseOffset = segment.baseOffset,
+          relativePositionInSegment = segment.size)
+
         // now that we have valid records, offsets assigned, and timestamps updated, we need to
         // validate the idempotent/transactional state of the producers and collect some metadata
-        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
+        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
+          logOffsetMetadata, validRecords, isFromClient)
+
         maybeDuplicate.foreach { duplicate =>
           appendInfo.firstOffset = Some(duplicate.firstOffset)
           appendInfo.lastOffset = duplicate.lastOffset
@@ -1024,14 +1037,6 @@ class Log(@volatile var dir: File,
           return appendInfo
         }
 
-        // maybe roll the log if this segment is full
-        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
-
-        val logOffsetMetadata = LogOffsetMetadata(
-          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
-          segmentBaseOffset = segment.baseOffset,
-          relativePositionInSegment = segment.size)
-
         segment.append(largestOffset = appendInfo.lastOffset,
           largestTimestamp = appendInfo.maxTimestamp,
           shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
@@ -1046,8 +1051,7 @@ class Log(@volatile var dir: File,
         updateLogEndOffset(appendInfo.lastOffset + 1)
 
         // update the producer state
-        for ((_, producerAppendInfo) <- updatedProducers) {
-          producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+        for (producerAppendInfo <- updatedProducers.values) {
           producerStateManager.update(producerAppendInfo)
         }
 
@@ -1139,23 +1143,42 @@ class Log(@volatile var dir: File,
     }
   }
 
-  private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
+  private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
+                                              records: MemoryRecords,
+                                              isFromClient: Boolean):
   (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
     val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
     val completedTxns = ListBuffer.empty[CompletedTxn]
-    for (batch <- records.batches.asScala if batch.hasProducerId) {
-      val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
-
-      // if this is a client produce request, there will be up to 5 batches which could have been duplicated.
-      // If we find a duplicate, we return the metadata of the appended batch to the client.
-      if (isFromClient) {
-        maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
-          return (updatedProducers, completedTxns.toList, Some(duplicate))
+    var relativePositionInSegment = appendOffsetMetadata.relativePositionInSegment
+
+    for (batch <- records.batches.asScala) {
+      if (batch.hasProducerId) {
+        val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
+
+        // if this is a client produce request, there will be up to 5 batches which could have been duplicated.
+        // If we find a duplicate, we return the metadata of the appended batch to the client.
+        if (isFromClient) {
+          maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
+            return (updatedProducers, completedTxns.toList, Some(duplicate))
+          }
         }
+
+        // We cache offset metadata for the start of each transaction. This allows us to
+        // compute the last stable offset without relying on additional index lookups.
+        val firstOffsetMetadata = if (batch.isTransactional)
+          Some(LogOffsetMetadata(batch.baseOffset, appendOffsetMetadata.segmentBaseOffset, relativePositionInSegment))
+        else
+          None
+
+        val maybeCompletedTxn = updateProducers(batch,
+          updatedProducers,
+          firstOffsetMetadata = firstOffsetMetadata,
+          isFromClient = isFromClient)
+
+        maybeCompletedTxn.foreach(completedTxns += _)
       }
 
-      val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient)
-      maybeCompletedTxn.foreach(completedTxns += _)
+      relativePositionInSegment += batch.sizeInBytes
     }
     (updatedProducers, completedTxns.toList, None)
   }
@@ -1249,10 +1272,11 @@ class Log(@volatile var dir: File,
 
   private def updateProducers(batch: RecordBatch,
                               producers: mutable.Map[Long, ProducerAppendInfo],
+                              firstOffsetMetadata: Option[LogOffsetMetadata],
                               isFromClient: Boolean): Option[CompletedTxn] = {
     val producerId = batch.producerId
     val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, isFromClient))
-    appendInfo.append(batch)
+    appendInfo.append(batch, firstOffsetMetadata)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 08182e8..37b2f9d 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -245,7 +245,7 @@ class LogSegment private[log] (val log: FileRecords,
     if (batch.hasProducerId) {
       val producerId = batch.producerId
       val appendInfo = producerStateManager.prepareUpdate(producerId, isFromClient = false)
-      val maybeCompletedTxn = appendInfo.append(batch)
+      val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>
         val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 5632b7b..3024352 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -263,7 +263,8 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
   }
 
-  def append(batch: RecordBatch): Option[CompletedTxn] = {
+  def append(batch: RecordBatch,
+             firstOffsetMetadataOpt: Option[LogOffsetMetadata]): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
       val recordIterator = batch.iterator
       if (recordIterator.hasNext) {
@@ -276,8 +277,9 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
         None
       }
     } else {
-      append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.baseOffset, batch.lastOffset,
-        batch.isTransactional)
+      val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(LogOffsetMetadata(batch.baseOffset))
+      append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
+        firstOffsetMetadata, batch.lastOffset, batch.isTransactional)
       None
     }
   }
@@ -286,9 +288,10 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
              firstSeq: Int,
              lastSeq: Int,
              lastTimestamp: Long,
-             firstOffset: Long,
+             firstOffsetMetadata: LogOffsetMetadata,
              lastOffset: Long,
              isTransactional: Boolean): Unit = {
+    val firstOffset = firstOffsetMetadata.messageOffset
     maybeValidateAppend(epoch, firstSeq, firstOffset)
     updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
 
@@ -296,12 +299,12 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
       case Some(_) if !isTransactional =>
         // Received a non-transactional message while a transaction is active
         throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId at " +
-          s"offset $firstOffset in partition $topicPartition")
+          s"offset $firstOffsetMetadata in partition $topicPartition")
 
       case None if isTransactional =>
         // Began a new transaction
         updatedEntry.currentTxnFirstOffset = Some(firstOffset)
-        transactions += new TxnMetadata(producerId, firstOffset)
+        transactions += TxnMetadata(producerId, firstOffsetMetadata)
 
       case _ => // nothing to do
     }
@@ -336,18 +339,6 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
 
   def startedTransactions: List[TxnMetadata] = transactions.toList
 
-  def maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata: LogOffsetMetadata): Unit = {
-    // we will cache the log offset metadata if it corresponds to the starting offset of
-    // the last transaction that was started. This is optimized for leader appends where it
-    // is only possible to have one transaction started for each log append, and the log
-    // offset metadata will always match in that case since no data from other producers
-    // is mixed into the append
-    transactions.headOption.foreach { txn =>
-      if (txn.firstOffset.messageOffset == logOffsetMetadata.messageOffset)
-        txn.firstOffset = logOffsetMetadata
-    }
-  }
-
   override def toString: String = {
     "ProducerAppendInfo(" +
       s"producerId=$producerId, " +
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9cd53e5..a796d6f 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -28,7 +28,7 @@ import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException
 import kafka.log.Log.DeleteDirSuffix
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
-import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
@@ -3308,7 +3308,21 @@ class LogTest {
     appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90
 
     val abortedTransactions = allAbortedTransactions(log)
-    assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
+    val expectedTransactions = List(
+      new AbortedTxn(pid1, 0L, 29L, 8L),
+      new AbortedTxn(pid2, 8L, 74L, 36L)
+    )
+    assertEquals(expectedTransactions, abortedTransactions)
+
+    // Verify caching of the segment position of the first unstable offset
+    log.highWatermark = 30L
+    assertCachedFirstUnstableOffset(log, expectedOffset = 8L)
+
+    log.highWatermark = 75L
+    assertCachedFirstUnstableOffset(log, expectedOffset = 36L)
+
+    log.highWatermark = log.logEndOffset
+    assertEquals(None, log.firstUnstableOffset)
   }
 
   @Test
@@ -3509,7 +3523,39 @@ class LogTest {
     appendAsFollower(log, MemoryRecords.readableRecords(buffer))
 
     val abortedTransactions = allAbortedTransactions(log)
-    assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
+    val expectedTransactions = List(
+      new AbortedTxn(pid1, 0L, 29L, 8L),
+      new AbortedTxn(pid2, 8L, 74L, 36L)
+    )
+
+    assertEquals(expectedTransactions, abortedTransactions)
+
+    // Verify caching of the segment position of the first unstable offset
+    log.highWatermark = 30L
+    assertCachedFirstUnstableOffset(log, expectedOffset = 8L)
+
+    log.highWatermark = 75L
+    assertCachedFirstUnstableOffset(log, expectedOffset = 36L)
+
+    log.highWatermark = log.logEndOffset
+    assertEquals(None, log.firstUnstableOffset)
+  }
+
+  private def assertCachedFirstUnstableOffset(log: Log, expectedOffset: Long): Unit = {
+    assertTrue(log.firstUnstableOffset.isDefined)
+    val firstUnstableOffset = log.firstUnstableOffset.get
+    assertEquals(expectedOffset, firstUnstableOffset.messageOffset)
+    assertFalse(firstUnstableOffset.messageOffsetOnly)
+    assertValidLogOffsetMetadata(log, firstUnstableOffset)
+  }
+
+  private def assertValidLogOffsetMetadata(log: Log, offsetMetadata: LogOffsetMetadata): Unit = {
+    val readInfo = log.read(startOffset = offsetMetadata.messageOffset,
+      maxLength = 2048,
+      maxOffset = None,
+      minOneMessage = false,
+      includeAbortedTxns = false)
+    assertEquals(offsetMetadata, readInfo.fetchOffsetMetadata)
   }
 
   @Test(expected = classOf[TransactionCoordinatorFencedException])
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 3db3c22..41a68aa 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -137,7 +137,8 @@ class ProducerStateManagerTest {
 
     val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = false)
     // Sequence number wrap around
-    appendInfo.append(epoch, Int.MaxValue-10, 9, time.milliseconds(), 2000L, 2020L, isTransactional = false)
+    appendInfo.append(epoch, Int.MaxValue - 10, 9, time.milliseconds(),
+      LogOffsetMetadata(2000L), 2020L, isTransactional = false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
@@ -209,14 +210,14 @@ class ProducerStateManagerTest {
     val offset = 992342L
     val seq = 0
     val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
-    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
 
-    val logOffsetMetadata = LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
+    val firstOffsetMetadata = LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
       relativePositionInSegment = 234224)
-    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(),
+      firstOffsetMetadata, offset, isTransactional = true)
     stateManager.update(producerAppendInfo)
 
-    assertEquals(Some(logOffsetMetadata), stateManager.firstUnstableOffset)
+    assertEquals(Some(firstOffsetMetadata), stateManager.firstUnstableOffset)
   }
 
   @Test
@@ -232,10 +233,10 @@ class ProducerStateManagerTest {
         ProducerStateEntry.empty(producerId),
         ValidationType.Full
       )
-      producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(), startOffset, startOffset, isTransactional = true)
-      val logOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
+      val firstOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
         relativePositionInSegment = 50 * relativeOffset)
-      producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+      producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(),
+        firstOffsetMetadata, startOffset, isTransactional = true)
       stateManager.update(producerAppendInfo)
     }
 
@@ -274,35 +275,19 @@ class ProducerStateManagerTest {
   }
 
   @Test
-  def testNonMatchingTxnFirstOffsetMetadataNotCached(): Unit = {
-    val producerEpoch = 0.toShort
-    val offset = 992342L
-    val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
-    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
-
-    // use some other offset to simulate a follower append where the log offset metadata won't typically
-    // match any of the transaction first offsets
-    val logOffsetMetadata = LogOffsetMetadata(messageOffset = offset - 23429, segmentBaseOffset = 990000L,
-      relativePositionInSegment = 234224)
-    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
-    stateManager.update(producerAppendInfo)
-
-    assertEquals(Some(LogOffsetMetadata(offset)), stateManager.firstUnstableOffset)
-  }
-
-  @Test
   def testPrepareUpdateDoesNotMutate(): Unit = {
     val producerEpoch = 0.toShort
 
     val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
-    appendInfo.append(producerEpoch, 0, 5, time.milliseconds(), 15L, 20L, isTransactional = false)
+    appendInfo.append(producerEpoch, 0, 5, time.milliseconds(),
+      LogOffsetMetadata(15L), 20L, isTransactional = false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
 
     val nextAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
-    nextAppendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 26L, 30L, isTransactional = false)
+    nextAppendInfo.append(producerEpoch, 6, 10, time.milliseconds(),
+      LogOffsetMetadata(26L), 30L, isTransactional = false)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
 
     var lastEntry = stateManager.lastEntry(producerId).get
@@ -325,7 +310,8 @@ class ProducerStateManagerTest {
     append(stateManager, producerId, producerEpoch, 0, offset)
 
     val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
-    appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 16L, 20L, isTransactional = true)
+    appendInfo.append(producerEpoch, 1, 5, time.milliseconds(),
+      LogOffsetMetadata(16L), 20L, isTransactional = true)
     var lastEntry = appendInfo.toEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
@@ -335,7 +321,8 @@ class ProducerStateManagerTest {
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
-    appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 26L, 30L, isTransactional = true)
+    appendInfo.append(producerEpoch, 6, 10, time.milliseconds(),
+      LogOffsetMetadata(26L), 30L, isTransactional = true)
     lastEntry = appendInfo.toEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
@@ -892,7 +879,8 @@ class ProducerStateManagerTest {
                      isTransactional: Boolean = false,
                      isFromClient : Boolean = true): Unit = {
     val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
-    producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, offset, isTransactional)
+    producerAppendInfo.append(producerEpoch, seq, seq, timestamp,
+      LogOffsetMetadata(offset), offset, isTransactional)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)
   }
@@ -904,7 +892,7 @@ class ProducerStateManagerTest {
                      batch: RecordBatch,
                      isFromClient : Boolean): Unit = {
     val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
-    producerAppendInfo.append(batch)
+    producerAppendInfo.append(batch, firstOffsetMetadataOpt = None)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)
   }
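
The payoff, sketched below under the same simplified types as the sketch after the diffstat: once every open transaction carries full offset metadata for its first offset, the first unstable offset falls out of a minimum over the open transactions, and the last stable offset follows from it and the high watermark, with no index lookup. TxnMeta is a hypothetical stand-in for Kafka's TxnMetadata, and the capping rule is a simplification of the broker's actual logic.

    // Simplified stand-ins, as in the earlier sketch (illustration only).
    case class OffsetMetadata(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int)
    case class TxnMeta(producerId: Long, firstOffset: OffsetMetadata)

    // The earliest open transaction start, if any, is the first unstable offset.
    def firstUnstableOffset(openTxns: Iterable[TxnMeta]): Option[OffsetMetadata] =
      if (openTxns.isEmpty) None
      else Some(openTxns.minBy(_.firstOffset.messageOffset).firstOffset)

    // The last stable offset is the first unstable offset, capped by the high watermark.
    def lastStableOffset(highWatermark: Long, openTxns: Iterable[TxnMeta]): Long =
      firstUnstableOffset(openTxns) match {
        case Some(first) if first.messageOffset < highWatermark => first.messageOffset
        case _ => highWatermark
      }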