Posted to jira@kafka.apache.org by "jolshan (via GitHub)" <gi...@apache.org> on 2023/05/26 21:38:37 UTC

[GitHub] [kafka] jolshan opened a new pull request, #13769: MINOR: Covering all epoch cases in add partitions to txn manager

jolshan opened a new pull request, #13769:
URL: https://github.com/apache/kafka/pull/13769

   This change was originally part of https://github.com/apache/kafka/pull/13608, but Artem made a good point that it was unrelated, so I'm moving it into this separate MINOR PR.
   
   This cleans up AddPartitionsToTxnManager and handles the three epoch cases (incoming epoch higher than, equal to, or lower than the existing one) more clearly.
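A minimal sketch of the three cases as a pure decision function, so the branching can be read in isolation. It is not the code in this PR: `EpochCases`, `Outcome`, `FailExisting`, and `FailIncoming` are hypothetical names, and errors are plain strings instead of Kafka's `Errors` enum so the snippet compiles without Kafka on the classpath.

```scala
// Hypothetical sketch; not the AddPartitionsToTxnManager implementation.
object EpochCases {
  sealed trait Outcome
  // Cases 1 and 2: the pending (existing) request is completed with `error` and replaced.
  final case class FailExisting(error: String) extends Outcome
  // Case 3: the incoming request is completed with `error` and never queued.
  final case class FailIncoming(error: String) extends Outcome

  def resolve(existingEpoch: Short, incomingEpoch: Short): Outcome =
    if (existingEpoch < incomingEpoch) FailExisting("INVALID_PRODUCER_EPOCH") // existing data is fenced
    else if (existingEpoch == incomingEpoch) FailExisting("NETWORK_EXCEPTION") // likely a client retry
    else FailIncoming("INVALID_PRODUCER_EPOCH")                               // incoming data is fenced
}
```

The same-epoch case stays retriable (`NETWORK_EXCEPTION`), so a client that resent its request after reconnecting simply retries.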
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13769:
URL: https://github.com/apache/kafka/pull/13769#issuecomment-1570888291

   Test failures appear to be unrelated.




[GitHub] [kafka] jolshan commented on a diff in pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13769:
URL: https://github.com/apache/kafka/pull/13769#discussion_r1213389903


##########
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##########
@@ -93,17 +93,22 @@ class AddPartitionsToTxnManagerTest {
 
     val transaction1AgainErrorsOldEpoch = mutable.Map[TopicPartition, Errors]()
     val transaction1AgainErrorsNewEpoch = mutable.Map[TopicPartition, Errors]()
+    val transaction1AgainErrorsOldEpochAgain = mutable.Map[TopicPartition, Errors]()

Review Comment:
   We add transaction 1 again (the retry cases) three times: once with the old epoch, once with a new epoch, and once more with an older epoch.
   
   I can rename and add comments to make this clearer.
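A minimal sketch of that retry sequence, assuming a simplified in-memory model of the per-node pending map (one queued request per transactional ID, one error string per callback). `PendingRequests`, `runScenario`, and the result keys are hypothetical names that only loosely mirror the test's maps.

```scala
import scala.collection.mutable

object RetrySequenceSketch {
  type Callback = String => Unit

  // Hypothetical model: transactional ID -> (producer epoch, callback) waiting to be sent.
  final class PendingRequests {
    private val pending = mutable.Map[String, (Short, Callback)]()

    def add(txnId: String, epoch: Short, callback: Callback): Unit =
      pending.get(txnId) match {
        case Some((existingEpoch, _)) if existingEpoch > epoch =>
          // Lower incoming epoch: fence the incoming request, keep the existing one queued.
          callback("INVALID_PRODUCER_EPOCH")
        case existing =>
          // Same or higher incoming epoch: complete the existing request (if any), then replace it.
          existing.foreach { case (existingEpoch, existingCallback) =>
            existingCallback(if (existingEpoch < epoch) "INVALID_PRODUCER_EPOCH" else "NETWORK_EXCEPTION")
          }
          pending.update(txnId, (epoch, callback))
      }

    def pendingEpoch(txnId: String): Option[Short] = pending.get(txnId).map(_._1)
  }

  // Returns the error delivered to each callback and the epoch that is still queued.
  def runScenario(): (Map[String, String], Option[Short]) = {
    val results = mutable.Map[String, String]()
    def setError(name: String): Callback = error => results.update(name, error)
    val requests = new PendingRequests
    requests.add("txn1", 0, setError("original"))
    requests.add("txn1", 0, setError("retryOldEpoch"))        // original -> NETWORK_EXCEPTION
    requests.add("txn1", 1, setError("retryNewEpoch"))        // retryOldEpoch -> INVALID_PRODUCER_EPOCH
    requests.add("txn1", 0, setError("retryOlderEpochAgain")) // fenced immediately
    (results.toMap, requests.pendingEpoch("txn1"))
  }
}
```

`retryNewEpoch` never shows up in the results: its request is still queued, and without a mocked response its callback never runs.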





[GitHub] [kafka] jolshan commented on a diff in pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13769:
URL: https://github.com/apache/kafka/pull/13769#discussion_r1213389003


##########
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##########
@@ -93,17 +93,22 @@ class AddPartitionsToTxnManagerTest {
 
     val transaction1AgainErrorsOldEpoch = mutable.Map[TopicPartition, Errors]()
     val transaction1AgainErrorsNewEpoch = mutable.Map[TopicPartition, Errors]()
+    val transaction1AgainErrorsOldEpochAgain = mutable.Map[TopicPartition, Errors]()
+
     // Trying to add more transactional data for the same transactional ID, producer ID, and epoch should simply replace the old data and send a retriable response.
     addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1AgainErrorsOldEpoch))
     val expectedNetworkErrors = topicPartitions.map(_ -> Errors.NETWORK_EXCEPTION).toMap
     assertEquals(expectedNetworkErrors, transaction1Errors)

Review Comment:
   I never populate those fields.
   A field is only populated by its callback in two situations: when we return early because of the epoch/retry handling, or after we receive the response. Since this test only exercises the addTxnData path, I didn't mock receiving a response, so those callbacks never run and the fields stay empty.
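To illustrate the response path, here is a minimal sketch in which queued callbacks run only once a response is delivered; until then, the caller's map stays empty. `CallbackPopulationSketch`, `Queue`, and `deliverResponse` are hypothetical, and `setErrors` only mimics the role of the test helper of that name (its real signature may differ).

```scala
import scala.collection.mutable

object CallbackPopulationSketch {
  // Simplified callback: partition name -> error name.
  type AppendCallback = Map[String, String] => Unit

  // Returns a callback that copies whatever errors it receives into `dest`.
  def setErrors(dest: mutable.Map[String, String]): AppendCallback =
    errors => dest ++= errors

  // Hypothetical request queue: callbacks fire only in deliverResponse.
  final class Queue {
    private val waiting = mutable.ListBuffer[(Seq[String], AppendCallback)]()

    def enqueue(partitions: Seq[String], callback: AppendCallback): Unit =
      waiting += ((partitions, callback))

    def deliverResponse(error: String): Unit = {
      waiting.foreach { case (partitions, callback) => callback(partitions.map(_ -> error).toMap) }
      waiting.clear()
    }
  }
}
```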





[GitHub] [kafka] jolshan merged pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan merged PR #13769:
URL: https://github.com/apache/kafka/pull/13769




[GitHub] [kafka] jolshan commented on pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13769:
URL: https://github.com/apache/kafka/pull/13769#issuecomment-1572734605

   Test failures look unrelated. Will merge.




[GitHub] [kafka] dajac commented on a diff in pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13769:
URL: https://github.com/apache/kafka/pull/13769#discussion_r1213186879


##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -36,45 +36,58 @@ class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransac
                                   val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback])
 
 
-class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) 
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time)
   extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) {
-  
+
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
-  
+
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
-      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
-
-      // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and 
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else 
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
-          }
+      val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) {

Review Comment:
   nit: `()` can be omitted. There are a few other cases below.
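For context on the nit: Scala lets you drop the empty argument list when calling a Java-defined nullary method, and the generated message accessors such as `producerEpoch()` are Java methods. (Scala 2.13 deprecates this auto-application for Scala-defined methods declared with `()`, and Scala 3 rejects it, but both still allow it for Java-defined methods.) A tiny illustration, using `java.util.ArrayList` as a stand-in for the generated classes:

```scala
object ParenNitSketch {
  private val partitions = new java.util.ArrayList[String]()
  partitions.add("foo-0")
  partitions.add("foo-1")

  val withParens: Int = partitions.size()
  val withoutParens: Int = partitions.size // same Java method, `()` omitted
}
```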



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -36,45 +36,58 @@ class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransac
                                   val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback])
 
 
-class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) 
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time)
   extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) {
-  
+
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
-  
+
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
-      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
-
-      // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and 
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else 
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
-          }
+      val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) {
+          val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch())
+            Errors.INVALID_PRODUCER_EPOCH
+          else
+            Errors.NETWORK_EXCEPTION
+          val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          existingNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError(existingTransactionData, error))
+        } else {
+          // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately.
+          callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH))
+          return
         }
-        val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-        currentNodeAndTransactionData.transactionData.remove(transactionData)
-        oldCallback(topicPartitionsToError.toMap)
       }
-      currentNodeAndTransactionData.transactionData.add(transactionData)
-      currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+
+      existingNodeAndTransactionData.transactionData.add(transactionData)
+      existingNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
       wakeup()
     }
   }
 
+  private def topicPartitionsToError(transactionData: AddPartitionsToTxnTransaction, error: Errors): Map[TopicPartition, Errors] = {
+    val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+    transactionData.topics().forEach { topic =>
+      topic.partitions().forEach { partition =>
+        topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
+      }
+    }

Review Comment:
   ditto about `()`.
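For comparison, a minimal sketch of the same helper written without an intermediate mutable map. It assumes simplified stand-ins for the generated message classes (`TopicStub` and `TopicPartitionStub` are hypothetical; the real code walks `AddPartitionsToTxnTransaction`'s topics and builds Kafka's `TopicPartition`). It is an alternative shape, not the merged code.

```scala
import scala.jdk.CollectionConverters._

object ErrorMapSketch {
  // Hypothetical stand-ins for the generated request types.
  final case class TopicStub(name: String, partitions: java.util.List[Integer])
  final case class TopicPartitionStub(topic: String, partition: Int)

  // Assign the same error to every (topic, partition) pair in the request.
  def topicPartitionsToError(topics: java.util.List[TopicStub], error: String): Map[TopicPartitionStub, String] =
    topics.asScala.iterator.flatMap { topic =>
      topic.partitions.asScala.iterator.map(partition => TopicPartitionStub(topic.name, partition) -> error)
    }.toMap
}
```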



##########
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##########
@@ -93,17 +93,22 @@ class AddPartitionsToTxnManagerTest {
 
     val transaction1AgainErrorsOldEpoch = mutable.Map[TopicPartition, Errors]()
     val transaction1AgainErrorsNewEpoch = mutable.Map[TopicPartition, Errors]()
+    val transaction1AgainErrorsOldEpochAgain = mutable.Map[TopicPartition, Errors]()

Review Comment:
   Hmm... Again again :). It is hard to figure out what this does based on the name.



##########
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##########
@@ -93,17 +93,22 @@ class AddPartitionsToTxnManagerTest {
 
     val transaction1AgainErrorsOldEpoch = mutable.Map[TopicPartition, Errors]()
     val transaction1AgainErrorsNewEpoch = mutable.Map[TopicPartition, Errors]()
+    val transaction1AgainErrorsOldEpochAgain = mutable.Map[TopicPartition, Errors]()
+
     // Trying to add more transactional data for the same transactional ID, producer ID, and epoch should simply replace the old data and send a retriable response.
     addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1AgainErrorsOldEpoch))
     val expectedNetworkErrors = topicPartitions.map(_ -> Errors.NETWORK_EXCEPTION).toMap
     assertEquals(expectedNetworkErrors, transaction1Errors)

Review Comment:
   I am curious here. Is there a reason why we never verify `transaction2Errors` and `transaction3Errors`?


