You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "jolshan (via GitHub)" <gi...@apache.org> on 2023/04/14 22:27:50 UTC

[GitHub] [kafka] jolshan opened a new pull request, #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

jolshan opened a new pull request, #13579:
URL: https://github.com/apache/kafka/pull/13579

   [KAFKA-14561](https://github.com/apache/kafka/commit/56dcb837a2f1c1d8c016cfccf8268a910bb77a36) added verification to transactional produce requests to confirm an ongoing transaction.
   
   There is an edge case where the transaction is added, but the coordinator is still writing the state to the log. In this case, when verifying, we return CONCURRENT_TRANSACTIONS and retry. However, the next inflight batch is often successful because the write completes. 
   
   When a partition has no entry in the PSM, it will allow any sequence number. This means if we retry the first write to the partition (or first write in a while) we will never be able to write it and get OutOfOrderSequence exceptions. This is a known issue. Since the verification makes this more common, I propose allowing verification on pending ongoing state. We will potentially have hanging transactions if the coordinator crashes before the writes complete, but this is better than endless out of order exceptions and is better than not verifying at all. (It is the best compromise)
   
   The good news is part 2 of KIP-890 will allow us to enforce that the first write for a transaction is sequence 0 and this issue will go away entirely.  
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1509969944

   We should also cherry-pick this to 3.5.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1167627192


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -396,8 +397,14 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
           } else if (txnMetadata.producerEpoch != producerEpoch) {
             Left(Errors.PRODUCER_FENCED)
           } else if (txnMetadata.pendingTransitionInProgress) {
-            // return a retriable exception to let the client backoff and retry
-            Left(Errors.CONCURRENT_TRANSACTIONS)
+            // If we are in the produce path, we want to avoid OutOfOrderSequence errors if the added partition is pending.
+            // TODO: Part 2 of KIP-890 will always start transactions with sequence 0, so we enforce that and avoid this workaround.

Review Comment:
   We generally avoid `TODO` in the code - they always go stale and give the impression that something is not complete. The comment itself is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170680513


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification")
       responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
     } else {
-      val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-      
+      val result: ApiResult[Map[TopicPartition, Errors]] =
+        txnManager.getTransactionState(transactionalId).flatMap {
+          case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+          case Some(epochAndMetadata) =>
+            val txnMetadata = epochAndMetadata.transactionMetadata
+
+            // generate the new transaction metadata with added partitions
+            txnMetadata.inLock {
+              if (txnMetadata.producerId != producerId) {
+                Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+              } else if (txnMetadata.producerEpoch != producerEpoch) {
+                Left(Errors.PRODUCER_FENCED)
+              } else if (txnMetadata.pendingTransitionInProgress && !(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+                // return a retriable exception to let the client backoff and retry
+                Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   Yeah, I think that's right. It works because we remove the partition after we have confirmed that the end transaction marker has been written. So if the partition is included, then it means markers are still to come. This assumes we fix https://issues.apache.org/jira/browse/KAFKA-14884 of course.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on pull request #13579: KAFKA-14904: Pending state blocked verification of transactions

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1516620151

   Many many connect failures still 😔 
   Failures appear to be unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170696945


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -332,24 +331,42 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification")
       responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
     } else {
-      val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-      
+      val result: ApiResult[Map[TopicPartition, Errors]] =
+        txnManager.getTransactionState(transactionalId).flatMap {
+          case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+          case Some(epochAndMetadata) =>
+            val txnMetadata = epochAndMetadata.transactionMetadata
+
+            // generate the new transaction metadata with added partitions
+            txnMetadata.inLock {
+              if (txnMetadata.producerId != producerId) {
+                Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+              } else if (txnMetadata.producerEpoch != producerEpoch) {
+                Left(Errors.PRODUCER_FENCED)
+              } else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
+                Left(Errors.CONCURRENT_TRANSACTIONS)
+              } else {
+                Right(partitions.map(part => 

Review Comment:
   nit: usually we would write `partitions.map { part =>`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan merged pull request #13579: KAFKA-14904: Pending state blocked verification of transactions

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan merged PR #13579:
URL: https://github.com/apache/kafka/pull/13579


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1511592266

   It was about 1/40 locally on my laptop.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1511739519

   Yes. I ran it 100 times locally and no failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1511617282

   > It was about 1/40 locally on my laptop.
   
   And now it passes consistently locally?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1168901227


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -396,8 +397,14 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
           } else if (txnMetadata.producerEpoch != producerEpoch) {
             Left(Errors.PRODUCER_FENCED)
           } else if (txnMetadata.pendingTransitionInProgress) {
-            // return a retriable exception to let the client backoff and retry
-            Left(Errors.CONCURRENT_TRANSACTIONS)
+            // If we are in the produce path, we want to avoid OutOfOrderSequence errors if the added partition is pending.
+            // TODO: Part 2 of KIP-890 will always start transactions with sequence 0, so we enforce that and avoid this workaround.

Review Comment:
   I can remove the "TODO" parts of the comments. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1509935179

   @jolshan Thanks for the PR. Can you please share details regarding the frequency of failures before/after the change when you run this test locally in a loop?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on pull request #13579: KAFKA-14904: Pending state blocked verification of transactions

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1516624342

   Will also cherry-pick to 3.5


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170696945


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -332,24 +331,42 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification")
       responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
     } else {
-      val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-      
+      val result: ApiResult[Map[TopicPartition, Errors]] =
+        txnManager.getTransactionState(transactionalId).flatMap {
+          case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+          case Some(epochAndMetadata) =>
+            val txnMetadata = epochAndMetadata.transactionMetadata
+
+            // generate the new transaction metadata with added partitions
+            txnMetadata.inLock {
+              if (txnMetadata.producerId != producerId) {
+                Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+              } else if (txnMetadata.producerEpoch != producerEpoch) {
+                Left(Errors.PRODUCER_FENCED)
+              } else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
+                Left(Errors.CONCURRENT_TRANSACTIONS)
+              } else {
+                Right(partitions.map(part => 

Review Comment:
   nit: usually we would write `partitions.map { part =>



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -332,24 +331,42 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification")
       responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
     } else {
-      val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-      
+      val result: ApiResult[Map[TopicPartition, Errors]] =
+        txnManager.getTransactionState(transactionalId).flatMap {
+          case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+          case Some(epochAndMetadata) =>
+            val txnMetadata = epochAndMetadata.transactionMetadata
+
+            // generate the new transaction metadata with added partitions

Review Comment:
   nit: this comment seems misplaced
   
   Could we have a short comment here that we intentionally do not check pending state? We can mention that partitions are removed from the transaction metadata as soon as the markers are confirmed written.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170670464


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification")
       responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
     } else {
-      val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-      
+      val result: ApiResult[Map[TopicPartition, Errors]] =
+        txnManager.getTransactionState(transactionalId).flatMap {
+          case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+          case Some(epochAndMetadata) =>
+            val txnMetadata = epochAndMetadata.transactionMetadata
+
+            // generate the new transaction metadata with added partitions
+            txnMetadata.inLock {
+              if (txnMetadata.producerId != producerId) {
+                Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+              } else if (txnMetadata.producerEpoch != producerEpoch) {
+                Left(Errors.PRODUCER_FENCED)
+              } else if (txnMetadata.pendingTransitionInProgress && !(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+                // return a retriable exception to let the client backoff and retry
+                Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   Do we need this condition for verification case?  No matter what the pending state is, if the the state contains the partition, we're good, otherwise, we'd fail during verification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170681401


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification")
       responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
     } else {
-      val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-      
+      val result: ApiResult[Map[TopicPartition, Errors]] =
+        txnManager.getTransactionState(transactionalId).flatMap {
+          case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+          case Some(epochAndMetadata) =>
+            val txnMetadata = epochAndMetadata.transactionMetadata
+
+            // generate the new transaction metadata with added partitions
+            txnMetadata.inLock {
+              if (txnMetadata.producerId != producerId) {
+                Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+              } else if (txnMetadata.producerEpoch != producerEpoch) {
+                Left(Errors.PRODUCER_FENCED)
+              } else if (txnMetadata.pendingTransitionInProgress && !(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+                // return a retriable exception to let the client backoff and retry
+                Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   If we allow any pending states, we will have to catch the error on kafka-14884. Which I suppose is also ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170681193


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification")
       responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
     } else {
-      val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-      
+      val result: ApiResult[Map[TopicPartition, Errors]] =
+        txnManager.getTransactionState(transactionalId).flatMap {
+          case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+          case Some(epochAndMetadata) =>
+            val txnMetadata = epochAndMetadata.transactionMetadata
+
+            // generate the new transaction metadata with added partitions
+            txnMetadata.inLock {
+              if (txnMetadata.producerId != producerId) {
+                Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+              } else if (txnMetadata.producerEpoch != producerEpoch) {
+                Left(Errors.PRODUCER_FENCED)
+              } else if (txnMetadata.pendingTransitionInProgress && !(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+                // return a retriable exception to let the client backoff and retry
+                Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   If we are pending commit or abort I don't know if it makes sense to verify and allow the write to continue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org