Posted to jira@kafka.apache.org by "philipnee (via GitHub)" <gi...@apache.org> on 2023/02/02 20:17:54 UTC

[GitHub] [kafka] philipnee opened a new pull request, #13190: KAFKA-12539: exit upon expired timer to prevent tight looping

philipnee opened a new pull request, #13190:
URL: https://github.com/apache/kafka/pull/13190

   In AbstractCoordinator#joinGroupIfNeeded, the joinGroup request can be retried without proper backoff because the timer has already expired. This is an uncommon scenario that possibly only appears during testing, but I think it makes sense to force the client to drive the join group via poll.
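A minimal sketch of why the backoff disappears, assuming the timer's sleep is capped by its remaining time (my understanding of how Kafka's `Timer#sleep` behaves). `SketchTimer` is a hypothetical stand-in with a manual clock, not the real `org.apache.kafka.common.utils.Timer`. Once the deadline has passed, `sleep(retryBackoffMs)` becomes a no-op, so a retry loop paced only by that sleep resends immediately.

```java
// Hypothetical, simplified model of a deadline timer; not Kafka's actual Timer class.
final class SketchTimer {
    private long nowMs;             // manual clock, advanced only by sleep()
    private final long deadlineMs;

    SketchTimer(long startMs, long timeoutMs) {
        this.nowMs = startMs;
        this.deadlineMs = startMs + timeoutMs;
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - nowMs);
    }

    boolean isExpired() {
        return nowMs >= deadlineMs;
    }

    // Sleeps for the requested duration but never past the deadline,
    // and returns how long it actually "slept".
    long sleep(long durationMs) {
        long slept = Math.min(durationMs, remainingMs());
        nowMs += slept;
        return slept;
    }
}
```

For example, on an already-expired timer `sleep(100)` returns 0 (no backoff at all), while on a timer with 30ms left it returns 30 and leaves the timer expired.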
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120456284


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
                 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
         client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.timer(0)); // failing joinGroup request will require re-poll in order to retry

Review Comment:
   Ack, that makes sense.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1096112947


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -501,13 +501,16 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 }
 
                 if (exception instanceof UnknownMemberIdException ||
-                    exception instanceof IllegalGenerationException ||
-                    exception instanceof RebalanceInProgressException ||
-                    exception instanceof MemberIdRequiredException)
+                        exception instanceof IllegalGenerationException ||
+                        exception instanceof RebalanceInProgressException ||
+                        exception instanceof MemberIdRequiredException)
                     continue;
                 else if (!future.isRetriable())
                     throw exception;
 
+                if (timer.isExpired()) {

Review Comment:
   Could you add a couple of comments here explaining why we check the timer again here, in addition to line 452 above? Maybe something like this:
   
   ```
   We check the timer again after calling poll with the timer, since it's possible that even after the timer has elapsed, the next client.poll(timer) would immediately return an error response, which would otherwise prevent us from exiting the while loop.
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -501,13 +501,16 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 }
 
                 if (exception instanceof UnknownMemberIdException ||
-                    exception instanceof IllegalGenerationException ||
-                    exception instanceof RebalanceInProgressException ||
-                    exception instanceof MemberIdRequiredException)
+                        exception instanceof IllegalGenerationException ||
+                        exception instanceof RebalanceInProgressException ||
+                        exception instanceof MemberIdRequiredException)

Review Comment:
   Should we do the timer check before this? Otherwise, if the exception from an immediately returned response is one of those four, we would still `continue` and skip the check below.
   
   More concretely, I think we can move the remaining logic inside the `if` statement:
   
   ```
   if (!future.isRetriable()) {
       throw ..
   } else {
       if (timer.isExpired()) {
           return false;
       } else if (exception instanceof ..) {
           continue;
       } else {
           timer.sleep(..)
       }
   }
   ```





[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1438909145

   Although, are we OK with handling these 4 exceptions the same way as before? I know you previously mentioned that it might be better to make the rules more consistent, and I somewhat agree.




[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119426667


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3398,7 +3400,8 @@ public void testPrepareJoinAndRejoinAfterFailedRebalance() {
             client.respond(syncGroupResponse(partitions, Errors.NONE));
 
             // Join future should succeed but generation already cleared so result of join is false.
-            res = coordinator.joinGroupIfNeeded(time.timer(1));
+            coordinator.joinGroupIfNeeded(time.timer(0));

Review Comment:
   Similarly here: the timer has already expired when the IllegalGenerationException comes back. The original code would have continued the loop, but now it exits. I guess we could poll for a bit longer, like 3ms instead of 0ms.





[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1438907376

   Hey @guozhangwang, it's WIP. I think moving the timer check before the exception-handling block (the 4 exceptions) breaks a bunch of tests, as most tests expect the join to complete within a single poll. I'm actually looking into these breakages. Sorry about the confusion.




[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119398074


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
                 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
         client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.timer(0)); // failing joinGroup request will require re-poll in order to retry

Review Comment:
   good point... let me update the tests there. thanks





[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1435477923

   Moving the timer check just broke a bunch of unit tests 😅 




[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119357976


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -500,14 +500,22 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }
 
+                // continue to retry as long as the timer hasn't expired

Review Comment:
   Could we simplify this multi-if logic as:
   
   ```
   if (!future.isRetriable()) { throw }
   else {
       if (timer.isExpired()) { return false }
       else if (exception instanceof .. ) { continue }
       else {timer.sleep(..)}
   }
   ```
   
   Also, could we add a comment on top clarifying that this order of precedence is deliberate, and that future changes should take care not to change it unnecessarily?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
                 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
         client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.timer(0)); // failing joinGroup request will require re-poll in order to retry

Review Comment:
   It's not very clear to me why we need additional polls here and at line 3403 below, since the test scenarios seem unrelated to error cases?





[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1447347885

   Here are a couple of things I updated:
   1. Added some documentation to clarify the intent, but I didn't rewrite it as nested ifs, as those can be harder to read.
   2. Added non-zero timeouts to the tests, as the timer check is now stricter and explicitly exits upon expiration.




[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449191428

   yeah the DefaultStateUpdaterTest has been failing from time to time... not sure why 😭 




[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449185108

   Hmm. I think these tests are flaky actually
   ```
   Build / JDK 17 and Scala 2.13 / shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest (30s)
   Build / JDK 17 and Scala 2.13 / shouldPauseActiveTaskAndTransitToUpdateStandby() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest (30s)
   Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest (2m 0s)
   Build / JDK 11 and Scala 2.13 / testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest (19s)
   Build / JDK 11 and Scala 2.13 / shouldRemovePausedAndUpdatingTasksOnShutdown() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest (30s)
   Build / JDK 11 and Scala 2.13 / shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest (31s)
   ```




[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1414556430

   @guozhangwang - would you have time to review this 🥺 ?




[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1103108812


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -501,13 +501,16 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 }
 
                 if (exception instanceof UnknownMemberIdException ||
-                    exception instanceof IllegalGenerationException ||
-                    exception instanceof RebalanceInProgressException ||
-                    exception instanceof MemberIdRequiredException)
+                        exception instanceof IllegalGenerationException ||
+                        exception instanceof RebalanceInProgressException ||
+                        exception instanceof MemberIdRequiredException)

Review Comment:
   Agreed. One comment: the `isRetriable()` check should happen after the `instanceof` checks, because I think we actually want to retry upon these exceptions (according to the current logic).





[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119399381


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -500,14 +500,22 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }
 
+                // continue to retry as long as the timer hasn't expired

Review Comment:
   So the if/else blocks become a bit fragmented. Alternatively, we could do this (IMO, a bit more difficult to read):
   ```
   if (!future.isRetriable()) {
     if (... instanceof ...) { continue; }
     throw ...
   }
   
   {rest of the logic there}
   ```





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120460193


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -500,14 +500,24 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }
 
+                // 4 special non-retriable exceptions that we want to retry, as long as the timer hasn't expired.

Review Comment:
   I think the comment here should not just state what the code does, since readers can understand that from the code :P Instead, what we want to emphasize is a reminder to future contributors to be careful not to change the precedence ordering of this logic unnecessarily.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -500,14 +500,24 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }
 
+                // 4 special non-retriable exceptions that we want to retry, as long as the timer hasn't expired.
                 if (exception instanceof UnknownMemberIdException ||
-                    exception instanceof IllegalGenerationException ||
-                    exception instanceof RebalanceInProgressException ||
-                    exception instanceof MemberIdRequiredException)
+                        exception instanceof IllegalGenerationException ||

Review Comment:
   Ah thanks for the clarifications!
   
   Thinking about this a bit more (sorry for going back and forth...), I'm now a bit more concerned that for usage patterns where `poll` is called less frequently, we may not come back to handle these four exceptions while the broker is ticking and waiting for the join-group request to be re-sent. Hence I'm changing my mind and leaning toward honoring the exception types for immediate handling over the timeout. Again, sorry for going back and forth...
   
   So I think we would define the ordering as follows:
   
   1. For non-retriable exceptions, always try to handle them immediately and do not honor the timer.
   2. Otherwise, honor the timer.
   
   In that case, we could just go back to the first version of your change, i.e. just add the
   
   ```
   if (timer.isExpired())
       return false;
   ```
   
   after the `if/else-if` block. Still, it's better to add a comment noting that the above ordering is deliberately designed this way.
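The ordering described above could be captured as a small decision function. This is a hypothetical sketch, not Kafka code: `JoinFailure`, `Action`, and `JoinRetryPolicy.decide` are illustrative names, and the four rebalance-stage exceptions (UnknownMemberIdException, IllegalGenerationException, RebalanceInProgressException, MemberIdRequiredException) are collapsed into a single flag.

```java
// Hypothetical model of the retry decision after a failed JoinGroup attempt.
enum Action { RETRY_NOW, THROW, RETURN_FALSE, BACKOFF_THEN_RETRY }

final class JoinFailure {
    // True for the four rebalance-stage exceptions (modeled as one flag).
    final boolean rebalanceStageError;
    final boolean retriable;

    JoinFailure(boolean rebalanceStageError, boolean retriable) {
        this.rebalanceStageError = rebalanceStageError;
        this.retriable = retriable;
    }
}

final class JoinRetryPolicy {
    // The precedence is deliberate:
    // 1. the four rebalance-stage errors are retried immediately, ignoring the timer;
    // 2. any other non-retriable error is thrown immediately, ignoring the timer;
    // 3. otherwise the timer is honored: exit if expired, else back off and retry.
    static Action decide(JoinFailure failure, boolean timerExpired) {
        if (failure.rebalanceStageError) {
            return Action.RETRY_NOW;
        } else if (!failure.retriable) {
            return Action.THROW;
        }
        if (timerExpired) {
            return Action.RETURN_FALSE;
        }
        return Action.BACKOFF_THEN_RETRY;
    }
}
```

Moving the timer check to the top would make an expired timer end the loop even in the middle of a rebalance, which is the trade-off discussed in this thread.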
   
   





[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1438904177

   @philipnee is this the final version of this PR? It seems we are still prioritizing the four exceptions that indicate the mid-stage of a rebalance over the elapsed timer here?




[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1444268607

   Just a note on this PR: it seems we need to be more deliberate in handling the timeout, because non-retriable errors are always expected to be thrown (except for the 4 cases), which is why the change broke 60-ish tests. Updating the PR to retrigger the tests.




[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1427307821

   Thanks @guozhangwang for the feedback. I added some tests to cover the untested cases. I still have a quick question about this block: is it intentional to `continue` without sleeping on the backoff timer? (quoting the original code)
   
   ```
   if (exception instanceof UnknownMemberIdException ||
           exception instanceof IllegalGenerationException ||
           exception instanceof RebalanceInProgressException ||
           exception instanceof MemberIdRequiredException)
       continue;
   else if (!future.isRetriable())
       throw exception;
   
   timer.sleep(rebalanceConfig.retryBackoffMs);
   ```




[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1440514286

   Hmm, strangely, this branch seems to trigger a bunch of initialization error failures, and I can't seem to reproduce them locally...




[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120481527


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -500,14 +500,24 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }
 
+                // 4 special non-retriable exceptions that we want to retry, as long as the timer hasn't expired.
                 if (exception instanceof UnknownMemberIdException ||
-                    exception instanceof IllegalGenerationException ||
-                    exception instanceof RebalanceInProgressException ||
-                    exception instanceof MemberIdRequiredException)
+                        exception instanceof IllegalGenerationException ||

Review Comment:
   Hey, thanks again for the comments, and absolutely no apology is needed! As we all know, rebalancing is full of subtleties, so it makes sense to be careful about these non-retriable exception cases. I think it's a good idea to keep the original behavior, in case of unexpected breakage. Updating the PR.





[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120665269


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -501,13 +501,18 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 }
 
                 if (exception instanceof UnknownMemberIdException ||
-                    exception instanceof IllegalGenerationException ||
-                    exception instanceof RebalanceInProgressException ||
-                    exception instanceof MemberIdRequiredException)
+                        exception instanceof IllegalGenerationException ||
+                        exception instanceof RebalanceInProgressException ||
+                        exception instanceof MemberIdRequiredException)
                     continue;
                 else if (!future.isRetriable())

Review Comment:
   The previous logic was restored, with some automatic correction to the indentation.





[GitHub] [kafka] guozhangwang merged pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang merged PR #13190:
URL: https://github.com/apache/kafka/pull/13190




[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120663942


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1484,7 +1484,8 @@ public void testRebalanceWithMetadataChange() {
                 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
         client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.poll(time.timer(0));
+        assertFalse(client.hasInFlightRequests());
+        coordinator.poll(time.timer(1));

Review Comment:
   Note: we need a non-zero timeout here to give the retry a second chance, because in the new code the timer is checked after the failure and causes the method to exit.
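A toy simulation of the point above, assuming a mock clock that only advances via the capped backoff sleep and scripted JoinGroup responses that all arrive immediately. `JoinLoopSim` and its `Response` values are hypothetical and heavily simplified (no coordinator lookup, no metadata refresh); `checkTimerAfterFailure` toggles the check added by this PR. With a 0ms timer the loop exits after the first retriable failure, while a 1ms timer leaves room for one (capped) backoff and a second attempt.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy model of the joinGroupIfNeeded retry loop; all names here are hypothetical.
final class JoinLoopSim {
    enum Response { SUCCESS, RETRIABLE_ERROR, REBALANCE_STAGE_ERROR }

    long nowMs = 0;     // mock clock
    int attempts = 0;   // JoinGroup responses handled

    boolean joinGroupIfNeeded(long timeoutMs, long retryBackoffMs,
                              List<Response> script, boolean checkTimerAfterFailure) {
        long deadlineMs = nowMs + timeoutMs;
        Deque<Response> responses = new ArrayDeque<>(script);
        while (true) {
            Response response = responses.poll();
            if (response == null) {
                return false;   // no response: treat like an incomplete future
            }
            attempts++;
            if (response == Response.SUCCESS) {
                return true;
            }
            if (response == Response.REBALANCE_STAGE_ERROR) {
                continue;       // resent immediately, timer not consulted
            }
            if (checkTimerAfterFailure && nowMs >= deadlineMs) {
                return false;   // expired: let the caller drive the retry via poll()
            }
            // Backoff sleep is capped by the remaining time, so it is 0 once expired.
            nowMs += Math.min(retryBackoffMs, Math.max(0, deadlineMs - nowMs));
        }
    }
}
```

With the check disabled and an expired timer, the same loop resends on every scripted failure with zero total backoff, which is the tight looping this PR targets.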





[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1433949296

   > is it intentional to continue w/o sleep on the backoff timer?
   
   Yes, that's intentional. For those four exceptions, we'd like to send the follow-up request right away, since the broker is waiting for those join-group requests. But the question is: when the timer has already elapsed, should we honor it, or should we ignore it and always try to complete this mid-stage?
   
   Since in the new protocol we would no longer have such mid-stages during a prepare_rebalance phase (cc @dajac to chime in if you feel differently), I would suggest we still respect the timer for now, to have stronger `poll(timer)` timing guarantees.




[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1438908779

   Oh got it, thanks! All good :) Please let me know when it's ready for a final look.




[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1435229905

   Thanks, @guozhangwang, that's my understanding as well.




[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119422842


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
                 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
         client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.timer(0)); // failing joinGroup request will require re-poll in order to retry

Review Comment:
   The NOT_COORDINATOR error should originally trigger retries; however, in the new code, the method would exit due to the expired timer. Another way to do it is to use `poll(time.timer(1))`.





[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119397792


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -500,14 +500,22 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }
 
+                // continue to retry as long as the timer hasn't expired

Review Comment:
   I think the `instanceof ...` exceptions are also non-retriable, so they need to be handled first.
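A small sketch of why the ordering matters, assuming the `instanceof` group consists of errors that are not retriable but should trigger an immediate rejoin. All names here (`JoinFailureOrdering`, `RejoinNowException`, `FatalException`, and the local `RetriableException`) are hypothetical stand-ins, not Kafka's exception hierarchy.

```java
// Hypothetical sketch: local stand-ins, not Kafka's exception hierarchy.
final class JoinFailureOrdering {

    /** Stand-in for errors that a retry can fix. */
    static class RetriableException extends RuntimeException {}

    /** Stand-in for the `instanceof ...` group: not retriable, but the member should rejoin right away. */
    static class RejoinNowException extends RuntimeException {}

    /** Stand-in for any other non-retriable error. */
    static class FatalException extends RuntimeException {}

    enum Action { REJOIN_NOW, THROW, BACKOFF_AND_RETRY }

    static boolean isRetriable(RuntimeException e) {
        return e instanceof RetriableException;
    }

    /** Rejoin-now errors are checked before the generic non-retriable check. */
    static Action handleFirst(RuntimeException e) {
        if (e instanceof RejoinNowException) {
            return Action.REJOIN_NOW;
        }
        if (!isRetriable(e)) {
            return Action.THROW;
        }
        return Action.BACKOFF_AND_RETRY;
    }

    /** Reversed order: the generic non-retriable check catches the rejoin-now case first. */
    static Action handleLast(RuntimeException e) {
        if (!isRetriable(e)) {
            return Action.THROW;
        }
        if (e instanceof RejoinNowException) {
            return Action.REJOIN_NOW; // never reached for RejoinNowException, which is not retriable
        }
        return Action.BACKOFF_AND_RETRY;
    }

    public static void main(String[] args) {
        RuntimeException e = new RejoinNowException();
        System.out.println("handled first: " + handleFirst(e)); // REJOIN_NOW
        System.out.println("handled last:  " + handleLast(e));  // THROW
    }
}
```

If the generic non-retriable check runs first, the rejoin-now errors are thrown to the caller instead of triggering a rejoin.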





[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1445261315

   The failures seem unrelated to the change here, i.e. they don't show up in both rounds:
   ```
   Build / JDK 11 and Scala 2.13 / testDynamicListenerConnectionCreationRateQuota() – kafka.network.DynamicConnectionQuotaTest (41s)
   Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest (2m 0s)
   Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testRegisterZkBrokerInKraft, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.server.KafkaServerKRaftRegistrationTest (7s)
   Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testRegisterZkBrokerInKraft, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.server.KafkaServerKRaftRegistrationTest (12s)
   ```




[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1440489302

   Yeah, I think it's okay to make the rule consistent, i.e. to honor the timeout even under those four exceptions: if the timer has elapsed, we would already return from the loop in
   
   ```
   client.poll(future, timer);
   if (!future.isDone()) {
       // we ran out of time
       return false;
   }
   ```
   
   even if the response yet to be returned would contain one of these four exceptions. So I think we should still obey this rule: even if a response has already been returned and we know it is one of these four exceptions, if the timer has elapsed, we still exit the loop.
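A minimal sketch of the decision after a failed join response under the rule proposed here, where an expired timer wins even over the four rejoin-immediately exceptions. `ExpiredTimerPolicy`, `ErrorKind`, `Action`, and `onJoinFailure` are hypothetical names. Treating fatal errors as thrown regardless of the timer is an assumption of this sketch, not something stated in the thread.

```java
// Hypothetical sketch of the decision after a failed join response; not Kafka code.
final class ExpiredTimerPolicy {

    /** Simplified classification of a failed join response. */
    enum ErrorKind {
        REJOIN_NOW, // the four exceptions discussed in the thread (rejoin right away)
        RETRIABLE,  // e.g. NOT_COORDINATOR: back off, then retry
        FATAL       // any other non-retriable error
    }

    enum Action { REJOIN_IMMEDIATELY, BACKOFF_THEN_RETRY, RETURN_TO_CALLER, THROW }

    static Action onJoinFailure(ErrorKind kind, boolean timerExpired) {
        if (kind == ErrorKind.FATAL) {
            return Action.THROW; // assumption: fatal errors surface regardless of the timer
        }
        if (timerExpired) {
            return Action.RETURN_TO_CALLER; // honored even for REJOIN_NOW, per the proposed rule
        }
        return kind == ErrorKind.REJOIN_NOW ? Action.REJOIN_IMMEDIATELY : Action.BACKOFF_THEN_RETRY;
    }

    public static void main(String[] args) {
        for (ErrorKind kind : ErrorKind.values()) {
            System.out.println(kind + ": live timer -> " + onJoinFailure(kind, false)
                    + ", expired timer -> " + onJoinFailure(kind, true));
        }
    }
}
```

Compared with always rejoining immediately, the only case that differs is `REJOIN_NOW` with an expired timer. That case now hands control back to the caller, so the next `poll(timer)` drives the retry.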




[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449185984

   The test failures are unrelated to this PR (but some of them are related to DefaultStateUpdaterTest... sigh).




[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449186137

   Merged to trunk.




[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119398074


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
                 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
         client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.timer(0)); // failing joinGroup request will require re-poll in order to retry

Review Comment:
   Good point... let me update the tests there. Thanks.





[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119399381


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -500,14 +500,22 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }
 
+                // continue to retry as long as the timer hasn't expired

Review Comment:
   So the if/else blocks become a bit fragmented. Or we could do:
   ```
   if (!future.isRetriable()) {
       if (... instanceof ...) {
           continue;
       }
       throw ...
   }
   
   // rest of the logic there
   ```
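The nested layout above can be compared against the flat `if`/`else if` layout with a small sketch. `NestedVsFlat`, `ErrorKind`, and `Action` are hypothetical, and `CONTINUE`/`THROW`/`REST_OF_LOGIC` stand for the `continue`, `throw ...`, and remaining-logic branches of the sketch.

```java
// Hypothetical comparison of the two layouts; names are illustrative, not Kafka code.
final class NestedVsFlat {

    enum ErrorKind { REJOIN_NOW, RETRIABLE, FATAL }

    enum Action { CONTINUE, THROW, REST_OF_LOGIC }

    static boolean isRetriable(ErrorKind kind) {
        return kind == ErrorKind.RETRIABLE; // REJOIN_NOW is treated as non-retriable
    }

    /** Flat layout: the rejoin-now check comes first, then the non-retriable check. */
    static Action flat(ErrorKind kind) {
        if (kind == ErrorKind.REJOIN_NOW) {
            return Action.CONTINUE;
        } else if (!isRetriable(kind)) {
            return Action.THROW;
        }
        return Action.REST_OF_LOGIC;
    }

    /** Nested layout: the rejoin-now check lives inside the non-retriable branch. */
    static Action nested(ErrorKind kind) {
        if (!isRetriable(kind)) {
            if (kind == ErrorKind.REJOIN_NOW) {
                return Action.CONTINUE;
            }
            return Action.THROW;
        }
        return Action.REST_OF_LOGIC;
    }

    public static void main(String[] args) {
        for (ErrorKind kind : ErrorKind.values()) {
            System.out.println(kind + ": flat=" + flat(kind) + ", nested=" + nested(kind));
        }
    }
}
```

Both layouts agree for every error kind, but only because the rejoin-now errors are non-retriable. If one of them were retriable, the nested form would never reach its `continue`. The nested form keeps the special case next to the `throw`, while the flat form reads top to bottom.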





[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119428911


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -500,14 +500,22 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }
 
+                // continue to retry as long as the timer hasn't expired

Review Comment:
   However, this is a bit more nested, which can make it harder to read.


