Posted to commits@kafka.apache.org by jo...@apache.org on 2023/09/13 21:22:06 UTC

[kafka] branch trunk updated: KAFKA-15459: Convert coordinator retriable errors to a known producer response error (#14378)

This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5aecd282564 KAFKA-15459: Convert coordinator retriable errors to a known producer response error (#14378)
5aecd282564 is described below

commit 5aecd2825644728f68a26558c957f5dfd4643423
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Wed Sep 13 14:21:58 2023 -0700

    KAFKA-15459: Convert coordinator retriable errors to a known producer response error (#14378)
    
    KIP-890 Part 1 tries to address hanging transactions on old clients. Thus, the produce version cannot be bumped and no new errors can be added. Previously we relied on the Java client's notion of retriable and abortable errors -- retriable errors are defined as such by extending the retriable error class, fatal errors are defined explicitly, and abortable errors are the remainder. However, many other clients treat unspecified errors as fatal, which means many retriable errors kill the app [...]
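
    For reference, the Java client decides whether an error is retriable roughly as follows (a minimal sketch; Errors and RetriableException are the real org.apache.kafka.common classes, while the helper itself is only illustrative):

        import org.apache.kafka.common.errors.RetriableException
        import org.apache.kafka.common.protocol.Errors

        // An error is retriable when its exception type extends RetriableException.
        // NOT_ENOUGH_REPLICAS maps to NotEnoughReplicasException, which does, so it
        // is a safe conversion target for errors we still want clients to retry.
        def isRetriable(error: Errors): Boolean =
          error.exception() != null && error.exception().isInstanceOf[RetriableException]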
    
    Stuck between returning specific errors that Java clients handle correctly (i.e. we retry) and specific errors that other clients would treat as fatal when they should not be, we opted for a middle ground: a non-specific error, with a message in the response to identify the underlying cause.
    
    This change converts some of the coordinator error codes to NOT_ENOUGH_REPLICAS, which is a known produce response error.
    It also correctly adds the old errors to the produce response. (We were not doing this correctly before.)
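
    In essence, the conversion looks like the sketch below (simplified from the diff; the helper name is illustrative, since in the patch this logic is inlined in the handling of unverified entries in ReplicaManager, but the matched errors and the exception message are the ones this patch adds):

        import org.apache.kafka.common.errors.NotEnoughReplicasException
        import org.apache.kafka.common.protocol.Errors

        // Coordinator-related retriable errors are not known produce response errors
        // on old versions, so convert them to NOT_ENOUGH_REPLICAS and preserve the
        // original error in the message.
        def convertVerificationError(error: Errors): Throwable = error match {
          case Errors.CONCURRENT_TRANSACTIONS |
               Errors.COORDINATOR_LOAD_IN_PROGRESS |
               Errors.COORDINATOR_NOT_AVAILABLE |
               Errors.NOT_COORDINATOR =>
            new NotEnoughReplicasException(
              s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}")
          case Errors.INVALID_TXN_STATE =>
            error.exception("Partition was not added to the transaction")
          case _ =>
            error.exception()
        }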
    
    Added tests for the new errors and messages.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, David Jacot <dj...@confluent.io>
---
 .../main/scala/kafka/server/ReplicaManager.scala   | 111 ++++++++++++---------
 .../unit/kafka/server/ReplicaManagerTest.scala     |  69 ++++++++++++-
 2 files changed, 130 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f92e3717361..73dd527ed16 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -753,39 +753,56 @@ class ReplicaManager(val config: KafkaConfig,
         val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
           origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap)
         debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-        
-        val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) =>
-          val message = if (error == Errors.INVALID_TXN_STATE) "Partition was not added to the transaction" else error.message()
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception(message))
-          )
-        }
 
-        val errorResults = errorsPerPartition.map { case (topicPartition, error) =>
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception())
-          )
+        def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult],
+                                useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = {
+          appendResult.map { case (topicPartition, result) =>
+            topicPartition -> ProducePartitionStatus(
+              result.info.lastOffset + 1, // required offset
+              new PartitionResponse(
+                result.error,
+                result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+                result.info.lastOffset,
+                result.info.logAppendTime,
+                result.info.logStartOffset,
+                result.info.recordErrors,
+                if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage
+              )
+            ) // response status
+          }
         }
         
-        val allResults = localProduceResults ++ unverifiedResults ++ errorResults
+        val unverifiedResults = unverifiedEntries.map {
+          case (topicPartition, error) =>
+            val finalException =
+              error match {
+                case Errors.INVALID_TXN_STATE => error.exception("Partition was not added to the transaction")
+                case Errors.CONCURRENT_TRANSACTIONS |
+                     Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                     Errors.COORDINATOR_NOT_AVAILABLE |
+                     Errors.NOT_COORDINATOR => new NotEnoughReplicasException(
+                         s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}")
+                case _ => error.exception()
+            }
+            topicPartition -> LogAppendResult(
+              LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+              Some(finalException)
+            )
+        }
 
-        val produceStatus = allResults.map { case (topicPartition, result) =>
-          topicPartition -> ProducePartitionStatus(
-            result.info.lastOffset + 1, // required offset
-            new PartitionResponse(
-              result.error,
-              result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
-              result.info.lastOffset,
-              result.info.logAppendTime,
-              result.info.logStartOffset,
-              result.info.recordErrors,
-              result.info.errorMessage
+        val errorResults = errorsPerPartition.map {
+          case (topicPartition, error) =>
+            topicPartition -> LogAppendResult(
+              LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+              Some(error.exception())
             )
-          ) // response status
         }
 
+        val produceStatus = Set((localProduceResults, false), (unverifiedResults, true), (errorResults, false)).flatMap {
+          case (results, useCustomError) => produceStatusResult(results, useCustomError)
+        }.toMap
+        val allResults = localProduceResults ++ unverifiedResults ++ errorResults
+
         actionQueue.add {
           () => allResults.foreach { case (topicPartition, result) =>
             val requestKey = TopicPartitionOperationKey(topicPartition)
@@ -832,28 +849,30 @@ class ReplicaManager(val config: KafkaConfig,
         val (error, node) = getTransactionCoordinator(transactionStatePartition.get)
 
         if (error != Errors.NONE) {
-          throw error.exception() // Can throw coordinator not available -- which is retriable
-        }
+          appendEntries(entriesPerPartition)(notYetVerifiedEntriesPerPartition.map {
+            case (tp, _) => (tp, error)
+          }.toMap)
+        } else {
+          val topicGrouping = notYetVerifiedEntriesPerPartition.keySet.groupBy(tp => tp.topic())
+          val topicCollection = new AddPartitionsToTxnTopicCollection()
+          topicGrouping.foreach { case (topic, tps) =>
+            topicCollection.add(new AddPartitionsToTxnTopic()
+              .setName(topic)
+              .setPartitions(tps.map(tp => Integer.valueOf(tp.partition())).toList.asJava))
+          }
 
-        val topicGrouping = notYetVerifiedEntriesPerPartition.keySet.groupBy(tp => tp.topic())
-        val topicCollection = new AddPartitionsToTxnTopicCollection()
-        topicGrouping.foreach { case (topic, tps) =>
-          topicCollection.add(new AddPartitionsToTxnTopic()
-            .setName(topic)
-            .setPartitions(tps.map(tp => Integer.valueOf(tp.partition())).toList.asJava))
+          // Map not yet verified partitions to a request object.
+          // We verify above that all partitions use the same producer ID.
+          val batchInfo = notYetVerifiedEntriesPerPartition.head._2.firstBatch()
+          val notYetVerifiedTransaction = new AddPartitionsToTxnTransaction()
+            .setTransactionalId(transactionalId)
+            .setProducerId(batchInfo.producerId())
+            .setProducerEpoch(batchInfo.producerEpoch())
+            .setVerifyOnly(true)
+            .setTopics(topicCollection)
+
+          addPartitionsToTxnManager.foreach(_.addTxnData(node, notYetVerifiedTransaction, KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))))
         }
-
-        // Map not yet verified partitions to a request object.
-        // We verify above that all partitions use the same producer ID.
-        val batchInfo = notYetVerifiedEntriesPerPartition.head._2.firstBatch()
-        val notYetVerifiedTransaction = new AddPartitionsToTxnTransaction()
-          .setTransactionalId(transactionalId)
-          .setProducerId(batchInfo.producerId())
-          .setProducerEpoch(batchInfo.producerEpoch())
-          .setVerifyOnly(true)
-          .setTopics(topicCollection)
-
-        addPartitionsToTxnManager.foreach(_.addTxnData(node, notYetVerifiedTransaction, KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))))
       }
     } else {
       // If required.acks is outside accepted range, something is wrong with the client
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b4bc4f540ad..6f913208320 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -64,7 +64,7 @@ import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, Fetc
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 import com.yammer.metrics.core.Gauge
 import kafka.log.remote.RemoteLogManager
 import org.apache.kafka.common.config.AbstractConfig
@@ -2287,8 +2287,8 @@ class ReplicaManagerTest {
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
-      callback(Map(tp0 -> Errors.NOT_COORDINATOR).toMap)
-      assertEquals(Errors.NOT_COORDINATOR, result.assertFired.error)
+      callback(Map(tp0 -> Errors.INVALID_PRODUCER_ID_MAPPING).toMap)
+      assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, result.assertFired.error)
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
 
       // Try to append a higher sequence (7) after the first one failed with a retriable error.
@@ -2537,7 +2537,9 @@ class ReplicaManagerTest {
       quotaManagers = quotaManager,
       metadataCache = metadataCache,
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager)
+      alterPartitionManager = alterPartitionManager,
+      addPartitionsToTxnManager = Some(addPartitionsToTxnManager)
+    )
 
     try {
       val txnCoordinatorPartition0 = 0
@@ -2569,6 +2571,65 @@ class ReplicaManagerTest {
 
       assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
       assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+      // Test we convert the error correctly when trying to append and coordinator is not available
+      val tp0 = new TopicPartition(topic, 0)
+      val producerId = 24L
+      val producerEpoch = 0.toShort
+      val sequence = 0
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1))
+      val expectedError = s"Unable to verify the partition has been added to the transaction. Underlying error: ${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedError, result.assertFired.errorMessage)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
+  def testVerificationErrorConversions(error: Errors): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 0
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // Start verification and return the coordinator related errors.
+      val expectedMessage = s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0))
+      val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+      verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+      // Confirm we did not write to the log and instead returned the converted error with the correct error message.
+      val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
+      callback(Map(tp0 -> error).toMap)
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedMessage, result.assertFired.errorMessage)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }