You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/18 16:27:50 UTC

[kafka] branch 2.1 updated: MINOR: Ensure producer state append exceptions are useful (#6591)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2ed976d  MINOR: Ensure producer state append exceptions are useful (#6591)
2ed976d is described below

commit 2ed976def30d9d6aea6ef9f3a05ce4cf297ea438
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Apr 18 08:38:43 2019 -0700

    MINOR: Ensure producer state append exceptions are useful (#6591)
    
    We should include partition/offset information when we raise exceptions during producer state validation. This saves a lot of the discovery work to figure out where the problem occurred. This patch also includes a new test case to verify additional coordinator fencing cases.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../scala/kafka/log/ProducerStateManager.scala     | 48 +++++++++++---------
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 52 ++++++++++++++++++----
 .../unit/kafka/log/ProducerStateManagerTest.scala  |  4 +-
 3 files changed, 73 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index a3b03d0..59bc417 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -184,7 +184,8 @@ private[log] class ProducerStateEntry(val producerId: Long,
  *                       should have ValidationType.None. Appends coming from a client for produce requests should have
  *                       ValidationType.Full.
  */
-private[log] class ProducerAppendInfo(val producerId: Long,
+private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
+                                      val producerId: Long,
                                       val currentEntry: ProducerStateEntry,
                                       val validationType: ValidationType) {
   private val transactions = ListBuffer.empty[TxnMetadata]
@@ -194,35 +195,36 @@ private[log] class ProducerAppendInfo(val producerId: Long,
   updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch
   updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset
 
-  private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int) = {
+  private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
     validationType match {
       case ValidationType.None =>
 
       case ValidationType.EpochOnly =>
-        checkProducerEpoch(producerEpoch)
+        checkProducerEpoch(producerEpoch, offset)
 
       case ValidationType.Full =>
-        checkProducerEpoch(producerEpoch)
-        checkSequence(producerEpoch, firstSeq)
+        checkProducerEpoch(producerEpoch, offset)
+        checkSequence(producerEpoch, firstSeq, offset)
     }
   }
 
-  private def checkProducerEpoch(producerEpoch: Short): Unit = {
+  private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
     if (producerEpoch < updatedEntry.producerEpoch) {
-      throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
-        s"with a newer epoch. $producerEpoch (request epoch), ${updatedEntry.producerEpoch} (server epoch)")
+      throw new ProducerFencedException(s"Producer's epoch at offset $offset is no longer valid in " +
+        s"partition $topicPartition: $producerEpoch (request epoch), ${updatedEntry.producerEpoch} (current epoch)")
     }
   }
 
-  private def checkSequence(producerEpoch: Short, appendFirstSeq: Int): Unit = {
+  private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit = {
     if (producerEpoch != updatedEntry.producerEpoch) {
       if (appendFirstSeq != 0) {
         if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
-            s"(request epoch), $appendFirstSeq (seq. number)")
+          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch at offset $offset in " +
+            s"partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number)")
         } else {
-          throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " +
-            s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.")
+          throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker at offset $offset " +
+            s"in partition $topicPartition. It is possible that the last message with the producerId=$producerId has " +
+            "been removed due to hitting the retention limit.")
         }
       }
     } else {
@@ -240,10 +242,12 @@ private[log] class ProducerAppendInfo(val producerId: Long,
         // the sequence number. Note that this check follows the fencing check, so the marker still fences
         // old producers even if it cannot determine our next expected sequence number.
         throw new UnknownProducerIdException(s"Local producer state matches expected epoch $producerEpoch " +
-          s"for producerId=$producerId, but next expected sequence number is not known.")
+          s"for producerId=$producerId at offset $offset in partition $topicPartition, but the next expected " +
+          "sequence number is not known.")
       } else if (!inSequence(currentLastSeq, appendFirstSeq)) {
-        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $appendFirstSeq " +
-          s"(incoming seq. number), $currentLastSeq (current end sequence number)")
+        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId at " +
+          s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " +
+          s"$currentLastSeq (current end sequence number)")
       }
     }
   }
@@ -278,13 +282,14 @@ private[log] class ProducerAppendInfo(val producerId: Long,
              firstOffset: Long,
              lastOffset: Long,
              isTransactional: Boolean): Unit = {
-    maybeValidateAppend(epoch, firstSeq)
+    maybeValidateAppend(epoch, firstSeq, firstOffset)
     updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
 
     updatedEntry.currentTxnFirstOffset match {
       case Some(_) if !isTransactional =>
         // Received a non-transactional message while a transaction is active
-        throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId")
+        throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId at " +
+          s"offset $firstOffset in partition $topicPartition")
 
       case None if isTransactional =>
         // Began a new transaction
@@ -299,10 +304,11 @@ private[log] class ProducerAppendInfo(val producerId: Long,
                          producerEpoch: Short,
                          offset: Long,
                          timestamp: Long): CompletedTxn = {
-    checkProducerEpoch(producerEpoch)
+    checkProducerEpoch(producerEpoch, offset)
 
     if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch)
-      throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch: ${endTxnMarker.coordinatorEpoch} " +
+      throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " +
+        s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
         s"(zombie), ${updatedEntry.coordinatorEpoch} (current)")
 
     updatedEntry.maybeUpdateEpoch(producerEpoch)
@@ -628,7 +634,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
         ValidationType.Full
 
     val currentEntry = lastEntry(producerId).getOrElse(ProducerStateEntry.empty(producerId))
-    new ProducerAppendInfo(producerId, currentEntry, validationToPerform)
+    new ProducerAppendInfo(topicPartition, producerId, currentEntry, validationToPerform)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 699b0000..0a299e8 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3405,6 +3405,29 @@ class LogTest {
   }
 
   @Test
+  def testZombieCoordinatorFencedEmptyTransaction(): Unit = {
+    val pid = 1L
+    val epoch = 0.toShort
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
+
+    val buffer = ByteBuffer.allocate(256)
+    val append = appendTransactionalToBuffer(buffer, pid, epoch, leaderEpoch = 1)
+    append(0, 10)
+    appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L, ControlRecordType.COMMIT,
+      coordinatorEpoch = 0, leaderEpoch = 1)
+
+    buffer.flip()
+    log.appendAsFollower(MemoryRecords.readableRecords(buffer))
+
+    appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 2, leaderEpoch = 1)
+    appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 2, leaderEpoch = 1)
+    assertThrows[TransactionCoordinatorFencedException] {
+      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1, leaderEpoch = 1)
+    }
+  }
+
+  @Test
   def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
@@ -3564,10 +3587,14 @@ class LogTest {
     }
   }
 
-  private def appendEndTxnMarkerAsLeader(log: Log, producerId: Long, producerEpoch: Short,
-                                         controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = {
+  private def appendEndTxnMarkerAsLeader(log: Log,
+                                         producerId: Long,
+                                         producerEpoch: Short,
+                                         controlType: ControlRecordType,
+                                         coordinatorEpoch: Int = 0,
+                                         leaderEpoch: Int = 0): Unit = {
     val records = endTxnRecords(controlType, producerId, producerEpoch, coordinatorEpoch = coordinatorEpoch)
-    log.appendAsLeader(records, isFromClient = false, leaderEpoch = 0)
+    log.appendAsLeader(records, isFromClient = false, leaderEpoch = leaderEpoch)
   }
 
   private def appendNonTransactionalAsLeader(log: Log, numRecords: Int): Unit = {
@@ -3578,10 +3605,14 @@ class LogTest {
     log.appendAsLeader(records, leaderEpoch = 0)
   }
 
-  private def appendTransactionalToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short): (Long, Int) => Unit = {
+  private def appendTransactionalToBuffer(buffer: ByteBuffer,
+                                          producerId: Long,
+                                          producerEpoch: Short,
+                                          leaderEpoch: Int = 0): (Long, Int) => Unit = {
     var sequence = 0
     (offset: Long, numRecords: Int) => {
-      val builder = MemoryRecords.builder(buffer, CompressionType.NONE, offset, producerId, producerEpoch, sequence, true)
+      val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
+        offset, System.currentTimeMillis(), producerId, producerEpoch, sequence, true, leaderEpoch)
       for (seq <- sequence until sequence + numRecords) {
         val record = new SimpleRecord(s"$seq".getBytes)
         builder.append(record)
@@ -3592,10 +3623,15 @@ class LogTest {
     }
   }
 
-  private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, offset: Long,
-                                         controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = {
+  private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer,
+                                         producerId: Long,
+                                         producerEpoch: Short,
+                                         offset: Long,
+                                         controlType: ControlRecordType,
+                                         coordinatorEpoch: Int = 0,
+                                         leaderEpoch: Int = 0): Unit = {
     val marker = new EndTransactionMarker(controlType, coordinatorEpoch)
-    MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), 0, producerId, producerEpoch, marker)
+    MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), leaderEpoch, producerId, producerEpoch, marker)
   }
 
   private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords: Int): Unit = {
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index a2abf7b..26067b4 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -208,7 +208,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
+    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
 
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
@@ -224,7 +224,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
+    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
 
     // use some other offset to simulate a follower append where the log offset metadata won't typically