Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/06 10:21:30 UTC

[GitHub] [kafka] rajinisivaram opened a new pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

rajinisivaram opened a new pull request #8986:
URL: https://github.com/apache/kafka/pull/8986


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8986:
URL: https://github.com/apache/kafka/pull/8986#discussion_r518938665



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -460,15 +462,21 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     exception instanceof IllegalGenerationException ||
                     exception instanceof MemberIdRequiredException)
                     continue;
-                else if (!future.isRetriable())
-                    throw exception;
-
-                timer.sleep(rebalanceConfig.retryBackoffMs);
+                else {
+                    handleFailure(future, timer);
+                }
             }
         }
         return true;
     }
 
+    protected void handleFailure(RequestFuture<?> future, Timer timer) {
+        if (future.isRetriable() || future.exception() instanceof GroupAuthorizationException)

Review comment:
       @abbccdda Thanks for the review, and sorry about the delay in following up. The main purpose of this PR is to add a backoff before propagating `GroupAuthorizationException`, so that a client that polls in a loop doesn't end up in a tight poll loop if the user doesn't have access to the group. Added comments.
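
       A minimal sketch of the behaviour described above, for readers following the thread. The diff excerpt shows only the first line of `handleFailure`, so the method body here is an assumption based on the comment (sleep, then propagate). `RetriableError`, `GroupAuthError`, `FakeTimer` and `RETRY_BACKOFF_MS` are hypothetical stand-ins, not the Kafka client classes.

```java
// Hypothetical stand-ins for illustration only; these are NOT the Kafka client classes.
class BackoffSketch {
    static class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }

    static class GroupAuthError extends RuntimeException {
        GroupAuthError(String message) { super(message); }
    }

    // Fake clock so the sketch runs instantly and deterministically.
    static class FakeTimer {
        long nowMs = 0;
        void sleep(long ms) { nowMs += ms; }
    }

    // Assumed backoff value, chosen only for the example.
    static final long RETRY_BACKOFF_MS = 100;

    // Back off for retriable errors and for group authorization errors,
    // then rethrow anything that is not retriable.
    static void handleFailure(RuntimeException error, FakeTimer timer) {
        boolean retriable = error instanceof RetriableError;
        if (retriable || error instanceof GroupAuthError)
            timer.sleep(RETRY_BACKOFF_MS);
        if (!retriable)
            throw error;
    }

    public static void main(String[] args) {
        FakeTimer timer = new FakeTimer();
        // Retriable: sleeps and returns so the caller can retry.
        handleFailure(new RetriableError("coordinator not available"), timer);
        try {
            // Group authorization: sleeps first, then propagates to the caller.
            handleFailure(new GroupAuthError("not authorized to access group"), timer);
        } catch (GroupAuthError expected) {
            System.out.println("propagated after " + timer.nowMs + " ms of simulated backoff");
        }
    }
}
```

       In this sketch the caller still sees the authorization error on every attempt; the only change is that each attempt costs at least one backoff interval.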







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8986:
URL: https://github.com/apache/kafka/pull/8986#discussion_r454381244



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -460,15 +463,21 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     exception instanceof IllegalGenerationException ||
                     exception instanceof MemberIdRequiredException)
                     continue;
-                else if (!future.isRetriable())
-                    throw exception;
-
-                timer.sleep(rebalanceConfig.retryBackoffMs);
+                else {
+                    handleFailure(future, timer);
+                }
             }
         }
         return true;
     }
 
+    protected void handleFailure(RequestFuture<?> future, Timer timer) {
+        if (future.isRetriable() || future.exception() instanceof AuthorizationException)

Review comment:
       @abbccdda Thinking about this a bit more, it seems safer to restrict backoff to `GroupAuthorizationException`, as you mentioned earlier in this thread. Consumer requests may contain multiple topics, of which only a subset is unauthorized, and we probably don't want to apply backoff in that case. Since the issue we have seen is only with group authorization, I have updated the PR to apply backoff only for `GroupAuthorizationException` (in addition to the retriable exceptions we were already handling).
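
       To illustrate the difference in scope, here is a small sketch comparing the two `instanceof` checks. The class names are hypothetical stand-ins that mirror the relationship in the Kafka client, where the group and topic authorization exceptions both extend `AuthorizationException`; `broadCheck` and `narrowCheck` are names invented for this example.

```java
// Hypothetical exception hierarchy for illustration; not the Kafka classes themselves.
class BackoffScopeSketch {
    static class AuthError extends RuntimeException { }
    static class GroupAuthError extends AuthError { }
    static class TopicAuthError extends AuthError { }

    // Earlier revision of the check: matches every authorization failure.
    static boolean broadCheck(RuntimeException error) {
        return error instanceof AuthError;
    }

    // Updated check: matches group authorization failures only.
    static boolean narrowCheck(RuntimeException error) {
        return error instanceof GroupAuthError;
    }

    public static void main(String[] args) {
        RuntimeException group = new GroupAuthError();
        RuntimeException topic = new TopicAuthError();
        System.out.println("broad:  group=" + broadCheck(group) + ", topic=" + broadCheck(topic));
        System.out.println("narrow: group=" + narrowCheck(group) + ", topic=" + narrowCheck(topic));
    }
}
```

       With the narrow check, a topic authorization failure no longer triggers the backoff, which is the case the comment above wants to leave alone.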







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8986:
URL: https://github.com/apache/kafka/pull/8986#discussion_r451498688



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -460,15 +463,21 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     exception instanceof IllegalGenerationException ||
                     exception instanceof MemberIdRequiredException)
                     continue;
-                else if (!future.isRetriable())
-                    throw exception;
-
-                timer.sleep(rebalanceConfig.retryBackoffMs);
+                else {
+                    handleFailure(future, timer);
+                }
             }
         }
         return true;
     }
 
+    protected void handleFailure(RequestFuture<?> future, Timer timer) {
+        if (future.isRetriable() || future.exception() instanceof AuthorizationException)

Review comment:
       @abbccdda Thanks for the review. The instance we saw was an authorization failure for groups, but since I was fixing the handling of all responses, I used `AuthorizationException` in the check so that we back off for topic authorization exceptions too. We already back off for `RetriableException` and I haven't changed that behaviour; this PR addresses exceptions that are not subclasses of `RetriableException`. Perhaps just handling `AuthorizationException`, as is currently done in the PR, is sufficient?







[GitHub] [kafka] abbccdda commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8986:
URL: https://github.com/apache/kafka/pull/8986#discussion_r452997443



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -460,15 +463,21 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     exception instanceof IllegalGenerationException ||
                     exception instanceof MemberIdRequiredException)
                     continue;
-                else if (!future.isRetriable())
-                    throw exception;
-
-                timer.sleep(rebalanceConfig.retryBackoffMs);
+                else {
+                    handleFailure(future, timer);
+                }
             }
         }
         return true;
     }
 
+    protected void handleFailure(RequestFuture<?> future, Timer timer) {
+        if (future.isRetriable() || future.exception() instanceof AuthorizationException)

Review comment:
       Yes, I think the current PR scope is fine for just fixing the consumer issue. We should just be careful not to accidentally introduce retries in other components that rely on subclasses of `AuthorizationException`. Could we reorder the check, though, so that the throw happens first?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -259,6 +260,22 @@ public void testCoordinatorDiscoveryBackoff() {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
+    @Test
+    public void testGroupAuthorizationFailure() {

Review comment:
       Could we also test topic authorization failure?







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8986:
URL: https://github.com/apache/kafka/pull/8986#discussion_r518937545



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -259,6 +260,22 @@ public void testCoordinatorDiscoveryBackoff() {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
+    @Test
+    public void testGroupAuthorizationFailure() {
+        setupCoordinator();
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.GROUP_AUTHORIZATION_FAILED));
+
+        long initialTimeMs = mockTime.milliseconds();
+        assertThrows(GroupAuthorizationException.class, () ->
+                coordinator.ensureCoordinatorReady(mockTime.timer(Long.MAX_VALUE)));
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(mockTime.timer(Long.MAX_VALUE));
+        long timeTakenMs = mockTime.milliseconds() - initialTimeMs;

Review comment:
       Done.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8986:
URL: https://github.com/apache/kafka/pull/8986#discussion_r454331091



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -460,15 +463,21 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     exception instanceof IllegalGenerationException ||
                     exception instanceof MemberIdRequiredException)
                     continue;
-                else if (!future.isRetriable())
-                    throw exception;
-
-                timer.sleep(rebalanceConfig.retryBackoffMs);
+                else {
+                    handleFailure(future, timer);
+                }
             }
         }
         return true;
     }
 
+    protected void handleFailure(RequestFuture<?> future, Timer timer) {
+        if (future.isRetriable() || future.exception() instanceof AuthorizationException)

Review comment:
       `AuthorizationException` is not retriable and I kept it that way, so this PR won't affect other components. Since `AuthorizationException` is not retriable, we want to sleep first before throwing the exception, hence the ordering of sleep before throw.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8986:
URL: https://github.com/apache/kafka/pull/8986#discussion_r450129557



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -460,15 +463,21 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     exception instanceof IllegalGenerationException ||
                     exception instanceof MemberIdRequiredException)
                     continue;
-                else if (!future.isRetriable())
-                    throw exception;
-
-                timer.sleep(rebalanceConfig.retryBackoffMs);
+                else {
+                    handleFailure(future, timer);
+                }
             }
         }
         return true;
     }
 
+    protected void handleFailure(RequestFuture<?> future, Timer timer) {
+        if (future.isRetriable() || future.exception() instanceof AuthorizationException)

Review comment:
       @hachikuji `AuthorizationException` is not marked as a retriable exception, but I think applications would typically retry, since the failure could be transient while ACLs have not yet been propagated. Not sure whether we should do one or both of these:
   1) Make `AuthorizationException` retriable, since it could be transient. This seemed riskier than just checking here.
   2) Add backoff for all exceptions, not just `AuthorizationException` and retriable exceptions, to avoid tight loops. I wasn't sure what other fatal exceptions may be thrown when looking up the coordinator.
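
       For context on the "tight loop" concern, here is a rough simulation of an application that retries after every failed poll. It counts how many attempts fit into a fixed time budget with and without a backoff. All names and numbers (`attemptsWithin`, a 1 ms poll cost, a 100 ms backoff) are assumptions chosen for the example, and no Kafka classes are involved.

```java
// Simulation only: models an application that retries immediately after each failed poll.
class TightLoopSketch {
    // Counts the failed attempts that start within budgetMs of simulated time,
    // where each attempt costs pollCostMs plus backoffMs.
    static int attemptsWithin(long budgetMs, long pollCostMs, long backoffMs) {
        long stepMs = pollCostMs + backoffMs;
        if (stepMs <= 0)
            throw new IllegalArgumentException("each attempt must take a positive amount of time");
        int attempts = 0;
        for (long nowMs = 0; nowMs < budgetMs; nowMs += stepMs)
            attempts++;
        return attempts;
    }

    public static void main(String[] args) {
        long budgetMs = 1000;
        long pollCostMs = 1;
        System.out.println("no backoff:     " + attemptsWithin(budgetMs, pollCostMs, 0) + " attempts");
        System.out.println("100 ms backoff: " + attemptsWithin(budgetMs, pollCostMs, 100) + " attempts");
    }
}
```

       Both options above aim at the same effect: bounding the number of failed requests a retrying client can issue per unit of time.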
   







[GitHub] [kafka] abbccdda commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8986:
URL: https://github.com/apache/kafka/pull/8986#discussion_r451022144



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -460,15 +463,21 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     exception instanceof IllegalGenerationException ||
                     exception instanceof MemberIdRequiredException)
                     continue;
-                else if (!future.isRetriable())
-                    throw exception;
-
-                timer.sleep(rebalanceConfig.retryBackoffMs);
+                else {
+                    handleFailure(future, timer);
+                }
             }
         }
         return true;
     }
 
+    protected void handleFailure(RequestFuture<?> future, Timer timer) {
+        if (future.isRetriable() || future.exception() instanceof AuthorizationException)

Review comment:
       `GroupAuthorizationException` is the only retriable exception defined in the ticket, so maybe we should scope the fix to that alone. Agree with No. 2, to back off for all retriable exceptions.



