Posted to jira@kafka.apache.org by "chia7712 (via GitHub)" <gi...@apache.org> on 2023/02/14 19:41:06 UTC

[GitHub] [kafka] chia7712 opened a new pull request, #13248: KAFKA-14717 KafkaStreams can't get running if the rebalance happens be…

chia7712 opened a new pull request, #13248:
URL: https://github.com/apache/kafka/pull/13248

   I noticed this issue while tracing #12590.
   StreamThread closes its consumer before changing its state to DEAD. If the partition rebalance happens quickly, the other StreamThreads can't move the KafkaStreams state from REBALANCING to RUNNING, because one StreamThread is still in PENDING_SHUTDOWN.
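   To make the race concrete, here is a minimal, hypothetical model of it. The class and its rules are assumptions for illustration that only borrow Kafka's state names; this is not the real `KafkaStreams` or `StreamThread` code. It assumes the client re-checks its own state only when a thread reports RUNNING, and that the check requires every thread to be RUNNING or DEAD.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical, heavily simplified model of the race; NOT Kafka's implementation.
   class StuckRebalanceModel {
       enum ThreadState { PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN, DEAD }
       enum ClientState { REBALANCING, RUNNING }

       final Map<String, ThreadState> threads = new HashMap<>();
       ClientState state = ClientState.REBALANCING;

       // Assumed pre-fix behaviour: only a thread reaching RUNNING re-evaluates
       // the client state; a thread reaching DEAD triggers no re-check.
       void onChange(String thread, ThreadState newState) {
           threads.put(thread, newState);
           if (newState == ThreadState.RUNNING) {
               maybeSetRunning();
           }
       }

       // Client becomes RUNNING only if every thread is RUNNING or DEAD,
       // so a single PENDING_SHUTDOWN thread blocks the transition.
       void maybeSetRunning() {
           for (ThreadState s : threads.values()) {
               if (s != ThreadState.RUNNING && s != ThreadState.DEAD) {
                   return;
               }
           }
           state = ClientState.RUNNING;
       }

       public static void main(String[] args) {
           StuckRebalanceModel client = new StuckRebalanceModel();
           client.onChange("t1", ThreadState.PARTITIONS_ASSIGNED);
           client.onChange("t2", ThreadState.PARTITIONS_ASSIGNED);
           // t1 is being removed: it enters PENDING_SHUTDOWN and closes its consumer...
           client.onChange("t1", ThreadState.PENDING_SHUTDOWN);
           // ...the quick rebalance completes and t2 reports RUNNING, but t1 blocks the check...
           client.onChange("t2", ThreadState.RUNNING);
           // ...and when t1 finally reaches DEAD, nothing re-evaluates the client state.
           client.onChange("t1", ThreadState.DEAD);
           System.out.println(client.state); // REBALANCING
       }
   }
   ```

   Under these assumptions the client stays in REBALANCING even though its only live thread is RUNNING.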
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #13248: KAFKA-14717 KafkaStreams can't get running if the rebalance happens be…

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13248:
URL: https://github.com/apache/kafka/pull/13248#discussion_r1106408656


##########
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java:
##########
@@ -257,6 +258,23 @@ private Thread adjustCountHelperThread(final KafkaStreams kafkaStreams, final in
         });
     }
 
+    @Test
+    public void testRebalanceHappensBeforeStreamThreadGetDown() throws Exception {
+        final Properties prop = new Properties();
+        prop.putAll(properties);
+        // make rebalance happen quickly
+        prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);

Review Comment:
   Is there a better way to test this scenario than relying on real time (which makes the test time-dependent)? More specifically, I'm looking for a test case that would fail 100% of the time without the fix and pass 100% of the time with it. This test seems like it would pass sometimes even without the fix, is that right?
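   One common way to remove the timing dependence is to pause the shutting-down thread inside the race window with latches, so the interleaving is forced rather than hoped for. The sketch below uses only `java.util.concurrent` and plain threads and does not touch Kafka; the class name and event strings are hypothetical, and wiring such a pause point into `StreamThread` (for example through a test-only hook or a mocked consumer) is an assumption about how a real test could be built.

   ```java
   import java.util.List;
   import java.util.concurrent.CopyOnWriteArrayList;
   import java.util.concurrent.CountDownLatch;

   // Hypothetical illustration: hold a thread inside a race window with latches
   // so the window is always hit, instead of shrinking timeouts and hoping.
   class DeterministicInterleaving {
       static List<String> run() {
           List<String> events = new CopyOnWriteArrayList<>();
           CountDownLatch consumerClosed = new CountDownLatch(1);
           CountDownLatch proceedToDead = new CountDownLatch(1);

           Thread shuttingDown = new Thread(() -> {
               events.add("t1: PENDING_SHUTDOWN, consumer closed");
               consumerClosed.countDown();      // signal: the race window is open
               try {
                   proceedToDead.await();       // stay in the window until released
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return;
               }
               events.add("t1: DEAD");
           });
           shuttingDown.start();

           try {
               consumerClosed.await();          // wait until t1 is inside the window
               events.add("t2: RUNNING");       // the "quick rebalance" completes here
               proceedToDead.countDown();       // now let t1 finish shutting down
               shuttingDown.join();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("interrupted while driving the test", e);
           }
           return events;
       }

       public static void main(String[] args) {
           run().forEach(System.out::println);
       }
   }
   ```

   Because `t1` cannot reach DEAD until the main thread releases it, the events always come out in the order PENDING_SHUTDOWN, RUNNING, DEAD, regardless of heartbeat settings.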



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -658,6 +658,8 @@ public synchronized void onChange(final Thread thread,
                         setState(State.REBALANCING);
                     } else if (newState == StreamThread.State.RUNNING) {
                         maybeSetRunning();
+                    } else if (state != State.RUNNING && newState == StreamThread.State.DEAD) {

Review Comment:
   I wonder if it's cleaner to change the logic of `maybeSetRunning` directly, e.g. to exclude threads which are `PENDING_SHUTDOWN` as long as there are still other threads running?
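   A minimal sketch of what that suggestion could look like, assuming the check is a pure function over the thread states. The class and method names are hypothetical, not Kafka's: a `PENDING_SHUTDOWN` thread is skipped only if some other thread is RUNNING, and every remaining thread must be RUNNING or DEAD.

   ```java
   import java.util.List;

   // Hypothetical sketch of the reviewer's suggestion; not the merged Kafka code.
   class RelaxedRunningCheck {
       enum ThreadState { PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN, DEAD }

       // True if the client may move to RUNNING: PENDING_SHUTDOWN threads are
       // ignored while at least one thread is RUNNING; all others must be RUNNING or DEAD.
       static boolean canSetRunning(List<ThreadState> threads) {
           boolean hasRunningThread = threads.contains(ThreadState.RUNNING);
           for (ThreadState s : threads) {
               if (s == ThreadState.PENDING_SHUTDOWN && hasRunningThread) {
                   continue;
               }
               if (s != ThreadState.RUNNING && s != ThreadState.DEAD) {
                   return false;
               }
           }
           return true;
       }

       public static void main(String[] args) {
           System.out.println(canSetRunning(List.of(ThreadState.PENDING_SHUTDOWN, ThreadState.RUNNING)));             // true
           System.out.println(canSetRunning(List.of(ThreadState.PENDING_SHUTDOWN, ThreadState.PARTITIONS_ASSIGNED))); // false
           System.out.println(canSetRunning(List.of(ThreadState.PENDING_SHUTDOWN)));                                  // false
       }
   }
   ```

   Compared with re-checking when a thread reaches DEAD (the approach in the diff above), this lets the client reach RUNNING as soon as a surviving thread is running, without waiting for the shutting-down thread to finish closing. If every thread is shutting down, the check still fails.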




[GitHub] [kafka] guozhangwang merged pull request #13248: KAFKA-14717 KafkaStreams can't get running if the rebalance happens be…

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang merged PR #13248:
URL: https://github.com/apache/kafka/pull/13248



[GitHub] [kafka] guozhangwang commented on pull request #13248: KAFKA-14717 KafkaStreams can't get running if the rebalance happens be…

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13248:
URL: https://github.com/apache/kafka/pull/13248#issuecomment-1434895998

   LGTM. Merged to trunk.



[GitHub] [kafka] chia7712 commented on a diff in pull request #13248: KAFKA-14717 KafkaStreams can't get running if the rebalance happens be…

Posted by "chia7712 (via GitHub)" <gi...@apache.org>.
chia7712 commented on code in PR #13248:
URL: https://github.com/apache/kafka/pull/13248#discussion_r1107014705


##########
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java:
##########
@@ -257,6 +258,23 @@ private Thread adjustCountHelperThread(final KafkaStreams kafkaStreams, final in
         });
     }
 
+    @Test
+    public void testRebalanceHappensBeforeStreamThreadGetDown() throws Exception {
+        final Properties prop = new Properties();
+        prop.putAll(properties);
+        // make rebalance happen quickly
+        prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);

Review Comment:
   > While this test seems would be pass some times even without the fix, is that right?
   
   You are right. I have updated the test so that it always fails without the fix.


