Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/17 15:50:58 UTC

[GitHub] [kafka] vvcephei commented on a diff in pull request #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

vvcephei commented on code in PR #12869:
URL: https://github.com/apache/kafka/pull/12869#discussion_r1025368358


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -298,6 +298,7 @@ public boolean isRunning() {
     private volatile ThreadMetadata threadMetadata;
     private StreamThread.StateListener stateListener;
     private final Optional<String> getGroupInstanceID;
+    private final String threadIdSuffix; // shortened version of the threadId: {processUUID}-StreamThread-{threadIdx}

Review Comment:
   This isn't exactly a suffix. If users supply their own client id, we just use it directly:
   ```
           final String userClientId = applicationConfigs.getString(StreamsConfig.CLIENT_ID_CONFIG);
           final String applicationId = applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG);
           if (userClientId.length() <= 0) {
               clientId = applicationId + "-" + processId;
           } else {
               clientId = userClientId;
           }
   ```
   
   It seems like what you're really after is a thread id that's independent of either the application id or the user-configured client id. In either case, why?
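
To make the concern concrete, here is a minimal sketch of how the quoted logic plays out. The class and method names (`ClientIdSketch`, `deriveClientId`, `threadIdFor`) are hypothetical, and the `{clientId}-StreamThread-{threadIdx}` format is an assumption based on the comment in the diff, not a copy of the actual implementation.

```java
final class ClientIdSketch {

    // Mirrors the quoted snippet: fall back to applicationId + "-" + processId
    // only when the user has not configured a client id.
    static String deriveClientId(String userClientId, String applicationId, String processId) {
        if (userClientId.isEmpty()) {
            return applicationId + "-" + processId;
        }
        return userClientId;
    }

    // Assumption for this sketch: the thread id is the client id followed by
    // "-StreamThread-{threadIdx}".
    static String threadIdFor(String clientId, int threadIdx) {
        return clientId + "-StreamThread-" + threadIdx;
    }

    public static void main(String[] args) {
        // Default client id: the thread id ends with "{processId}-StreamThread-{threadIdx}".
        System.out.println(threadIdFor(deriveClientId("", "my-app", "1234"), 1));
        // User-supplied client id: the process id does not appear in the thread id at all.
        System.out.println(threadIdFor(deriveClientId("custom-client", "my-app", "1234"), 1));
    }
}
```

With a user-supplied client id, `{processUUID}-StreamThread-{threadIdx}` is not a suffix of the thread id, which is why the name `threadIdSuffix` is misleading in that case.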



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -611,9 +616,11 @@ boolean runLoop() {
                     cacheResizer.accept(size);
                 }
                 runOnce();
-                if (nextProbingRebalanceMs.get() < time.milliseconds()) {
+
+                // Check for a scheduled rebalance but don't trigger it until the current rebalance is done
+                if (!taskManager.rebalanceInProgress() && nextProbingRebalanceMs.get() < time.milliseconds()) {

Review Comment:
   I'm sure you thought of it, but just to make sure: is there any circumstance where `nextProbingRebalanceMs` could get cleared by the time `rebalanceInProgress()` returns false?
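
A rough sketch of the scenario in question, in case it helps. Everything here is hypothetical: `ProbingRebalanceGate` is not a real class, and using `Long.MAX_VALUE` to mean "nothing scheduled" is an assumption made for the illustration. It only shows what would happen if the scheduled time were cleared while the trigger is being deferred. Whether anything actually clears it in that window is the open question.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ProbingRebalanceGate {

    // Assumption for this sketch: Long.MAX_VALUE means "no probing rebalance scheduled".
    private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);

    void schedule(long atMs) {
        nextProbingRebalanceMs.set(atMs);
    }

    void clear() {
        nextProbingRebalanceMs.set(Long.MAX_VALUE);
    }

    // Same shape as the new condition in runLoop(): only trigger once the
    // current rebalance has finished and the scheduled time has passed.
    boolean shouldTrigger(boolean rebalanceInProgress, long nowMs) {
        return !rebalanceInProgress && nextProbingRebalanceMs.get() < nowMs;
    }

    public static void main(String[] args) {
        ProbingRebalanceGate gate = new ProbingRebalanceGate();
        gate.schedule(100L);

        // Scheduled time has passed, but a rebalance is in progress: deferred.
        System.out.println(gate.shouldTrigger(true, 150L));

        // Rebalance done and the schedule is still set: the follow-up fires.
        System.out.println(gate.shouldTrigger(false, 200L));

        // If the schedule were cleared during the deferral, the follow-up is lost.
        gate.clear();
        System.out.println(gate.shouldTrigger(false, 300L));
    }
}
```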



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -611,9 +616,11 @@ boolean runLoop() {
                     cacheResizer.accept(size);
                 }
                 runOnce();
-                if (nextProbingRebalanceMs.get() < time.milliseconds()) {
+
+                // Check for a scheduled rebalance but don't trigger it until the current rebalance is done
+                if (!taskManager.rebalanceInProgress() && nextProbingRebalanceMs.get() < time.milliseconds()) {

Review Comment:
   It seems like this alone is the fix we needed, right?
   
   Maybe a nitpick, but I'm wondering if we really needed the other refactoring/renaming to be part of the same PR.


