Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/01 06:47:11 UTC

[GitHub] [kafka] guozhangwang opened a new pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

guozhangwang opened a new pull request #10232:
URL: https://github.com/apache/kafka/pull/10232


   1. Create a reason string to be used for the INFO log entry whenever we request a re-join or reset the generation state.
   2. Some minor cleanups.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#issuecomment-791891198


   We may reset generation and request rejoin in two different places: 1) in the join/sync-group handler, and 2) in joinGroupIfNeeded, when the future is received. The principle is that these two should not overlap, and 2) is used as a fallback for those common errors from join/sync that we do not handle specifically.





[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593510070



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                     }
                 }
             } else {
-                requestRejoin();

Review comment:
       Ok cool, thanks. One last question then: after this refactoring, since we no longer call `requestRejoinOnResponseError` below, should we re-add the `requestRejoin()` call here? Or add a `requestRejoin` to the specific cases in the SyncGroup handler, eg
   ```
   } else if (error == Errors.REBALANCE_IN_PROGRESS) {
       log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                         "Sent generation was {}", sentGeneration);
       future.raise(error);
   }
   ```







[GitHub] [kafka] ableegoldman commented on pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#issuecomment-797844734


   Failed with unrelated `connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining()` and `kafka.server.ScramServerStartupTest.testAuthentications()`
   





[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593463366



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                     }
                 }
             } else {
-                requestRejoin();

Review comment:
       @guozhangwang I think something may have been messed up during a merge/rebase: I no longer see `requestRejoinOnResponseError` being invoked anywhere







[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r588842618



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                     }
                 }
             } else {
-                requestRejoin();

Review comment:
       I meant the latter: we call that inside the conditions already -- for those fatal errors, we do not need to call this anyways since the consumer will throw and crash.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r585070038



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                     }
                 }
             } else {
-                requestRejoin();

Review comment:
       Just to clarify, you mean we don't need to rejoin here since we will always raise an error, and always rejoin (if necessary) when checking that error?
   
   Or are you referring to the `requestRejoinOnResponseError` calls you added to the two last cases in the below if/else? 

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -961,29 +962,34 @@ protected synchronized String memberId() {
         return generation.memberId;
     }
 
-    private synchronized void resetState() {
+    private synchronized void resetState(final String reason) {

Review comment:
       nit: rename to `resetStateAndGeneration`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -802,8 +801,10 @@ public void handle(SyncGroupResponse syncResponse,
                     log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
                              error.message(), sentGeneration);
                     markCoordinatorUnknown(error);
+                    requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error);

Review comment:
       Why do we explicitly rejoin in this case, but not eg `REBALANCE_IN_PROGRESS`? or `UNKNOWN_MEMBER_ID`/`ILLEGAL_GENERATION` ?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -961,29 +962,34 @@ protected synchronized String memberId() {
         return generation.memberId;
     }
 
-    private synchronized void resetState() {
+    private synchronized void resetState(final String reason) {
+        log.info("Resetting generation due to: {}", reason);
+
         state = MemberState.UNJOINED;
         generation = Generation.NO_GENERATION;
     }
 
-    private synchronized void resetStateAndRejoin() {
-        resetState();
-        rejoinNeeded = true;
+    private synchronized void resetStateAndRejoin(final String reason) {
+        resetState(reason);
+        requestRejoin(reason);
     }
 
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
-        log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);

Review comment:
       SGTM. If we find it flooding the logs and not helpful we can reconsider







[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593515777



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                     }
                 }
             } else {
-                requestRejoin();

Review comment:
       I think we do not need to, since it would be called on `resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));`; previously we were calling rejoin twice.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r584479103



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -961,29 +962,34 @@ protected synchronized String memberId() {
         return generation.memberId;
     }
 
-    private synchronized void resetState() {
+    private synchronized void resetState(final String reason) {
+        log.info("Resetting generation due to: {}", reason);
+
         state = MemberState.UNJOINED;
         generation = Generation.NO_GENERATION;
     }
 
-    private synchronized void resetStateAndRejoin() {
-        resetState();
-        rejoinNeeded = true;
+    private synchronized void resetStateAndRejoin(final String reason) {
+        resetState(reason);
+        requestRejoin(reason);
     }
 
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
-        log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);

Review comment:
       Note that I intentionally bumped up the log level from debug to info here, since I think this is a message that users should pay attention to in production, where they mostly use INFO. Open to counter-suggestions though.
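
A minimal sketch of the pattern in this diff: every reset or rejoin request carries a reason string that is written at INFO level. The class name, the in-memory `infoLog` list (used here in place of a real logger), the `-1` generation id, and the wording of the `requestRejoin` message are assumptions for illustration; only the "Resetting generation due to: {}" message comes from the diff above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the coordinator's bookkeeping; not the real AbstractCoordinator.
final class RejoinReasonSketch {
    enum MemberState { UNJOINED, STABLE }

    // Captured INFO lines, so the example needs no logging library (assumption).
    final List<String> infoLog = new ArrayList<>();

    MemberState state = MemberState.STABLE;
    int generationId = 5;            // -1 stands in for "no generation" (assumption)
    boolean rejoinNeeded = false;

    // Record why a rejoin is requested, then flag it.
    synchronized void requestRejoin(String reason) {
        infoLog.add("Request joining group due to: " + reason); // message wording is an assumption
        rejoinNeeded = true;
    }

    // Record why state is reset, then clear member state and generation.
    synchronized void resetState(String reason) {
        infoLog.add("Resetting generation due to: " + reason);
        state = MemberState.UNJOINED;
        generationId = -1;
    }

    // Both steps share the same reason, so the two log lines can be correlated.
    synchronized void resetStateAndRejoin(String reason) {
        resetState(reason);
        requestRejoin(reason);
    }

    public static void main(String[] args) {
        RejoinReasonSketch c = new RejoinReasonSketch();
        c.resetStateAndRejoin("rebalance failed with retriable error (example)");
        c.infoLog.forEach(System.out::println);
    }
}
```

Because the reason is a required parameter, a caller cannot trigger a reset or rejoin without saying why, which is the point of the PR title.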

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                     }
                 }
             } else {
-                requestRejoin();

Review comment:
       We can remove this since it is a bit redundant now as we call for each case if necessary.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -860,7 +861,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
 
         @Override
         public void onFailure(RuntimeException e, RequestFuture<Void> future) {
-            log.debug("FindCoordinator request failed due to {}", e);
+            log.debug("FindCoordinator request failed due to {}", e.toString());

Review comment:
       Minor cleanup, we only need to print the error message but not the stack trace.
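
To illustrate the difference this cleanup is after: as I understand SLF4J-style loggers, a `Throwable` passed as the last argument is treated as the exception to log, so its stack trace is printed, whereas `e.toString()` is an ordinary string argument. The sketch below uses only the JDK (no logger) to compare the one-line summary with the full trace; the class and method names are hypothetical.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper comparing the two renderings of an exception.
final class ExceptionSummarySketch {
    // One-line summary: class name plus message, as produced by Throwable#toString().
    static String summary(Throwable t) {
        return t.toString();
    }

    // Full stack trace rendered to a string, for comparison.
    static String fullTrace(Throwable t) {
        StringWriter out = new StringWriter();
        t.printStackTrace(new PrintWriter(out, true));
        return out.toString();
    }

    public static void main(String[] args) {
        RuntimeException e = new RuntimeException("coordinator lookup timed out");
        System.out.println("summary: " + summary(e));
        System.out.println("full trace line count: " + fullTrace(e).split("\n").length);
    }
}
```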







[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r588847540



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -802,8 +801,10 @@ public void handle(SyncGroupResponse syncResponse,
                     log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
                              error.message(), sentGeneration);
                     markCoordinatorUnknown(error);
+                    requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error);

Review comment:
       You're right, we do not, I've updated this section.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593521930



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                     }
                 }
             } else {
-                requestRejoin();

Review comment:
       Hmm...but `resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));` is only called in `joinGroupIfNeeded`, which is only called in `ensureActiveGroup`, which is in turn only invoked in `ConsumerCoordinator#poll`.
   That said, inside `SyncGroupResponseHandler#handle` we would already have `rejoinNeeded = true` and only set it to false if the SyncGroup succeeds. So for that reason I guess we don't need the `requestRejoin` anywhere inside the SyncGroup handler.
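
The flag argument above can be sketched as follows, assuming (as the comment states) that `rejoinNeeded` is already true when the SyncGroup response arrives and is cleared only on success. The class and method names are hypothetical and this is not the real handler.

```java
// Hypothetical model of the rejoinNeeded flag's lifetime around a SyncGroup attempt.
final class RejoinFlagSketch {
    // True until a rebalance completes: a member that has not synced still needs to (re)join.
    private boolean rejoinNeeded = true;

    boolean rejoinNeeded() {
        return rejoinNeeded;
    }

    // Hypothetical SyncGroup completion callback.
    void onSyncGroupResponse(boolean succeeded) {
        if (succeeded) {
            rejoinNeeded = false;   // only a successful sync clears the flag
        }
        // On failure nothing needs to be set: the flag is still true from before the attempt.
    }

    public static void main(String[] args) {
        RejoinFlagSketch member = new RejoinFlagSketch();
        member.onSyncGroupResponse(false);
        System.out.println("after failed sync, rejoinNeeded = " + member.rejoinNeeded());
        member.onSyncGroupResponse(true);
        System.out.println("after successful sync, rejoinNeeded = " + member.rejoinNeeded());
    }
}
```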







[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593504141



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                     }
                 }
             } else {
-                requestRejoin();

Review comment:
       I added that function for the sync group handler, to handle the retriable `COORDINATOR_NOT_AVAILABLE / NOT_COORDINATOR` and any unexpected error. After the refactoring PR they now all fall into the `joinGroupIfNeeded` in
   
   ```
   final RuntimeException exception = future.exception();
   
                   resetJoinGroupFuture();
   
                   if (exception instanceof UnknownMemberIdException ||
                       exception instanceof IllegalGenerationException ||
                       exception instanceof RebalanceInProgressException ||
                       exception instanceof MemberIdRequiredException)
                       continue;
                   else if (!future.isRetriable())
                       throw exception;
   
                   resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));
                   timer.sleep(rebalanceConfig.retryBackoffMs);
   ```
   
   This is part of the principle I mentioned:
   
   ```
   We may reset generation and request rejoin in two different places: 1) in join/sync-group handler, and 2) in joinGroupIfNeeded, when the future is received. The principle is that these two should not overlap, and 2) is used as a fallback for those common errors from join/sync that we do not handle specifically.
   ```
   
   But I forgot to remove this function as part of the second pass; will remove.
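
The three-way decision in the quoted `joinGroupIfNeeded` snippet can be sketched in isolation like this. It is a simplified model, not the Kafka code: the `Action` enum, the `HandledByResponseHandler` exception (standing in for the four exception types checked in the snippet), and the explicit `retriable` parameter (standing in for `future.isRetriable()`) are all assumptions for illustration.

```java
// Hypothetical sketch of the fallback decision; names are illustrative, not Kafka's.
final class JoinFallbackSketch {
    enum Action { RETRY_NOW, RETHROW, RESET_AND_BACKOFF }

    // Stands in for errors that the join/sync response handler presumably dealt with already.
    static final class HandledByResponseHandler extends RuntimeException {
        HandledByResponseHandler(String message) {
            super(message);
        }
    }

    // Decide what the caller of the join future does with a failure.
    static Action onJoinFailure(RuntimeException exception, boolean retriable) {
        if (exception instanceof HandledByResponseHandler) {
            return Action.RETRY_NOW;        // no second reset here: the two places must not overlap
        } else if (!retriable) {
            return Action.RETHROW;          // fatal: surface to the caller
        }
        return Action.RESET_AND_BACKOFF;    // fallback for remaining retriable errors
    }

    public static void main(String[] args) {
        System.out.println(onJoinFailure(new HandledByResponseHandler("rebalance in progress"), true));
        System.out.println(onJoinFailure(new IllegalStateException("fatal"), false));
        System.out.println(onJoinFailure(new RuntimeException("coordinator not available"), true));
    }
}
```

Keeping the handler-specific cases and the fallback in separate branches is what prevents the double rejoin request discussed earlier in the thread.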







[GitHub] [kafka] guozhangwang merged pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #10232:
URL: https://github.com/apache/kafka/pull/10232


   

