Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/17 03:22:53 UTC

[GitHub] [kafka] ableegoldman opened a new pull request, #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

ableegoldman opened a new pull request, #12869:
URL: https://github.com/apache/kafka/pull/12869

   Fix for the subtle bug described in KAFKA-14382 that was causing rebalancing loops. If we trigger a new rebalance while the current one is still ongoing, it may cause some members to fail the first rebalance if they weren't able to send the SyncGroup request in time (for example due to processing records during the rebalance). This means those consumers never receive their assignment from the original rebalance, and won't revoke any partitions they might have needed to. This can send the group into a loop as each rebalance schedules a new followup cooperative rebalance due to partitions that need to be revoked, and each followup rebalance causes some consumer(s) to miss the SyncGroup and never revoke those partitions.
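
The guard in the PR title ("wait for current rebalance to complete before triggering followup") can be sketched in isolation as follows. This is a minimal model, not the Kafka Streams code: the class `FollowupRebalanceGuard` and its methods are hypothetical, and using `Long.MAX_VALUE` to mean "nothing scheduled" is an assumption of the sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the scheduling check; not Kafka's actual classes.
final class FollowupRebalanceGuard {
    // Long.MAX_VALUE means "no followup rebalance scheduled" (an assumption of this sketch).
    private final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
    private volatile boolean rebalanceInProgress = false;

    void scheduleFollowup(long atMs) {
        nextScheduledRebalanceMs.set(atMs);
    }

    void setRebalanceInProgress(boolean inProgress) {
        rebalanceInProgress = inProgress;
    }

    // Returns true only when a followup is due AND no rebalance is currently running.
    boolean shouldTriggerFollowup(long nowMs) {
        if (rebalanceInProgress) {
            return false; // keep the schedule; re-check on a later loop iteration
        }
        if (nextScheduledRebalanceMs.get() < nowMs) {
            nextScheduledRebalanceMs.set(Long.MAX_VALUE); // clear so it fires only once
            return true;
        }
        return false;
    }
}
```

The point of the design is that the schedule is left untouched while a rebalance is running, so members get a chance to finish the SyncGroup phase before a new rebalance is requested.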


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vvcephei commented on a diff in pull request #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on code in PR #12869:
URL: https://github.com/apache/kafka/pull/12869#discussion_r1025368358


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -298,6 +298,7 @@ public boolean isRunning() {
     private volatile ThreadMetadata threadMetadata;
     private StreamThread.StateListener stateListener;
     private final Optional<String> getGroupInstanceID;
+    private final String threadIdSuffix; // shortened version of the threadId: {processUUID}-StreamThread-{threadIdx}

Review Comment:
   This isn't exactly a suffix. If users supply their own client id, we just use it directly:
   ```
           final String userClientId = applicationConfigs.getString(StreamsConfig.CLIENT_ID_CONFIG);
           final String applicationId = applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG);
           if (userClientId.length() <= 0) {
               clientId = applicationId + "-" + processId;
           } else {
               clientId = userClientId;
           }
   ```
   
   It seems like what you're really after is either a thread id that's independent of the application id (in which case, why?) or a thread id that's independent of the user-configured client id (also, why?)
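
To make the objection concrete, here is a small standalone sketch of the derivation quoted above. The class `ClientIdSketch` is hypothetical, and the `-StreamThread-` naming is taken from the comment on the new field in the diff rather than from the actual Streams code.

```java
// Hypothetical helper mirroring the quoted client id logic; not part of Kafka.
final class ClientIdSketch {
    // Same branching as the quoted snippet: a user-supplied client id wins outright.
    static String clientId(String userClientId, String applicationId, String processId) {
        if (userClientId.isEmpty()) {
            return applicationId + "-" + processId;
        }
        return userClientId;
    }

    // Assumed thread id layout: {clientId}-StreamThread-{threadIdx}.
    static String threadId(String clientId, int threadIdx) {
        return clientId + "-StreamThread-" + threadIdx;
    }
}
```

With an empty user client id, `{processUUID}-StreamThread-{threadIdx}` is a suffix of the resulting thread id. With a user-supplied client id, the process UUID does not appear in the thread id at all, so the shortened string is not a suffix of it.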



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -611,9 +616,11 @@ boolean runLoop() {
                     cacheResizer.accept(size);
                 }
                 runOnce();
-                if (nextProbingRebalanceMs.get() < time.milliseconds()) {
+
+                // Check for a scheduled rebalance but don't trigger it until the current rebalance is done
+                if (!taskManager.rebalanceInProgress() && nextProbingRebalanceMs.get() < time.milliseconds()) {

Review Comment:
   I'm sure you thought of it, but just to make sure, is there any circumstance where the "nextProbingRebalance" could get cleared by the time we reach "rebalanceInProgress == false"?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -611,9 +616,11 @@ boolean runLoop() {
                     cacheResizer.accept(size);
                 }
                 runOnce();
-                if (nextProbingRebalanceMs.get() < time.milliseconds()) {
+
+                // Check for a scheduled rebalance but don't trigger it until the current rebalance is done
+                if (!taskManager.rebalanceInProgress() && nextProbingRebalanceMs.get() < time.milliseconds()) {

Review Comment:
   It seems like this alone is the fix we needed, right?
   
   Maybe a nitpick, but I'm wondering if we really needed the other refactoring/renaming to be part of the same PR.





[GitHub] [kafka] ableegoldman commented on a diff in pull request #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on code in PR #12869:
URL: https://github.com/apache/kafka/pull/12869#discussion_r1025891177


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -611,9 +616,11 @@ boolean runLoop() {
                     cacheResizer.accept(size);
                 }
                 runOnce();
-                if (nextProbingRebalanceMs.get() < time.milliseconds()) {
+
+                // Check for a scheduled rebalance but don't trigger it until the current rebalance is done
+                if (!taskManager.rebalanceInProgress() && nextProbingRebalanceMs.get() < time.milliseconds()) {

Review Comment:
   Yes, but in those cases we should actually not have triggered the rebalance anyway -- this should always reflect the latest status of whether a rebalance is actually needed.
   
   And yeah, I'll remove all the thread id stuff, but since there's not much other junk left once that's removed, I would rather keep the minor renaming of `isRebalanceInProgress` if that's ok





[GitHub] [kafka] ableegoldman commented on pull request #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on PR #12869:
URL: https://github.com/apache/kafka/pull/12869#issuecomment-1320825312

   Cherrypicked all the way back to 3.0...whew!
   
   Had to resolve a few minor merge conflicts along the way but it was generally smooth (I did make a small mistake on the 3.3 resolution so I pushed a quick hotfix to re-add a line I had accidentally removed)




[GitHub] [kafka] ableegoldman commented on a diff in pull request #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on code in PR #12869:
URL: https://github.com/apache/kafka/pull/12869#discussion_r1025887660


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -298,6 +298,7 @@ public boolean isRunning() {
     private volatile ThreadMetadata threadMetadata;
     private StreamThread.StateListener stateListener;
     private final Optional<String> getGroupInstanceID;
+    private final String threadIdSuffix; // shortened version of the threadId: {processUUID}-StreamThread-{threadIdx}

Review Comment:
   Ah shoot, good point :/ This was my attempt at a compromise between staying relatively short, and containing enough info to actually be useful. Maybe I'm being overly paranoid, but we have twice had to revert the commit which added the `reason` message to `#enforceRebalance` due to perf issues related to strings/string length -- and I've seen some VERY long application/client ids.
   
   I'm not sure how long is too long, but if you look into the underlying consumer/coordinator methods, everywhere else we pass in a "full reason" for logging plus a "short reason" for actually embedding in the request, due to said perf regression with longer strings. 
   
   But maybe we don't actually need the client id here after all? I'm not sure how this is ultimately used, but I would hope the broker would log the client id for a given `reason` string in the JoinGroup request. I'm just going to remove this part for now, and if we do find it would be useful, I can do a separate PR.
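
The "full reason" versus "short reason" split mentioned above can be illustrated with a small sketch. Everything here is hypothetical: the class `RebalanceReasonSketch`, the method name, and the length cap are made up for illustration and are not the consumer/coordinator API.

```java
// Hypothetical illustration of keeping a long reason for logs and a short one for the request.
final class RebalanceReasonSketch {
    // Arbitrary cap chosen for this sketch; not a value taken from Kafka.
    static final int MAX_SHORT_REASON_LENGTH = 64;

    // Returns the string that would be embedded in the request: the full reason
    // if it already fits, otherwise its first MAX_SHORT_REASON_LENGTH characters.
    static String shortReason(String fullReason) {
        if (fullReason.length() <= MAX_SHORT_REASON_LENGTH) {
            return fullReason;
        }
        return fullReason.substring(0, MAX_SHORT_REASON_LENGTH);
    }
}
```

A caller would log the full reason locally and send only the capped string, which bounds the request size regardless of how long the application or client id happens to be.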





[GitHub] [kafka] ableegoldman commented on pull request #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on PR #12869:
URL: https://github.com/apache/kafka/pull/12869#issuecomment-1320817093

   Test failures are all unrelated -- merging this now




[GitHub] [kafka] ableegoldman merged pull request #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

Posted by GitBox <gi...@apache.org>.
ableegoldman merged PR #12869:
URL: https://github.com/apache/kafka/pull/12869

