Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/11 20:58:08 UTC

[GitHub] [kafka] philipnee opened a new pull request, #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

philipnee opened a new pull request, #12149:
URL: https://github.com/apache/kafka/pull/12149



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171873787


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-                fatalError(error.exception());
+                log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+                lastError = error.exception();
+                epochBumpRequired = true;

Review Comment:
   Even though I think the initProducerId path in the sender has been tested, it's probably worth adding a sender test to make sure the epoch is bumped from -1 to 0. I think it should be an easy modification. 👀





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1178259026


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -328,6 +333,21 @@ void runOnce() {
         client.poll(pollTimeout, currentTimeMs);
     }
 
+    // We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first
+    // failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to
+    // instantiate the producer again.
+    private boolean shouldHandleAuthorizationError(RuntimeException exception) {

Review Comment:
   It seems like all of the non-initProducerId `TransactionalIdAuthorizationException` and `ClusterAuthorizationException` cases are fatal.
   
   For the poll: I don't think we need it, because there's no outbound request; it should already have been polled in the previous `runOnce`. The tests seem to pass without it, so I'll remove it.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166059935


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -607,6 +608,14 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
         removeInFlightBatch(batch);
     }
 
+    public synchronized void transitionToUninitialized(RuntimeException exception) {
+        transitionTo(State.UNINITIALIZED);
+        lastError = null;
+        if (pendingTransition != null) {

Review Comment:
   Do you have more background on what pendingTransitions we typically have? I don't think this hurts unless it will cause a fatal error. If it does, we may need to take a closer look.





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1168861947


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-                fatalError(error.exception());
+                log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+                lastError = error.exception();
+                epochBumpRequired = true;

Review Comment:
   Hey, sorry - I'm in the process of writing a test here 😅





[GitHub] [kafka] hachikuji commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r900524288


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1292,8 +1296,12 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
-            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-                    error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+            } else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+                log.error("Cluster authorization failure, transition the producer state back to uninitialized");
+                fail(error.exception());
+                // transition back to uninitialized state to enable client side retry
+                transitionTo(State.UNINITIALIZED);
+            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {

Review Comment:
   I wonder if we should handle this case consistently. I cannot think of any reason not to.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1113612843


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Can you point me to where you saw this? I think abortable error was something we had before that didn't need this extra step.





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171904037


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-                fatalError(error.exception());
+                log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+                lastError = error.exception();
+                epochBumpRequired = true;

Review Comment:
   @jolshan - See the last commit. I cleaned up the comments because authorization isn't a fatal error after the patch. I also patched a test to demonstrate that resending after the permission is fixed brings the producer back to normal. The test also checks that the epoch is 0, not -1, after the permission is fixed.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171912717


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2175,7 +2182,7 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
         sender.runOnce();
         assertFutureFailure(future, ClusterAuthorizationException.class);
 
-        // cluster authorization errors are fatal, so we should continue seeing it on future sends
+        // expecting to continue to see authorization error until user permission is fixed
         assertTrue(transactionManager.hasFatalError());

Review Comment:
   Should this still be true? Do we remove it from fatal error state?





[GitHub] [kafka] hachikuji commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r874042264


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1292,8 +1292,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
-            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-                    error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+            } else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+                log.warn("Retrying upon missing initProducerId due to cluster authorization error");
+                reenqueue();

Review Comment:
   Hmm, I think we do want the error to get propagated back to the application. Most users would expect the application to fail so that they have a chance to see the issue and fix it. High level, what I was thinking we could do is fail the request by calling `result.fail`, then transition back to `UNINITIALIZED`. Then if the user wants to, they can call `initTransactions()` to retry.
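
A minimal sketch of the flow proposed in this comment, assuming a toy state holder rather than Kafka's `TransactionManager`. The class `InitRetrySketch`, its `State` enum, `AuthorizationFailure`, and `scriptResponse` are hypothetical names introduced only for illustration. The point it shows is that the error still reaches the caller, while the state returns to `UNINITIALIZED` so that a later `initTransactions()` call is legal.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model only: none of these types are Kafka classes. */
final class InitRetrySketch {
    enum State { UNINITIALIZED, INITIALIZING, READY }

    /** Hypothetical stand-in for an authorization failure reported by the broker. */
    static final class AuthorizationFailure extends RuntimeException {
        AuthorizationFailure(String message) {
            super(message);
        }
    }

    private State state = State.UNINITIALIZED;

    // Scripted broker answers: true means the request is authorized, false means it is rejected.
    private final Deque<Boolean> scriptedResponses = new ArrayDeque<>();

    void scriptResponse(boolean authorized) {
        scriptedResponses.addLast(authorized);
    }

    State state() {
        return state;
    }

    /** Surfaces the failure to the caller, then resets the state so a retry is allowed. */
    void initTransactions() {
        if (state != State.UNINITIALIZED) {
            throw new IllegalStateException("initTransactions called in state " + state);
        }
        state = State.INITIALIZING;
        Boolean authorized = scriptedResponses.pollFirst();
        if (authorized == null) {
            state = State.UNINITIALIZED;
            throw new IllegalStateException("no scripted response available");
        }
        if (!authorized) {
            // Not a terminal state: the caller sees the error and may call again.
            state = State.UNINITIALIZED;
            throw new AuthorizationFailure("cluster authorization failed");
        }
        state = State.READY;
    }
}
```

With one rejected and one accepted response scripted, the first `initTransactions()` call throws and leaves the state at `UNINITIALIZED`; the second call moves it to `READY`.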





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167247268


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-                fatalError(error.exception());
+                log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+                lastError = error.exception();
+                epochBumpRequired = true;

Review Comment:
   Can we confirm via a test that this works as intended?





[GitHub] [kafka] jolshan merged pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan merged PR #12149:
URL: https://github.com/apache/kafka/pull/12149




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167110290


##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
 
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   Right, it retries `initTransactions` until it succeeds or the timeout expires.
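
The retry-until-success-or-timeout pattern can be sketched as below. This is a general illustration, not the implementation of Kafka's `TestUtils.retryOnExceptionWithTimeout`; `RetrySketch`, `Action`, and `retryUntilSuccess` are hypothetical names, and the argument order (timeout, poll interval, action) is assumed from the call in the test above.

```java
/** Illustrative retry helper; not Kafka's TestUtils. */
final class RetrySketch {
    /** A unit of work that may throw, such as a call to initTransactions. */
    @FunctionalInterface
    interface Action {
        void run() throws Exception;
    }

    /**
     * Runs the action until it completes without throwing, sleeping between attempts.
     * Throws AssertionError if the deadline would pass before the next attempt.
     */
    static void retryUntilSuccess(long timeoutMs, long pollIntervalMs, Action action) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                action.run();
                return; // a normal return is the only success signal
            } catch (Exception e) {
                if (System.currentTimeMillis() + pollIntervalMs > deadline) {
                    throw new AssertionError("action did not succeed within " + timeoutMs + " ms", e);
                }
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while retrying", ie);
            }
        }
    }
}
```

Under this design the helper returns normally only after one attempt finishes without an exception, so a passing call implies that at least one invocation succeeded. It does not verify anything beyond that, such as the resulting producer epoch.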





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180625792


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;
+                }
+

Review Comment:
   Sounds good - perhaps you meant checking for the fatal error first, then the abortable error? Though I don't think it changes the logic there.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004948547


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Do we know if there are other implications for this state machine change? It seems like we just want to make the authorization error retriable, but instead we are making it abortable and making this specific abortable error reset the producer state.
   
   I'm not an expert in this area, but are there any other implications with re-initializing the producer?
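
To make the question concrete, here is a reduced sketch of a transition guard in the style of the diff above. Only the `UNINITIALIZED` case is taken from the diff; the remaining cases, the reduced set of states, and the class name `TransitionSketch` are assumptions made for illustration and do not reproduce Kafka's actual state machine.

```java
/** Reduced illustration of a transition guard; not Kafka's TransactionManager. */
final class TransitionSketch {
    enum State { UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR, FATAL_ERROR }

    /** Returns true if moving from source to target is allowed in this toy model. */
    static boolean isTransitionValid(State source, State target) {
        switch (target) {
            case UNINITIALIZED:
                // From the diff: re-initialization is now also reachable from ABORTABLE_ERROR.
                return source == State.READY || source == State.ABORTABLE_ERROR;
            case INITIALIZING:
                return source == State.UNINITIALIZED;   // assumed
            case READY:
                return source == State.INITIALIZING;    // assumed
            case ABORTABLE_ERROR:
                return source != State.FATAL_ERROR;     // assumed
            case FATAL_ERROR:
                return true;                            // assumed
            default:
                return false;
        }
    }
}
```

In this model the guard alone cannot tell one abortable error from another: every `ABORTABLE_ERROR` may move to `UNINITIALIZED`. Restricting the reset to authorization errors therefore has to happen at the call site, which appears to be the role of `shouldHandleAuthorizationError` in the `Sender` diff.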





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007181238


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized. See the exception for more details
+     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   One could get an authorization error when requesting a producerId, and it is not necessarily caused by an unavailable broker. Yes, maybe remove the word "fatal". Thanks!





[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1516893617

   Failing tests don't seem to be related.
   
   ```
   Build / JDK 8 and Scala 2.12 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   1m 46s
   Build / JDK 8 and Scala 2.12 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 26s
   Build / JDK 8 and Scala 2.12 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   1m 46s
   Build / JDK 11 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   3m 15s
   Build / JDK 11 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 3s
   Build / JDK 11 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 41s
   Build / JDK 11 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 23s
   Build / JDK 11 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   1m 55s
   Build / JDK 11 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 7s
   Build / JDK 11 and Scala 2.13 / testWithGroupMetadata() – kafka.api.TransactionsBounceTest
   1m 22s
   Build / JDK 11 and Scala 2.13 / testCreateClusterAndCreateAndManyTopics() – kafka.server.KRaftClusterTest
   18s
   Build / JDK 17 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 14s
   Build / JDK 17 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 22s
   Build / JDK 17 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 16s
   Build / JDK 17 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 16s
   Build / JDK 17 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 53s
   Build / JDK 17 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 13s
   Build / JDK 17 and Scala 2.13 / testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
   1m 14s
   Existing failures - 3
   Build / JDK 8 and Scala 2.12 / putTopicStateRetriableFailure – org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest
   <1s
   Build / JDK 11 and Scala 2.13 / putTopicStateRetriableFailure – org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest
   <1s
   Build / JDK 17 and Scala 2.13 / putTopicStateRetriableFailure – org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest
   ```




[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166062401


##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
 
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   Are we fully validating the request is successful? Is that what the TestUtils method does here?





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1178260025


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -328,6 +333,21 @@ void runOnce() {
         client.poll(pollTimeout, currentTimeMs);
     }
 
+    // We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first
+    // failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to
+    // instantiate the producer again.
+    private boolean shouldHandleAuthorizationError(RuntimeException exception) {

Review Comment:
   I might have added it there because a few lines down, in the fatal error handling, the client was polled before return.





[GitHub] [kafka] ijuma commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1156718718

   @philipnee where are we with this?




[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
philipnee commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1156747595

   @ijuma - I think @hachikuji is reviewing it.




[GitHub] [kafka] dajac commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004739130


##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -2236,6 +2236,30 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertFalse(response.clusterId.isEmpty, "Cluster id not returned")
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)

Review Comment:
   nit: Indentation is off for this entire test.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004948547


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Do we know if there are other implications for this state machine change? It seems sort of like we want to just make the authorization error retriable -- but instead we are making it abortable + making this specific abortable error reset to "uninitialized".
   
   I'm not an expert in this area, but are there any other implications with re-initializing the producer?
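
To make the transition change in the hunk above concrete, here is a minimal sketch of a state check of this shape. Only the `UNINITIALIZED` rule mirrors the diff; the enum name `SketchState`, the reduced set of states, and the rules for the other targets are assumptions made for illustration and are not the real `TransactionManager` logic.

```java
// Simplified, hypothetical model of the producer state check discussed above.
// Only the UNINITIALIZED case is taken from the diff; everything else is assumed.
enum SketchState {
    UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR, FATAL_ERROR;

    // Returns true when moving from `source` to `target` is allowed in this sketch.
    static boolean isTransitionValid(SketchState source, SketchState target) {
        switch (target) {
            case UNINITIALIZED:
                // Before the change only READY was accepted here.
                return source == READY || source == ABORTABLE_ERROR;
            case INITIALIZING:
                return source == UNINITIALIZED;       // assumption
            case READY:
                return source == INITIALIZING;        // assumption
            case ABORTABLE_ERROR:
                return source != FATAL_ERROR;         // assumption
            case FATAL_ERROR:
                return true;                          // assumption
            default:
                return false;
        }
    }
}
```

In this sketch a producer sitting in `ABORTABLE_ERROR` can be reset to `UNINITIALIZED`, while one in `FATAL_ERROR` cannot, which is the distinction the question above is probing.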





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007185990


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   It's one way (that I could think of) to propagate the authorization error from the sender loop; otherwise, the sender will continue to retry and cause a timeout in some cases.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166058138


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;

Review Comment:
   Did we determine that we will end up bumping the epoch? It doesn't look like it is done here so is it somewhere else?





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180672146


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;
+                }
+

Review Comment:
   Yup -- my concern is that we unnecessarily reset the producer to initializing in the fatal error case.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171947129


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2175,7 +2182,7 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
         sender.runOnce();
         assertFutureFailure(future, ClusterAuthorizationException.class);
 
-        // cluster authorization errors are fatal, so we should continue seeing it on future sends
+        // expecting to continue to see authorization error until user permission is fixed
         assertTrue(transactionManager.hasFatalError());

Review Comment:
   Ah, I also missed that. Thanks for correcting.
   





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1086981346


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;

Review Comment:
   Sorry -- I think if we have an error we'd still want to bump the epoch right? 





[GitHub] [kafka] hachikuji commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r902836728


##########
clients/src/test/resources/log4j.properties:
##########
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=OFF, stdout
+log4j.rootLogger=ALL, stdout

Review Comment:
   Probably added by mistake?



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -627,6 +627,9 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
      * @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
      * @throws InterruptException if the thread is interrupted while blocked
+     * @throws org.apache.kafka.common.errors.ClusterAuthorizationException fatal error indicating that the idempotent

Review Comment:
   I think we can get rid of this since we have `AuthorizationException` above. Perhaps we can add the comment about retrying there?





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007187187


##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
 
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   yes!





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1184293956


##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
 
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   Probably - it's just done very consistently in the tests 😅 





[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1533772132

   Thanks Philip. The initialization ones come when there is a thread leak somewhere. I'm pretty convinced it wasn't your change, but just wanted to be extra safe. I will check the next build. Thanks for your patience.




[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166059149


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Ack -- I wonder if there are any other locations where we would not want this transition. I can't think of any off the top of my head, but I do think we need to be careful to clean the state properly in these cases.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167246564


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -607,6 +608,14 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
         removeInFlightBatch(batch);
     }
 
+    public synchronized void transitionToUninitialized(RuntimeException exception) {
+        transitionTo(State.UNINITIALIZED);
+        lastError = null;
+        if (pendingTransition != null) {

Review Comment:
   I'd prefer you confirm that it's fine. Looking at the code -- it seems to set the TransactionalRequestResult state to the error and count down the latch. Given that it is for the request -- looks to be fine.
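
The behaviour described above (record the error on the result, then count down the latch so the waiting caller wakes up) can be sketched as follows. This is a hypothetical stand-in written for illustration; the class name `PendingResultSketch` and its methods are assumptions and are not the Kafka class itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical latch-backed request result; not the Kafka implementation.
final class PendingResultSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile RuntimeException error;

    // Record the failure first, then release any thread blocked in await().
    void fail(RuntimeException e) {
        this.error = e;
        latch.countDown();
    }

    // Complete successfully: no error recorded, waiters are released.
    void done() {
        latch.countDown();
    }

    // Blocks until done() or fail() is called, then rethrows a recorded error.
    void await(long timeout, TimeUnit unit) throws InterruptedException {
        if (!latch.await(timeout, unit)) {
            throw new IllegalStateException("timed out waiting for result");
        }
        if (error != null) {
            throw error;
        }
    }

    boolean isCompleted() {
        return latch.getCount() == 0;
    }
}
```

Because the error lives on the per-request result rather than on the producer, failing it in this sketch only affects the caller waiting on that one request.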





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171757259


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;

Review Comment:
   Hey @jolshan - I actually think we don't need to manually bump the epoch here; I think it is already handled by the existing logic. Here's the explanation. I think initProducerId only happens in 2 places: 1. `initializeTransactions` and 2. in the sender loop for the idempotent producer, `bumpIdempotentEpochAndResetIdIfNeeded`.
   
   For 1, it will bump the epoch if the epoch != None, which means the producer has already been initialized and needs to bump the epoch upon re-requesting the id, per your comment.
   
   For 2, it is when we first initialize a producer (so it doesn't have an id at the beginning), and the `InitProducerIdRequest` should bump the epoch to 0 upon the first successful attempt. This is the case we are addressing in this PR, I think.
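
The two cases above can be sketched with a toy model of id/epoch assignment. Everything here is an assumption made for illustration: the class `ProducerIdentitySketch`, the sentinel values, and the `onInitSuccess` helper are hypothetical and are not Kafka's own code, and epoch overflow is ignored.

```java
// Hypothetical model of producer id/epoch assignment on a successful init request.
final class ProducerIdentitySketch {
    static final long NO_PRODUCER_ID = -1L;  // assumed sentinel: no id assigned yet
    static final short NO_EPOCH = -1;        // assumed sentinel: no epoch assigned yet

    final long producerId;
    final short epoch;

    ProducerIdentitySketch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    static ProducerIdentitySketch none() {
        return new ProducerIdentitySketch(NO_PRODUCER_ID, NO_EPOCH);
    }

    // Models one successful init. `freshId` is the id handed out when the
    // caller has none yet; otherwise the existing id is kept and the epoch bumped.
    static ProducerIdentitySketch onInitSuccess(ProducerIdentitySketch current, long freshId) {
        if (current.epoch == NO_EPOCH) {
            // Case 2: first successful initialization starts at epoch 0.
            return new ProducerIdentitySketch(freshId, (short) 0);
        }
        // Case 1: an already-initialized producer gets its epoch bumped.
        return new ProducerIdentitySketch(current.producerId, (short) (current.epoch + 1));
    }
}
```

Under this model, an init attempt that fails with an authorization error never assigns an id, so the retry still takes the first branch and no manual bump is needed.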





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171913050


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2189,7 +2196,7 @@ public void testCancelInFlightRequestAfterFatalError() throws Exception {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        // cluster authorization is a fatal error for the producer
+        // expecting authorization error on send

Review Comment:
   ditto here -- should we change this test to use a fatal error?





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r900530199


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1292,8 +1296,12 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
-            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-                    error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+            } else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+                log.error("Cluster authorization failure, transition the producer state back to uninitialized");
+                fail(error.exception());
+                // transition back to uninitialized state to enable client side retry
+                transitionTo(State.UNINITIALIZED);
+            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {

Review Comment:
   yeah that sounds right. it seems like they should just configure the ACL.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004941219


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized. See the exception for more details
+     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   What do we mean by idempotent producer ID is unavailable? Is it that the broker is unavailable? And do we return this error if the broker is not available?





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1086981869


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;

Review Comment:
   (Assuming this is an idempotent producer and not a transactional one)





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171872077


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-                fatalError(error.exception());
+                log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+                lastError = error.exception();
+                epochBumpRequired = true;

Review Comment:
   Was about to ask about this and realized we removed the bump. I guess we could still have a test unless we think the other testing covers this end to end.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1178215576


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -328,6 +333,21 @@ void runOnce() {
         client.poll(pollTimeout, currentTimeMs);
     }
 
+    // We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first
+    // failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to
+    // instantiate the producer again.
+    private boolean shouldHandleAuthorizationError(RuntimeException exception) {

Review Comment:
   Just curious -- if we get an auth error on another request (i.e., not initProducerId), do we expect to start over by initializing with a new ID?
   
   Also what is the goal with the poll call? Is it just replacing line 308? Would the code work without it?





[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1523826079

   Left a few more questions -- I think we are in the final stretch here.




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1113642114


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;

Review Comment:
   Right, this is an idempotent producer. Thanks for catching the epoch bump; I was wondering about that too.





[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1532302560

   thank you~




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1072654169


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;

Review Comment:
   I'm thinking not, because we aren't adding a new producer.  @jolshan thoughts?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Hmm, good point. Upon re-initializing (the transition from UNINITIALIZED to INITIALIZING), should we check the previous error to ensure the transition is valid? Maybe `initializeTransactions` could examine the previous error and then make the next transition?
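
A minimal sketch of the transition rule under discussion, written as a toy model rather than the real `TransactionManager`. Only the `UNINITIALIZED` case is taken from the diff above; the class name `ToyTxnStateMachine` and every other transition are simplifying assumptions made for illustration.

```java
// Toy model for illustration only; this is NOT the real TransactionManager.
final class ToyTxnStateMachine {
    enum State { UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR, FATAL_ERROR }

    private State current = State.UNINITIALIZED;

    static boolean isTransitionValid(State source, State target) {
        switch (target) {
            case UNINITIALIZED:
                // From the diff: ABORTABLE_ERROR becomes a valid source in addition to READY.
                return source == State.READY || source == State.ABORTABLE_ERROR;
            case INITIALIZING:
                // Assumption: initialization only starts from a clean state.
                return source == State.UNINITIALIZED;
            case READY:
                // Assumption: READY is only reached once initialization completes.
                return source == State.INITIALIZING;
            case ABORTABLE_ERROR:
                // Assumption: any non-fatal state may hit an abortable error.
                return source != State.FATAL_ERROR;
            case FATAL_ERROR:
                // Assumption: a fatal error is reachable from every state.
                return true;
            default:
                return false;
        }
    }

    void transitionTo(State target) {
        if (!isTransitionValid(current, target)) {
            throw new IllegalStateException("Invalid transition " + current + " -> " + target);
        }
        current = target;
    }

    State current() {
        return current;
    }
}
```

In this model a producer that fails during initialization can walk INITIALIZING -> ABORTABLE_ERROR -> UNINITIALIZED -> INITIALIZING, while FATAL_ERROR stays terminal. Whether the real state machine should also inspect the previous error before allowing the reset is the open question in this thread.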





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167230503


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -607,6 +608,14 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
         removeInFlightBatch(batch);
     }
 
+    public synchronized void transitionToUninitialized(RuntimeException exception) {
+        transitionTo(State.UNINITIALIZED);
+        lastError = null;
+        if (pendingTransition != null) {

Review Comment:
   Thanks, I think it's fine to leave it here.





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171946214


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2175,7 +2182,7 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
         sender.runOnce();
         assertFutureFailure(future, ClusterAuthorizationException.class);
 
-        // cluster authorization errors are fatal, so we should continue seeing it on future sends
+        // expecting to continue to see authorization error until user permission is fixed
         assertTrue(transactionManager.hasFatalError());

Review Comment:
   I think that's actually my mistake. It's a produce error, not an initProducerId error. I'll correct these issues. Sorry.





[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1523820442

   Going to rerun the build one more time.
   




[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171818667


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized. See the exception for more details
+     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   Can we still remove "fatal" here?





[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1516893160

   Hey @jolshan - Thanks for the review. I reverted those documentation/comment changes in SenderTest.java (for the produceResponse authorization error).




[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1533354929

   I believe the JDK 17 tests and most of the JDK 11 tests passed.
   
   There are a few server-related failures, but I think they are unrelated to this change:
   
   ```
   Build / JDK 11 and Scala 2.13 / testOffsetSyncsTopicsOnTarget() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   3m 24s
   Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
   1m 51s
   Build / JDK 8 and Scala 2.12 / testLogCleanerConfig(String).quorum=kraft – kafka.server.DynamicBrokerReconfigurationTest
   28s
   Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
   2m 7s
   Existing failures - 23
   Build / JDK 8 and Scala 2.12 / executionError – kafka.server.DynamicBrokerReconfigurationTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.EdgeCaseRequestTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.FetchRequestDownConversionConfigTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.FetchRequestTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.FetchRequestWithLegacyMessageFormatTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.FinalizedFeatureChangeListenerTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.GssapiAuthenticationTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.KafkaMetricReporterClusterIdTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.KafkaMetricReporterExceptionHandlingTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.KafkaMetricsReporterTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.KafkaServerTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.ListOffsetsRequestTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.LogDirFailureTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.LogRecoveryTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.MetadataRequestTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.MultipleListenersWithAdditionalJaasContextTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.OffsetFetchRequestTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.ServerGenerateClusterIdTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.ServerShutdownTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.UpdateFeaturesTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceWithIbp26Test
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.zk.KafkaZkClientTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – kafka.zk.ZkMigrationClientTest
   <1s
   ```




[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1184283663


##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
 
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   Should we close the producer here? Just looking through the failed tests and wanted to close any gaps we may have.
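
One way to guarantee the close, sketched under assumptions: `TrackedResource` below is a hypothetical stand-in for the producer (it only records whether `close()` ran), and `CloseOnFailureDemo` is an illustrative name. The point is only that try-with-resources closes the resource even when the body throws, which is the gap this comment is asking about.

```java
// Hypothetical stand-in for a producer; it only records whether close() ran.
final class TrackedResource implements AutoCloseable {
    private boolean closed = false;

    void failingCall() {
        throw new IllegalStateException("simulated failure inside the test body");
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class CloseOnFailureDemo {
    // Returns the resource so the caller can check that close() ran despite the exception.
    static TrackedResource run() {
        TrackedResource resource = new TrackedResource();
        try (TrackedResource r = resource) {
            r.failingCall();
        } catch (IllegalStateException expected) {
            // By the time control reaches this handler, r has already been closed.
        }
        return resource;
    }
}
```

Applied to the test above, the same shape would wrap the `kafkaProducer(...)` call in a try-with-resources header so that a failed assertion cannot leak the producer into later tests.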





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180672745


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;
+                }
+

Review Comment:
   Ah I see we had the abortable error check. 😅 Ok well now we are doubly covered.





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r980725833


##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -1237,6 +1238,35 @@ public void testInitTransactionWhileThrottled() {
         }
     }
 
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time)) {
+            assertThrows(ClusterAuthorizationException.class, () -> producer.initTransactions());
+            // retry initTransactions after the ClusterAuthorizationException not being thrown
+            client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+            producer.initTransactions();

Review Comment:
   Hmm... I'm consistently having trouble getting an Errors.NONE response here, and I sometimes still get a ClusterAuthorizationException. I'm not sure why.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -607,6 +608,14 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
         removeInFlightBatch(batch);
     }
 
+    public synchronized void transitionToUninitialized(RuntimeException exception) {
+        transitionTo(State.UNINITIALIZED);
+        lastError = null;
+        if (pendingTransition != null) {

Review Comment:
   I'm actually not sure if this is needed.





[GitHub] [kafka] kirktrue commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
kirktrue commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1003656846


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -996,20 +1010,21 @@ private void ensureTransactional() {
     }
 
     private void maybeFailWithError() {

Review Comment:
   Are there any functional changes in `maybeFailWithError`, or is it just to refactor for an early return?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;

Review Comment:
   In the new code we return in the case of a `ClusterAuthorizationException`. Per the comments later on in the `runOnce` method, do we still want to call the `bumpIdempotentEpochAndResetIdIfNeeded()` method in this case?





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004955636


##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
 
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   To be clear -- since I couldn't tell fully from the description -- before this change, trying to call initTransactions here would fail since the producer hits a fatal state. But with this change -- we don't retry initProducerId but instead keep the producer alive and basically re-initialize it so subsequent initTransactions calls can be made (and succeed).
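
The before/after behaviour described here can be sketched with a toy model. Everything in it is an assumption for illustration: `ToyInitRetry`, its `authErrorsAreFatal` switch, and the boolean "prepared responses" are hypothetical and only loosely mirror the `MockClient` setup in the test above. It does not reproduce the real producer internals.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the behaviour change; not the real KafkaProducer or TransactionManager.
final class ToyInitRetry {
    static final class AuthorizationFailure extends RuntimeException {
        AuthorizationFailure() {
            super("cluster authorization failed");
        }
    }

    // true = authorized response, false = authorization failure (hypothetical encoding).
    private final Deque<Boolean> preparedResponses = new ArrayDeque<>();
    private final boolean authErrorsAreFatal;
    private boolean fatal = false;
    private boolean initialized = false;

    ToyInitRetry(boolean authErrorsAreFatal) {
        this.authErrorsAreFatal = authErrorsAreFatal;
    }

    void prepareResponse(boolean authorized) {
        preparedResponses.add(authorized);
    }

    void initTransactions() {
        if (fatal) {
            // Old behaviour: once fatal, every later call fails without contacting the broker.
            throw new IllegalStateException("producer is in a fatal state");
        }
        Boolean authorized = preparedResponses.poll();
        if (authorized == null) {
            throw new IllegalStateException("no prepared response");
        }
        if (!authorized) {
            if (authErrorsAreFatal) {
                fatal = true;
            }
            // New behaviour: surface the error but leave the producer usable.
            throw new AuthorizationFailure();
        }
        initialized = true;
    }

    boolean isInitialized() {
        return initialized;
    }
}
```

With `authErrorsAreFatal` set to true, the second `initTransactions` call fails regardless of the prepared response. With it set to false, the first call still throws, but a later call succeeds once an authorized response is available.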





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004958467


##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -2236,6 +2236,30 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertFalse(response.clusterId.isEmpty, "Cluster id not returned")
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testRetryProducerInitializationAfterPermissionFix(quorum: String): Unit = {
+    createTopicWithBrokerPrincipal(topic)
+    val wildcard = new ResourcePattern(TOPIC, ResourcePattern.WILDCARD_RESOURCE, LITERAL)
+    val prefixed = new ResourcePattern(TOPIC, "t", PREFIXED)
+    val literal = new ResourcePattern(TOPIC, topic, LITERAL)
+    val allowWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)
+    val denyWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, DENY)
+    val producer = buildIdempotentProducer()
+
+    addAndVerifyAcls(Set(denyWriteAce), wildcard)
+    assertThrows(classOf[Exception], () => {
+      val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))
+      future.get()
+    })
+    removeAndVerifyAcls(Set(denyWriteAce), wildcard)
+    addAndVerifyAcls(Set(allowWriteAce), prefixed)
+    addAndVerifyAcls(Set(allowWriteAce), literal)
+    val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))

Review Comment:
   Ah this test makes the benefit a bit more clear to me -- a subsequent send call works just fine.





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167231804


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-                fatalError(error.exception());
+                log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+                lastError = error.exception();
+                epochBumpRequired = true;

Review Comment:
   Bumping the epoch here. I think the sender should bump it on the next iteration.





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171855134


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized. See the exception for more details
+     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   🤦 - sorry and done.





[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1532288964

   Still an issue -- rebuilding again 😅 




[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180619843


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;
+                }
+

Review Comment:
   I thought I left this comment, but seems like it didn't take -- should we move the fatal error check above the auth check? Since for other request types, we will go through the above path, but it really should just be a fatal error.
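
A small sketch of the ordering being proposed, assuming the three conditions can be reduced to booleans. `SenderLoopSketch`, `Action`, and `decide` are hypothetical names; the real `runOnce` does considerably more than this.

```java
// Illustrative decision order only; not the real Sender.runOnce().
final class SenderLoopSketch {
    enum Action { FAIL_FATALLY, SKIP_ITERATION, PROCEED }

    static Action decide(boolean hasFatalError,
                         boolean hasAbortableError,
                         boolean lastErrorIsAuthorization) {
        // Check the fatal state first, so an authorization error raised by another
        // request type cannot be mistaken for the recoverable case.
        if (hasFatalError) {
            return Action.FAIL_FATALLY;
        }
        // Only an abortable authorization error short-circuits this iteration.
        if (hasAbortableError && lastErrorIsAuthorization) {
            return Action.SKIP_ITERATION;
        }
        return Action.PROCEED;
    }
}
```

If the two checks were swapped, an input that is both fatal and authorization-related would depend on whether the abortable flag happened to be set; putting the fatal check first makes that case unambiguous.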





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007207887


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Hmmm. I see. I'm just wondering if we are hijacking the current state machine in an unexpected way and if there are implications there. I suppose we are only following this path on these specific error types, but I wonder if we are missing anything existing by changing the valid transitions and/or opening up the potential for something else in the future.





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004752898


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -996,20 +1010,21 @@ private void ensureTransactional() {
     }
 
     private void maybeFailWithError() {

Review Comment:
   No functional change; I just refactored it a bit.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004941219


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized. See the exception for more details
+     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   What do we mean by "the idempotent producer ID is unavailable"? Is it that the broker hosting the transaction coordinator is unavailable? And do we return this error if the broker is not available?





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004944699


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized. See the exception for more details
+     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   Should we also change this to not say "fatal error"?





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1086992753


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Maybe I'm missing something here, but from `ABORTABLE_ERROR` don't we typically abort the transaction and then return to initializing? Sorry if you've explained this before, but why did we choose `UNINITIALIZED` here?





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1113611801


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   I think you would need to re-instantiate the producer if it is in an aborting state. Here I'm trying to bring it back to the initializing state so that it can issue another producer ID request.





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1113694079


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   We need to manually transition the state back to initializing, no? Otherwise I think it gets stuck in an error state here:
   
   https://github.com/apache/kafka/blob/a2c9f421af40e0c8ace722be94aedf8dec4f2b31/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L998
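
To illustrate the concern above, here is a minimal sketch (not the actual `TransactionManager` code) of how a producer can stay stuck once it enters an error state unless something explicitly resets it. The `MiniTxnState` class and its methods are hypothetical. Only the `UNINITIALIZED` rule, with `READY` or `ABORTABLE_ERROR` as valid sources, is taken from the diff; the other transition rules are assumptions made to keep the example small.

```java
// Hypothetical, simplified model of the state handling discussed above.
// Only the UNINITIALIZED rule mirrors the diff; everything else is assumed.
class MiniTxnState {
    enum State { UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR }

    private State current = State.UNINITIALIZED;
    private RuntimeException lastError;

    static boolean isTransitionValid(State source, State target) {
        switch (target) {
            case UNINITIALIZED:
                // Rule from the diff: READY or ABORTABLE_ERROR may reset.
                return source == State.READY || source == State.ABORTABLE_ERROR;
            case INITIALIZING:
                return source == State.UNINITIALIZED;   // assumption
            case READY:
                return source == State.INITIALIZING;    // assumption
            case ABORTABLE_ERROR:
                // assumption: errors can surface while initializing or ready
                return source == State.INITIALIZING || source == State.READY;
            default:
                return false;
        }
    }

    void transitionTo(State target, RuntimeException error) {
        if (!isTransitionValid(current, target)) {
            throw new IllegalStateException("Invalid transition " + current + " -> " + target);
        }
        // Keep the error only while in the error state; clear it otherwise.
        lastError = (target == State.ABORTABLE_ERROR) ? error : null;
        current = target;
    }

    // Every call fails for as long as the error state is held.
    void maybeFailWithError() {
        if (current == State.ABORTABLE_ERROR) {
            throw lastError;
        }
    }

    State current() {
        return current;
    }
}
```

In this model, `maybeFailWithError()` keeps throwing until `transitionTo(State.UNINITIALIZED, null)` is called, which is the manual reset the comment argues for.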
   





[GitHub] [kafka] hachikuji commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r876492051


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1292,8 +1292,13 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
-            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-                    error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+            } else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+                log.error("Cluster authorization failure, transition the producer state back to uninitialized");
+                fail(error.exception());
+                // transition back to uninitialized state to enable client side retry
+                transitionTo(State.READY);

Review Comment:
   Could we update `isTransitionValid` to allow transitions from INITIALIZING to UNINITIALIZED?
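
One way to read this suggestion, as a rough sketch: add `INITIALIZING` to the sources that may reach `UNINITIALIZED`, so the response handler can reset the state directly instead of calling `transitionTo(State.READY)`. The class name, the `canReset` helper, and the reduced state set are assumptions for illustration; the real enum has more states and rules.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of the suggested rule, not the real TransactionManager.
class TransitionSketch {
    enum State { UNINITIALIZED, INITIALIZING, READY }

    // Sources allowed to move to UNINITIALIZED under the suggestion (assumed).
    private static final Set<State> RESETTABLE =
            EnumSet.of(State.READY, State.INITIALIZING);

    static boolean canReset(State source) {
        return RESETTABLE.contains(source);
    }
}
```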





[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171764467


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;

Review Comment:
   FWIW, in the context of this ticket we are trying to avoid poisoning the client when it is unable to re-authenticate on startup. The fix keeps retrying the request until the permission is fixed, so the handler should handle the epoch bump.
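
The retry behaviour described here can be sketched roughly as follows. This is not the `Sender` code: `InitRetrySketch`, `AuthFailure`, and `initWithRetry` are hypothetical names, the broker call is modelled as a plain `Supplier`, and the attempt cap and fixed backoff are assumptions added to keep the example bounded.

```java
import java.util.function.Supplier;

// Hypothetical sketch: retry a producer ID request while authorization fails.
class InitRetrySketch {
    // Stand-in for an authorization failure returned by the broker.
    static class AuthFailure extends RuntimeException {
        AuthFailure(String msg) {
            super(msg);
        }
    }

    // Keeps requesting a producer ID while the failure is an authorization
    // error. Any other exception propagates immediately and stays fatal.
    static long initWithRetry(Supplier<Long> request, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        AuthFailure last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return request.get();
            } catch (AuthFailure e) {
                last = e; // remember the error, then wait and try again
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while retrying", ie);
                }
            }
        }
        throw last; // permission was never fixed within the attempt budget
    }
}
```

The point of the sketch is that the authorization error is remembered but does not poison the caller: once the permission is granted, the next attempt succeeds without re-creating the producer.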





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171816496


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -300,9 +301,13 @@ void runOnce() {
             try {
                 transactionManager.maybeResolveSequences();
 
+                RuntimeException lastError = transactionManager.lastError();
+                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                    return;

Review Comment:
   Thanks Philip. I forgot that this was the initProducerId call, so we don't really have an epoch yet. 😅 We set it to 0 after getting the producer ID.
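
As a small illustration of the point about the epoch: before initProducerId succeeds the producer has neither an ID nor an epoch, and, as described above, the epoch starts at 0 once the ID is assigned. The `IdAndEpoch` class, its `afterInit` helper, and the `-1` sentinel values are assumptions of this sketch rather than a copy of the client code.

```java
// Hypothetical value type pairing a producer ID with its epoch.
final class IdAndEpoch {
    // Sentinel for "nothing assigned yet" (the -1 values are assumed here).
    static final IdAndEpoch NONE = new IdAndEpoch(-1L, (short) -1);

    final long producerId;
    final short epoch;

    IdAndEpoch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    // There is no epoch to bump until an ID has been assigned.
    boolean isValid() {
        return producerId >= 0;
    }

    // After a successful init, the epoch starts at 0.
    static IdAndEpoch afterInit(long assignedId) {
        return new IdAndEpoch(assignedId, (short) 0);
    }
}
```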


