Posted to commits@kafka.apache.org by jg...@apache.org on 2020/07/15 21:24:43 UTC

[kafka] branch 2.6 updated: KAFKA-9666; Don't increase producer epoch when trying to fence if the log append fails (#8239)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 1f3f227  KAFKA-9666; Don't increase producer epoch when trying to fence if the log append fails (#8239)
1f3f227 is described below

commit 1f3f227b59aee1dd20621759368502c47582a832
Author: Bob Barrett <bo...@confluent.io>
AuthorDate: Wed Jul 15 14:16:06 2020 -0700

    KAFKA-9666; Don't increase producer epoch when trying to fence if the log append fails (#8239)
    
    When fencing producers, we currently blindly bump the epoch by 1 and write an abort marker to the transaction log. If the log is unavailable (for example, because the number of in-sync replicas is less than min.in.sync.replicas), we will roll back the attempted write of the abort marker, but still increment the epoch in the transaction metadata cache. During periods of prolonged log unavailability, producer retries of InitProducerId calls can cause the epoch to be increased to the point of exhaustion (Short.MaxValue), after which the producer can no longer be fenced. With this patch, the coordinator records when a fencing epoch bump failed to be written to the log and skips the bump on subsequent fencing attempts; this is safe because the bumped epoch is never returned to the client unless the write succeeds.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Boyang Chen <bo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../transaction/TransactionCoordinator.scala       |  20 +++-
 .../transaction/TransactionMetadata.scala          |  14 ++-
 .../integration/kafka/api/TransactionsTest.scala   |  59 ++++++++++
 .../transaction/TransactionCoordinatorTest.scala   | 128 ++++++++++++++++++++-
 4 files changed, 214 insertions(+), 7 deletions(-)
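
For orientation, the heart of the change can be sketched as a small, self-contained Scala model (hypothetical names such as TxnState and fencingEpoch; the real logic lives in TransactionMetadata.prepareFenceProducerEpoch and the EndTxn append callback in the diff below):

    // Minimal sketch, assuming a simplified cache entry; not the actual Kafka classes.
    object EpochFenceSketch {
      final case class TxnState(producerEpoch: Short, hasFailedEpochFence: Boolean)

      // Epoch to use when fencing: bump only if no earlier bump may be
      // sitting in the cache without having been written to the log.
      def fencingEpoch(s: TxnState): Short = {
        if (s.producerEpoch == Short.MaxValue)
          throw new IllegalStateException("Cannot fence producer with epoch Short.MaxValue")
        if (s.hasFailedEpochFence) s.producerEpoch
        else (s.producerEpoch + 1).toShort
      }

      // Record the outcome of the transaction-log append for the fence transition;
      // the flag is cleared only once a transition is durably written.
      def afterAppend(bumped: Short, appendSucceeded: Boolean): TxnState =
        TxnState(bumped, hasFailedEpochFence = !appendSucceeded)
    }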

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index dece3aa..ec61d61 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -365,6 +365,7 @@ class TransactionCoordinator(brokerId: Int,
                              txnMarkerResult: TransactionResult,
                              isFromClient: Boolean,
                              responseCallback: EndTxnCallback): Unit = {
+    var isEpochFence = false
     if (transactionalId == null || transactionalId.isEmpty)
       responseCallback(Errors.INVALID_REQUEST)
     else {
@@ -394,9 +395,10 @@ class TransactionCoordinator(brokerId: Int,
                 if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
                   // We should clear the pending state to make way for the transition to PrepareAbort and also bump
                   // the epoch in the transaction metadata we are about to append.
+                  isEpochFence = true
                   txnMetadata.pendingState = None
-                  txnMetadata.lastProducerEpoch = txnMetadata.producerEpoch
                   txnMetadata.producerEpoch = producerEpoch
+                  txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
                 }
 
                 Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))
@@ -501,6 +503,22 @@ class TransactionCoordinator(brokerId: Int,
               info(s"Aborting sending of transaction markers and returning $error error to client for $transactionalId's EndTransaction request of $txnMarkerResult, " +
                 s"since appending $newMetadata to transaction log with coordinator epoch $coordinatorEpoch failed")
 
+              if (isEpochFence) {
+                txnManager.getTransactionState(transactionalId).foreach {
+                  case None =>
+                    warn(s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
+                      s"no metadata in the cache; this is not expected")
+
+                  case Some(epochAndMetadata) =>
+                    if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
+                      // This was an attempted epoch fence that failed, so mark this state on the metadata
+                      epochAndMetadata.transactionMetadata.hasFailedEpochFence = true
+                      warn(s"The coordinator failed to write an epoch fence transition for producer $transactionalId to the transaction log " +
+                        s"with error $error. The epoch was increased to ${newMetadata.producerEpoch} but not returned to the client")
+                    }
+                }
+              }
+
               responseCallback(error)
             }
           }
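
A note on the guard above: when the append fails, the callback re-reads the cached state and sets hasFailedEpochFence only if the coordinator epoch is unchanged, so a coordinator that has since lost ownership of the transaction partition does not overwrite newer state. A condensed sketch of that check (hypothetical types; a model of the Some/None handling above, not the real classes):

    object StaleEpochGuardSketch {
      final case class EpochAndMetadata(coordinatorEpoch: Int, var hasFailedEpochFence: Boolean)

      // Mark the failed fence on the cached metadata only if this
      // coordinator's epoch still owns the transaction partition.
      def markFailedFence(cached: Option[EpochAndMetadata], coordinatorEpoch: Int): Unit =
        cached match {
          case Some(m) if m.coordinatorEpoch == coordinatorEpoch =>
            m.hasFailedEpochFence = true
          case Some(_) => () // partition changed hands; leave the newer state alone
          case None    => () // unexpected: partition owned but no cached metadata
        }
    }
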
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 24b418a..d3d1289 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -183,6 +183,10 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
   // initialized as the same as the current state
   var pendingState: Option[TransactionState] = None
 
+  // Indicates that during a previous attempt to fence a producer, the bumped epoch may not have been
+  // successfully written to the log. If this is true, we will not bump the epoch again when fencing
+  var hasFailedEpochFence: Boolean = false
+
   private[transaction] val lock = new ReentrantLock
 
   def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
@@ -210,7 +214,11 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
     if (producerEpoch == Short.MaxValue)
       throw new IllegalStateException(s"Cannot fence producer with epoch equal to Short.MaxValue since this would overflow")
 
-    prepareTransitionTo(PrepareEpochFence, producerId, (producerEpoch + 1).toShort, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
+    // If we've already failed to fence an epoch (because the write to the log failed), we don't increase it again.
+    // This is safe because we never return the epoch to the client if we fail to fence the epoch.
+    val bumpedEpoch = if (hasFailedEpochFence) producerEpoch else (producerEpoch + 1).toShort
+
+    prepareTransitionTo(PrepareEpochFence, producerId, bumpedEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
       topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp)
   }
 
@@ -284,6 +292,9 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
 
   def prepareComplete(updateTimestamp: Long): TxnTransitMetadata = {
     val newState = if (state == PrepareCommit) CompleteCommit else CompleteAbort
+
+    // Since the state change was successfully written to the log, unset the flag for a failed epoch fence
+    hasFailedEpochFence = false
     prepareTransitionTo(newState, producerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, Set.empty[TopicPartition],
       txnStartTimestamp, updateTimestamp)
   }
@@ -468,6 +479,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
       transactionalId == other.transactionalId &&
       producerId == other.producerId &&
       producerEpoch == other.producerEpoch &&
+      lastProducerEpoch == other.lastProducerEpoch &&
       txnTimeoutMs == other.txnTimeoutMs &&
       state.equals(other.state) &&
       topicPartitions.equals(other.topicPartitions) &&
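
To make the retry behavior concrete, the following self-contained simulation (hypothetical names; it mirrors shouldNotRepeatedlyBumpEpochDueToInitPidDuringOngoingTxnIfAppendToLogFails in the test diff below) shows that two failed appends followed by a success leave the epoch exactly one above its starting value:

    object FenceRetrySimulation extends App {
      var epoch: Short = 10
      var hasFailedEpochFence = false

      def attemptFence(appendSucceeds: Boolean): Unit = {
        // Bump only if the previous bump (if any) was durably written.
        if (!hasFailedEpochFence) epoch = (epoch + 1).toShort
        hasFailedEpochFence = !appendSucceeds
      }

      attemptFence(appendSucceeds = false) // epoch -> 11, flag set
      attemptFence(appendSucceeds = false) // epoch stays 11
      attemptFence(appendSucceeds = true)  // epoch stays 11, flag cleared
      assert(epoch == 11 && !hasFailedEpochFence)
      println(s"final epoch = $epoch") // prints: final epoch = 11
    }
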
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 5eb5319..13fafc4 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -662,6 +662,65 @@ class TransactionsTest extends KafkaServerTestHarness {
     }
   }
 
+  @Test
+  def testFailureToFenceEpoch(): Unit = {
+    val producer1 = transactionalProducers.head
+    val producer2 = createTransactionalProducer("transactional-producer", maxBlockMs = 1000)
+
+    producer1.initTransactions()
+
+    producer1.beginTransaction()
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
+    producer1.commitTransaction()
+
+    val partitionLeader = TestUtils.waitUntilLeaderIsKnown(servers, new TopicPartition(topic1, 0))
+    var producerStateEntry =
+      servers(partitionLeader).logManager.getLog(new TopicPartition(topic1, 0)).get.producerStateManager.activeProducers.head._2
+    val producerId = producerStateEntry.producerId
+    val initialProducerEpoch = producerStateEntry.producerEpoch
+
+    // Kill two brokers to bring the transaction log under min-ISR
+    killBroker(0)
+    killBroker(1)
+
+    try {
+      producer2.initTransactions()
+    } catch {
+      case _: TimeoutException =>
+        // good!
+      case e: Exception =>
+        fail("Got an unexpected exception from initTransactions", e)
+    } finally {
+      producer2.close()
+    }
+
+    restartDeadBrokers()
+
+    // Because the epoch was bumped in memory, attempting to begin a transaction with producer 1 should fail
+    try {
+      producer1.beginTransaction()
+    } catch {
+      case _: ProducerFencedException =>
+        // good!
+      case e: Exception =>
+        fail("Got an unexpected exception from commitTransaction", e)
+    } finally {
+      producer1.close()
+    }
+
+    val producer3 = createTransactionalProducer("transactional-producer", maxBlockMs = 5000)
+    producer3.initTransactions()
+
+    producer3.beginTransaction()
+    producer3.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
+    producer3.commitTransaction()
+
+    // Check that the epoch only increased by 1
+    producerStateEntry =
+      servers(partitionLeader).logManager.getLog(new TopicPartition(topic1, 0)).get.producerStateManager.activeProducers(producerId)
+    assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
+  }
+
   private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String,
                                                       start: Int, end: Int, willBeCommitted: Boolean): Unit = {
     for (i <- start until end) {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index d889097..2dbd8c4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -564,7 +564,7 @@ class TransactionCoordinatorTest {
       .anyTimes()
 
     val originalMetadata = new TransactionMetadata(transactionalId, producerId, producerId, (producerEpoch + 1).toShort,
-      producerEpoch, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds())
+      RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds())
     EasyMock.expect(transactionManager.appendTransactionToLog(
       EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
@@ -615,6 +615,76 @@ class TransactionCoordinatorTest {
   }
 
   @Test
+  def shouldNotRepeatedlyBumpEpochDueToInitPidDuringOngoingTxnIfAppendToLogFails(): Unit = {
+    val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, producerEpoch,
+      RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds())
+
+    EasyMock.expect(transactionManager.validateTransactionTimeoutMs(EasyMock.anyInt()))
+      .andReturn(true)
+      .anyTimes()
+
+    EasyMock.expect(transactionManager.putTransactionStateIfNotExists(EasyMock.anyObject[TransactionMetadata]()))
+      .andReturn(Right(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata)))
+      .anyTimes()
+
+    EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId)))
+      .andAnswer(() => Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+      .anyTimes()
+
+    val originalMetadata = new TransactionMetadata(transactionalId, producerId, producerId, (producerEpoch + 1).toShort,
+      RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds())
+    val txnTransitMetadata = originalMetadata.prepareAbortOrCommit(PrepareAbort, time.milliseconds())
+    EasyMock.expect(transactionManager.appendTransactionToLog(
+      EasyMock.eq(transactionalId),
+      EasyMock.eq(coordinatorEpoch),
+      EasyMock.eq(txnTransitMetadata),
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject())
+    ).andAnswer(() => {
+      capturedErrorsCallback.getValue.apply(Errors.NOT_ENOUGH_REPLICAS)
+      txnMetadata.pendingState = None
+    }).times(2)
+
+    EasyMock.expect(transactionManager.appendTransactionToLog(
+      EasyMock.eq(transactionalId),
+      EasyMock.eq(coordinatorEpoch),
+      EasyMock.eq(txnTransitMetadata),
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject())
+    ).andAnswer(() => {
+      capturedErrorsCallback.getValue.apply(Errors.NONE)
+
+      // For the successful call, execute the state transitions that would happen in appendTransactionToLog()
+      txnMetadata.completeTransitionTo(txnTransitMetadata)
+      txnMetadata.prepareComplete(time.milliseconds())
+    }).once()
+
+    EasyMock.replay(transactionManager)
+
+    // For the first two calls, verify that the epoch was only bumped once
+    coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, None, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.NOT_ENOUGH_REPLICAS), result)
+
+    assertEquals((producerEpoch + 1).toShort, txnMetadata.producerEpoch)
+    assertTrue(txnMetadata.hasFailedEpochFence)
+
+    coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, None, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.NOT_ENOUGH_REPLICAS), result)
+
+    assertEquals((producerEpoch + 1).toShort, txnMetadata.producerEpoch)
+    assertTrue(txnMetadata.hasFailedEpochFence)
+
+    // For the last, successful call, verify that the epoch was not bumped further
+    coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, None, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
+
+    assertEquals((producerEpoch + 1).toShort, txnMetadata.producerEpoch)
+    assertFalse(txnMetadata.hasFailedEpochFence)
+
+    EasyMock.verify(transactionManager)
+  }
+
+  @Test
   def shouldUseLastEpochToFenceWhenEpochsAreExhausted(): Unit = {
     val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, (Short.MaxValue - 1).toShort,
       (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds())
@@ -629,7 +699,14 @@ class TransactionCoordinatorTest {
 
     EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId)))
       .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
-      .anyTimes()
+      .times(2)
+
+    val postFenceTxnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, Short.MaxValue,
+      RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, PrepareAbort, partitions, time.milliseconds(), time.milliseconds())
+
+    EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId)))
+      .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, postFenceTxnMetadata))))
+      .once()
 
     EasyMock.expect(transactionManager.appendTransactionToLog(
       EasyMock.eq(transactionalId),
@@ -638,7 +715,7 @@ class TransactionCoordinatorTest {
         producerId = producerId,
         lastProducerId = producerId,
         producerEpoch = Short.MaxValue,
-        lastProducerEpoch = (Short.MaxValue - 1).toShort,
+        lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
         txnTimeoutMs = txnTimeoutMs,
         txnState = PrepareAbort,
         topicPartitions = partitions.toSet,
@@ -878,8 +955,8 @@ class TransactionCoordinatorTest {
       .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
       .times(2)
 
-    val expectedTransition = TxnTransitMetadata(producerId, producerId, (producerEpoch + 1).toShort, producerEpoch, txnTimeoutMs,
-      PrepareAbort, partitions.toSet, now, now + TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs)
+    val expectedTransition = TxnTransitMetadata(producerId, producerId, (producerEpoch + 1).toShort, RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs, PrepareAbort, partitions.toSet, now, now + TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs)
 
     EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
@@ -942,6 +1019,47 @@ class TransactionCoordinatorTest {
   }
 
   @Test
+  def shouldNotBumpEpochWhenAbortingExpiredTransactionIfAppendToLogFails(): Unit = {
+    val now = time.milliseconds()
+    val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, producerEpoch,
+      RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, now, now)
+
+
+    EasyMock.expect(transactionManager.timedOutTransactions())
+      .andReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, producerId, producerEpoch)))
+    EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId)))
+      .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+      .times(2)
+
+    val txnMetadataAfterAppendFailure = new TransactionMetadata(transactionalId, producerId, producerId, (producerEpoch + 1).toShort,
+      RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, now, now)
+    EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId)))
+      .andAnswer(() => Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadataAfterAppendFailure))))
+      .once
+
+    val bumpedEpoch = (producerEpoch + 1).toShort
+    val expectedTransition = TxnTransitMetadata(producerId, producerId, bumpedEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
+      PrepareAbort, partitions.toSet, now, now + TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs)
+
+    EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId),
+      EasyMock.eq(coordinatorEpoch),
+      EasyMock.eq(expectedTransition),
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject())
+    ).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NOT_ENOUGH_REPLICAS)).once()
+
+    EasyMock.replay(transactionManager, transactionMarkerChannelManager)
+
+    coordinator.startup(false)
+    time.sleep(TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs)
+    scheduler.tick()
+    EasyMock.verify(transactionManager)
+
+    assertEquals((producerEpoch + 1).toShort, txnMetadataAfterAppendFailure.producerEpoch)
+    assertTrue(txnMetadataAfterAppendFailure.hasFailedEpochFence)
+  }
+
+  @Test
   def shouldNotBumpEpochWithPendingTransaction(): Unit = {
     val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, producerEpoch,
       RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds())