You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "guozhangwang (via GitHub)" <gi...@apache.org> on 2023/02/14 21:42:35 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #13248: KAFKA-14717 KafkaStreams can' get running if the rebalance happens be…

guozhangwang commented on code in PR #13248:
URL: https://github.com/apache/kafka/pull/13248#discussion_r1106408656


##########
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java:
##########
@@ -257,6 +258,23 @@ private Thread adjustCountHelperThread(final KafkaStreams kafkaStreams, final in
         });
     }
 
+    @Test
+    public void testRebalanceHappensBeforeStreamThreadGetDown() throws Exception {
+        final Properties prop = new Properties();
+        prop.putAll(properties);
+        // make rebalance happen quickly
+        prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);

Review Comment:
   Is there a better way to test this scenario than relying on the real time (and hence is time dependendent)? More specifically, I'm looking for a test case which would 100% fail without the fix, and would 100% pass with the fix. While this test seems would be pass some times even without the fix, is that right?



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -658,6 +658,8 @@ public synchronized void onChange(final Thread thread,
                         setState(State.REBALANCING);
                     } else if (newState == StreamThread.State.RUNNING) {
                         maybeSetRunning();
+                    } else if (state != State.RUNNING && newState == StreamThread.State.DEAD) {

Review Comment:
   I wonder if it's cleaner to change the logic of `maybeSetRunning` directly, e.g. to exclude threads which are `PENDING_SHUTDOWN` as long as there are still other threads running?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org