You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/13 15:59:57 UTC

[kafka] branch 2.0 updated: KAFKA-8335; Clean empty batches when sequence numbers are reused (#6715)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new e4aa2cc  KAFKA-8335; Clean empty batches when sequence numbers are reused (#6715)
e4aa2cc is described below

commit e4aa2ccda2d7dc19698c385d78f5e1d10135081a
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon May 13 08:47:53 2019 -0700

    KAFKA-8335; Clean empty batches when sequence numbers are reused (#6715)
    
    The log cleaner attempts to preserve the last entry for each producerId in order to ensure that sequence/epoch state is not lost. The current validation checks only the last sequence number for each producerId in order to decide whether a batch should be retained. There are two problems with this:
    
    1. Sequence numbers are not unique alone. It is the tuple of sequence number and epoch which is uniquely defined.
    2. The group coordinator always writes batches beginning with sequence number 0, which means there could be many batches which have the same sequence number.
    
    The complete fix for the second issue would probably add proper sequence number bookkeeping in the coordinator. For now, we have left the coordinator implementation unchanged and changed the cleaner logic to use the last offset written by a producer instead of the last sequence number.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            |  8 ++++
 core/src/main/scala/kafka/log/LogCleaner.scala     | 27 ++++++++---
 .../scala/kafka/log/ProducerStateManager.scala     |  6 +++
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 53 ++++++++++++++++++++--
 4 files changed, 84 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 49f18a2..2df961b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -680,6 +680,14 @@ class Log(@volatile var dir: File,
     }
   }
 
+  private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized {
+    producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
+      val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0 ) Some(producerIdEntry.lastDataOffset) else None
+      val lastRecord = LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
+      producerId -> lastRecord
+    }
+  }
+
   /**
    * Check if we have the "clean shutdown" file
    */
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 03e4788..36d64f4 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -493,6 +493,8 @@ private[log] class Cleaner(val id: Int,
       // clean segments into the new destination segment
       val iter = segments.iterator
       var currentSegmentOpt: Option[LogSegment] = Some(iter.next())
+      val lastOffsetOfActiveProducers = log.lastRecordsOfActiveProducers
+
       while (currentSegmentOpt.isDefined) {
         val currentSegment = currentSegmentOpt.get
         val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None
@@ -508,7 +510,7 @@ private[log] class Cleaner(val id: Int,
 
         try {
           cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletes, log.config.maxMessageSize,
-            transactionMetadata, log.activeProducersWithLastSequence, stats)
+            transactionMetadata, lastOffsetOfActiveProducers, stats)
         } catch {
           case e: LogSegmentOffsetOverflowException =>
             // Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
@@ -560,9 +562,9 @@ private[log] class Cleaner(val id: Int,
                              retainDeletes: Boolean,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
-                             activeProducers: Map[Long, Int],
+                             lastRecordsOfActiveProducers: Map[Long, LastRecord],
                              stats: CleanerStats) {
-    val logCleanerFilter = new RecordFilter {
+    val logCleanerFilter: RecordFilter = new RecordFilter {
       var discardBatchRecords: Boolean = _
 
       override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
@@ -570,9 +572,22 @@ private[log] class Cleaner(val id: Int,
         // note that we will never delete a marker until all the records from that transaction are removed.
         discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletes)
 
-        // check if the batch contains the last sequence number for the producer. if so, we cannot
-        // remove the batch just yet or the producer may see an out of sequence error.
-        if (batch.hasProducerId && activeProducers.get(batch.producerId).contains(batch.lastSequence))
+        def isBatchLastRecordOfProducer: Boolean = {
+          // We retain the batch in order to preserve the state of active producers. There are three cases:
+          // 1) The producer is no longer active, which means we can delete all records for that producer.
+          // 2) The producer is still active and has a last data offset. We retain the batch that contains
+          //    this offset since it also contains the last sequence number for this producer.
+          // 3) The last entry in the log is a transaction marker. We retain this marker since it has the
+          //    last producer epoch, which is needed to ensure fencing.
+          lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord =>
+            lastRecord.lastDataOffset match {
+              case Some(offset) => batch.lastOffset == offset
+              case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
+            }
+          }
+        }
+
+        if (batch.hasProducerId && isBatchLastRecordOfProducer)
           BatchRetention.RETAIN_EMPTY
         else if (discardBatchRecords)
           BatchRetention.DELETE
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 2dac73d..cd51032 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -36,6 +36,12 @@ import scala.collection.{immutable, mutable}
 
 class CorruptSnapshotException(msg: String) extends KafkaException(msg)
 
+/**
+ * The last written record for a given producer. The last data offset may be undefined
+ * if the only log entry for a producer is a transaction marker.
+ */
+case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
+
 
 // ValidationType and its subtypes define the extent of the validation to perform on a given ProducerAppendInfo instance
 private[log] sealed trait ValidationType
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 51477b6..0dcfeec 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -527,6 +527,45 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testEmptyBatchRemovalWithSequenceReuse(): Unit = {
+    // The group coordinator always writes batches beginning with sequence number 0. This test
+    // ensures that we still remove old empty batches and transaction markers under this expectation.
+
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 2048: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch, isFromClient = false)
+    appendFirstTransaction(Seq(1))
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+
+    val appendSecondTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch, isFromClient = false)
+    appendSecondTransaction(Seq(2))
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+
+    log.appendAsLeader(record(1, 1), leaderEpoch = 0)
+    log.appendAsLeader(record(2, 1), leaderEpoch = 0)
+
+    // Roll the log to ensure that the data is cleanable.
+    log.roll()
+
+    // Both transactional batches will be cleaned. The last one will remain in the log
+    // as an empty batch in order to preserve the producer sequence number and epoch
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(1, 3, 4, 5), offsetsInLog(log))
+    assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
+
+    // On the second round of cleaning, the marker from the first transaction should be removed.
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(3, 4, 5), offsetsInLog(log))
+    assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
   def testAbortMarkerRetentionWithEmptyBatch(): Unit = {
     val tp = new TopicPartition("test", 0)
     val cleaner = makeCleaner(Int.MaxValue)
@@ -1521,13 +1560,19 @@ class LogCleanerTest extends JUnitSuite {
       partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Seq[Int] => LogAppendInfo = {
-    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true)
+  private def appendTransactionalAsLeader(log: Log,
+                                          producerId: Long,
+                                          producerEpoch: Short,
+                                          leaderEpoch: Int = 0,
+                                          isFromClient: Boolean = true): Seq[Int] => LogAppendInfo = {
+    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, isFromClient = isFromClient)
   }
 
   private def appendIdempotentAsLeader(log: Log, producerId: Long,
                                        producerEpoch: Short,
-                                       isTransactional: Boolean = false): Seq[Int] => LogAppendInfo = {
+                                       isTransactional: Boolean = false,
+                                       leaderEpoch: Int = 0,
+                                       isFromClient: Boolean = true): Seq[Int] => LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {
       val simpleRecords = keys.map { key =>
@@ -1539,7 +1584,7 @@ class LogCleanerTest extends JUnitSuite {
       else
         MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
       sequence += simpleRecords.size
-      log.appendAsLeader(records, leaderEpoch = 0)
+      log.appendAsLeader(records, leaderEpoch, isFromClient)
     }
   }