Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/29 01:08:05 UTC

[GitHub] [kafka] jolshan opened a new pull request, #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

jolshan opened a new pull request, #12915:
URL: https://github.com/apache/kafka/pull/12915

   There were a few options here. Originally we should have bumped the client request version and only sent the response if the client could handle it (since this protocol is actually gated on IBP and not client version). Doing this now will require a KIP so I thought I'd get a simpler fix in temporarily.
   
   The client is modified to handle REQUEST_TIMED_OUT by re-enqueuing the request. This means new clients can handle the current server code.
   
   The server is also modified to return CONCURRENT_TRANSACTIONS instead. This means the request will also be re-enqueued, which allows older clients to handle this controller channel timeout as well.
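
As a rough illustration of the client-side behaviour described above (not the actual `TransactionManager` code), the sketch below puts a request back on a queue when the response carries a transient error instead of failing fatally. `ReenqueueSketch`, `SketchError`, and `runUntilDone` are hypothetical names invented for this example, and the responses are scripted rather than coming from a broker.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a small subset of broker error codes; not Kafka's Errors enum.
enum SketchError { NONE, REQUEST_TIMED_OUT, CONCURRENT_TRANSACTIONS, INVALID_REQUEST }

final class ReenqueueSketch {
    // Sends one request repeatedly, consuming one scripted response per attempt.
    // Returns the number of attempts made before success; throws on a fatal error.
    static int runUntilDone(List<SketchError> scriptedResponses) {
        Deque<String> pending = new ArrayDeque<>();
        pending.add("InitProducerId");
        Iterator<SketchError> responses = scriptedResponses.iterator();
        int attempts = 0;
        while (!pending.isEmpty()) {
            String request = pending.poll();
            attempts++;
            SketchError error = responses.next();
            if (error == SketchError.NONE) {
                continue; // success: nothing left to send
            } else if (error == SketchError.REQUEST_TIMED_OUT
                    || error == SketchError.CONCURRENT_TRANSACTIONS) {
                pending.add(request); // transient: put the same request back on the queue
            } else {
                throw new IllegalStateException("Fatal error for " + request + ": " + error);
            }
        }
        return attempts;
    }
}
```

With this shape, a timeout costs one extra round trip rather than moving the producer into a fatal state.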
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036589456


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1678,39 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @Test
+    public void testRetriableErrors() {

Review Comment:
   Just the ones we expect. So `REQUEST_TIMED_OUT`, `CONCURRENT_TRANSACTIONS` and any of the others we've replaced with the implicit `RetriableException` check.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035062033


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
   I'm wondering if we should be even more lenient with the error codes and check for `RetriableException`?
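
A minimal sketch of what the more lenient check could look like, assuming errors expose an exception object whose type marks it as retriable. All types here (`ModelApiException`, `ModelRetriableException`, `ModelError`, `RetryClassifier`) are hypothetical stand-ins written for this example, not Kafka's classes. `CONCURRENT_TRANSACTIONS` is modeled as not typed retriable, which is an assumption of the sketch, so it keeps its own explicit check.

```java
// Hypothetical stand-ins that model the shape of the check; these are not Kafka's classes.
class ModelApiException extends RuntimeException {
    ModelApiException(String message) { super(message); }
}

class ModelRetriableException extends ModelApiException {
    ModelRetriableException(String message) { super(message); }
}

enum ModelError {
    COORDINATOR_LOAD_IN_PROGRESS(new ModelRetriableException("coordinator is loading")),
    REQUEST_TIMED_OUT(new ModelRetriableException("request timed out")),
    // Assumption of this sketch: retried by convention, but not typed as retriable.
    CONCURRENT_TRANSACTIONS(new ModelApiException("concurrent transactions")),
    INVALID_PRODUCER_EPOCH(new ModelApiException("invalid producer epoch"));

    private final ModelApiException exception;

    ModelError(ModelApiException exception) { this.exception = exception; }

    ModelApiException exception() { return exception; }
}

final class RetryClassifier {
    // Type-based check: any error whose exception is retriable is re-enqueued,
    // plus the one error that is retried without being typed as retriable.
    static boolean shouldReenqueue(ModelError error) {
        return error == ModelError.CONCURRENT_TRANSACTIONS
                || error.exception() instanceof ModelRetriableException;
    }
}
```

The trade-off is that a type-based check picks up any error later marked retriable without touching the handler, at the cost of being less explicit about which codes a given request expects.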





[GitHub] [kafka] hachikuji merged pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji merged PR #12915:
URL: https://github.com/apache/kafka/pull/12915




[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})

Review Comment:
   I think another way to do this is like this:
   ```java
   @EnumSource(names = { 
     "UNKNOWN_TOPIC_OR_PARTITION", 
     "REQUEST_TIMED_OUT", 
     "COORDINATOR_LOAD_IN_PROGRESS", 
     "CONCURRENT_TRANSACTIONS" 
   })
   public void testRetriableErrors(Errors error) {
   ```
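
To make the equivalence between the two parameterizations concrete without pulling in JUnit, here is a small self-contained sketch. `MiniErrors` is a hypothetical miniature enum that models only the four codes from the `@ValueSource` above; it is not Kafka's `Errors` enum. Looking a constant up by name yields the same value as looking it up by numeric code, but the name documents itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of an error enum; only the four codes from the test above are modeled.
enum MiniErrors {
    UNKNOWN_TOPIC_OR_PARTITION((short) 3),
    REQUEST_TIMED_OUT((short) 7),
    COORDINATOR_LOAD_IN_PROGRESS((short) 14),
    CONCURRENT_TRANSACTIONS((short) 51);

    private static final Map<Short, MiniErrors> BY_CODE = new HashMap<>();

    static {
        // Build the code-to-constant index once, after all constants exist.
        for (MiniErrors error : values()) {
            BY_CODE.put(error.code, error);
        }
    }

    private final short code;

    MiniErrors(short code) { this.code = code; }

    short code() { return code; }

    // Numeric route, analogous to converting a @ValueSource int inside the test body.
    static MiniErrors forCode(short code) {
        MiniErrors error = BY_CODE.get(code);
        if (error == null) {
            throw new IllegalArgumentException("Unknown code: " + code);
        }
        return error;
    }
}
```

For example, `MiniErrors.forCode((short) 7)` and `MiniErrors.valueOf("REQUEST_TIMED_OUT")` refer to the same constant; the name-based form also removes the need for a comment listing which code is which.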





[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036369902


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
         val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
         if (block == null) {
-          throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")
+          throw Errors.CONCURRENT_TRANSACTIONS.exception("Timed out waiting for next producer ID block")

Review Comment:
   Hmm -- I may have misunderstood some of our offline discussions. I believe a KIP is needed to bump the request version, but since `CONCURRENT_TRANSACTIONS` is already handled by the client for this request, my understanding is that returning it here is fine.
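
For readers less familiar with the pattern in the hunk above, here is a Java sketch of the same idea: wait a bounded time for the next block, and if nothing arrives, surface an error the caller already knows how to retry. `BlockWaitSketch`, `RetryLaterException`, and `nextBlock` are hypothetical names for this example; only `BlockingQueue.poll(long, TimeUnit)` is a real JDK call, and it returns `null` when the wait elapses.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BlockWaitSketch {
    // Hypothetical exception type standing in for an error the client retries.
    static final class RetryLaterException extends RuntimeException {
        RetryLaterException(String message) { super(message); }
    }

    // Waits up to maxWaitMs for the next block; a null result from poll means the wait timed out.
    static long[] nextBlock(BlockingQueue<long[]> nextProducerIdBlock, long maxWaitMs) {
        long[] block;
        try {
            block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("Interrupted while waiting for next block", e);
        }
        if (block == null) {
            // Surface an error the client already retries rather than one it treats as fatal.
            throw new RetryLaterException("Timed out waiting for next producer ID block");
        }
        return block;
    }
}
```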





[GitHub] [kafka] jolshan commented on pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12915:
URL: https://github.com/apache/kafka/pull/12915#issuecomment-1339938684

   Did we want to cherry-pick this back to older branches, @hachikuji? Or wait for the server-side version for that?




[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038385341


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {

Review Comment:
   Oops, I added it to that test. I will revert that change.







[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036368921


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1515,7 +1515,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.CONCURRENT_TRANSACTIONS || error.exception() instanceof RetriableException) {

Review Comment:
   Are you suggesting changing the Exception to retriable? I'm a bit concerned with such a large change.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038367059


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {

Review Comment:
   Sorry, I realized this is not a valid error for `FindCoordinator`, so never mind.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035081487


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
   Also, can we check the other handlers? We may as well update all of them.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036571472


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1678,39 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @Test
+    public void testRetriableErrors() {

Review Comment:
   Is it possible to parameterize this test case so that we can verify retriable errors other than REQUEST_TIMED_OUT (e.g. CONCURRENT_TRANSACTIONS)?





[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036368921


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1515,7 +1515,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.CONCURRENT_TRANSACTIONS || error.exception() instanceof RetriableException) {

Review Comment:
   It is not.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {
+        // Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+    }
+

Review Comment:
   nit: extra new line



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {

Review Comment:
   Do we have another test for `NOT_COORDINATOR`? I wonder if we can parameterize this one as well and cover both cases?





[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038368961


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(commitResult::isCompleted);
+        assertTrue(commitResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {

Review Comment:
   I suppose we have `testLookupCoordinatorOnNotCoordinatorError`. Is this what you were thinking of?





[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036564976


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1515,7 +1515,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.CONCURRENT_TRANSACTIONS || error.exception() instanceof RetriableException) {

Review Comment:
   Hmm, I don't think there are any dependencies outside `TransactionManager`. It seems like a safe change? It _is_ a retriable error: we just haven't marked it as such explicitly.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035124199


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
   Yeah, I'm suggesting that we check the timeout error code (or retriable errors generally) in all of the handlers. That protects the client against future changes on the broker, even for requests where the error code is not returned today.
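
A minimal sketch of the "retriable errors generally" check, for illustration only. All types here (`RetriableCheckSketch`, `SketchError`, and the nested exception classes) are hypothetical stand-ins, not the Kafka client classes; the only thing taken from the diff is the shape of the condition. It assumes, as in the discussion, that the concurrent-transactions exception is not itself marked retriable, so it still needs an explicit check.

```java
// Stand-in types for illustration; these are NOT the real Kafka client classes.
class RetriableCheckSketch {
    static class ApiException extends RuntimeException {
        ApiException(String message) { super(message); }
    }

    // Marker superclass for errors that are safe to retry.
    static class RetriableException extends ApiException {
        RetriableException(String message) { super(message); }
    }

    static class TimeoutException extends RetriableException {
        TimeoutException(String message) { super(message); }
    }

    // Assumption: not marked retriable, so it needs an explicit check below.
    static class ConcurrentTransactionsException extends ApiException {
        ConcurrentTransactionsException(String message) { super(message); }
    }

    static class ProducerFencedException extends ApiException {
        ProducerFencedException(String message) { super(message); }
    }

    // Hypothetical error enum pairing each code with its exception instance.
    enum SketchError {
        REQUEST_TIMED_OUT(new TimeoutException("request timed out")),
        CONCURRENT_TRANSACTIONS(new ConcurrentTransactionsException("concurrent transactions")),
        PRODUCER_FENCED(new ProducerFencedException("producer fenced"));

        private final ApiException exception;

        SketchError(ApiException exception) { this.exception = exception; }

        ApiException exception() { return exception; }
    }

    // Same shape as the condition in the diff: one explicit special case,
    // plus a blanket check for anything marked retriable.
    static boolean shouldReenqueue(SketchError error) {
        return error == SketchError.CONCURRENT_TRANSACTIONS
            || error.exception() instanceof RetriableException;
    }

    public static void main(String[] args) {
        for (SketchError error : SketchError.values()) {
            System.out.println(error + " -> " + (shouldReenqueue(error) ? "reenqueue" : "fatal"));
        }
    }
}
```

With a check of this shape, a handler does not need to change when the broker starts returning another error that is already marked retriable.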





[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035309932


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
   I've updated the handlers. Please take a look. I tried to make sure that the errors I replaced in each condition were all retriable and that all the ones below were not, but it would be good to have a second pair of eyes check.





[GitHub] [kafka] jolshan commented on pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12915:
URL: https://github.com/apache/kafka/pull/12915#issuecomment-1331508810

   Test failures so far seem to be unrelated.




[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035110895


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
   I think this applies only to InitProducerId, since that is the only request that also goes to the controller (handling an InitProducerIdRequest now requires a second request to the controller).

   I can double-check, though.





[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038365461


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})

Review Comment:
   I thought about this, but I didn't know how to make the errors from strings.
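
One possible way to build the errors from strings, sketched under assumptions: a Java enum constant can be looked up by name with `valueOf`, so a list of names can stand in for the numeric codes. The `SketchErrors` enum and `fromNames` helper below are hypothetical stand-ins that carry only the four codes used in the test. In a JUnit 5 parameterized test, `@EnumSource` with its `names` attribute should give the same effect without a helper.

```java
import java.util.ArrayList;
import java.util.List;

class ErrorsFromNamesSketch {
    // Hypothetical stand-in for the client's error enum; only the four
    // codes from the parameterized test are included.
    enum SketchErrors {
        UNKNOWN_TOPIC_OR_PARTITION((short) 3),
        REQUEST_TIMED_OUT((short) 7),
        COORDINATOR_LOAD_IN_PROGRESS((short) 14),
        CONCURRENT_TRANSACTIONS((short) 51);

        private final short code;

        SketchErrors(short code) { this.code = code; }

        short code() { return code; }
    }

    // Resolves enum constants from their names; valueOf throws
    // IllegalArgumentException if a name does not match any constant.
    static List<SketchErrors> fromNames(String... names) {
        List<SketchErrors> errors = new ArrayList<>();
        for (String name : names) {
            errors.add(SketchErrors.valueOf(name));
        }
        return errors;
    }

    public static void main(String[] args) {
        for (SketchErrors error : fromNames("REQUEST_TIMED_OUT", "CONCURRENT_TRANSACTIONS")) {
            System.out.println(error + " has code " + error.code());
        }
    }
}
```

Using names instead of numeric codes keeps the test parameters readable and removes the need for a comment that maps each number back to an error.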





[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036577992


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1678,39 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @Test
+    public void testRetriableErrors() {

Review Comment:
   We can, but do you want to test all of them, or just the ones that can be returned?





[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036359658


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
         val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
         if (block == null) {
-          throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")
+          throw Errors.CONCURRENT_TRANSACTIONS.exception("Timed out waiting for next producer ID block")

Review Comment:
   Should we do this fix separately? I thought we had considered whether a short KIP would be needed. We probably also need to consider the behavior of other clients (such as rdkafka).
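
For context, the timed wait in the hunk above can be sketched in Java as follows. This is only an illustration of the pattern (a bounded `poll` on a blocking queue, where a `null` result means the wait timed out), not the broker code. The class, the `Outcome` enum, and `awaitNextBlock` are hypothetical names, and which error code the caller should map `TIMED_OUT` to is exactly the open question here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class ProducerIdBlockWaitSketch {
    enum Outcome { GOT_BLOCK, TIMED_OUT }

    // Waits up to maxWaitMs for the next block to arrive. BlockingQueue.poll
    // returns null when the timeout elapses before an element is available.
    static Outcome awaitNextBlock(BlockingQueue<Long> nextBlock, long maxWaitMs) {
        try {
            Long firstIdOfBlock = nextBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS);
            return firstIdOfBlock == null ? Outcome.TIMED_OUT : Outcome.GOT_BLOCK;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat the wait as unsuccessful.
            Thread.currentThread().interrupt();
            return Outcome.TIMED_OUT;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(1);
        System.out.println(awaitNextBlock(queue, 10)); // nothing queued yet
        queue.offer(1000L);
        System.out.println(awaitNextBlock(queue, 10)); // a block is available
    }
}
```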



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1515,7 +1515,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.CONCURRENT_TRANSACTIONS || error.exception() instanceof RetriableException) {

Review Comment:
   I think `ConcurrentTransactionsException` should be retriable.


