Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/13 00:00:56 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

ableegoldman opened a new pull request #10311:
URL: https://github.com/apache/kafka/pull/10311


   Also check the state after the poll phase and exit the StreamThread processing loop early if the thread is in PENDING_SHUTDOWN
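
   The early exit described above can be illustrated with a minimal, self-contained sketch. It is not the actual StreamThread code: the class `EarlyExitSketch`, the `duringPoll` hook, and the `processed` counter are hypothetical names used only to show a state check placed right after the poll phase.

```java
public class EarlyExitSketch {
    // Simplified stand-in for the thread states; the real StreamThread has more.
    enum State { RUNNING, PENDING_SHUTDOWN }

    private volatile State state = State.RUNNING;

    // Hypothetical counter: how many iterations reached the processing phase.
    int processed = 0;

    void requestShutdown() {
        state = State.PENDING_SHUTDOWN;
    }

    // Stand-in for the poll phase. The hook lets a caller simulate a shutdown
    // request that arrives while the thread is polling.
    long pollPhase(Runnable duringPoll) {
        duringPoll.run();
        return 0L;
    }

    // Returns true if the iteration went on to process, false if it exited early.
    boolean runOnce(Runnable duringPoll) {
        pollPhase(duringPoll);

        // Skip the rest of the loop if shutdown was requested during the poll phase.
        if (state == State.PENDING_SHUTDOWN) {
            return false;
        }

        processed++;
        return true;
    }
}
```

   Calling `runOnce(() -> {})` processes as usual, while `runOnce(thread::requestShutdown)` returns before any processing happens.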


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#issuecomment-797829955


   LGTM





[GitHub] [kafka] ableegoldman commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593522324



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -714,6 +714,13 @@ void runOnce() {
 
         final long pollLatency = pollPhase();
 
+        // Optimization to skip the rest of the processing loop in case the thread was requested to shut down during
+        // the poll phase

Review comment:
       🤦‍♀️  Oh wow how did I not see that lol. I'll just bump the log to INFO







[GitHub] [kafka] ableegoldman merged pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #10311:
URL: https://github.com/apache/kafka/pull/10311


   





[GitHub] [kafka] ableegoldman commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#issuecomment-1055859746


   > Can we cherry-pick this back to 2.6 as well?
   
   Done





[GitHub] [kafka] ableegoldman commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#issuecomment-797862761


   Merged to trunk and cherry-picked to 2.7 & 2.8 cc @vvcephei





[GitHub] [kafka] ableegoldman commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593523423



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -87,7 +87,9 @@ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                   taskManager.activeTaskIds(),
                   taskManager.standbyTaskIds());
 
-        if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+        // We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever
+        // transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD)
+        if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {

Review comment:
       I think this is the correct order (assuming you mean the order of `streamThread.setState(State.PARTITIONS_REVOKED) != null` relative to `streamThread.state() == State.PENDING_SHUTDOWN`?) -- if the thread is not in PENDING_SHUTDOWN when it reaches this line, the first condition should return true, which is what we want even if it does get transitioned to PENDING_SHUTDOWN immediately after the transition to PARTITIONS_REVOKED.
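
       The ordering argument can be checked against a small model. This is a sketch under stated assumptions, not the Kafka implementation: `RevocationSketch`, `ThreadModel`, and `shouldHandleRevocation` are hypothetical names, and the transition rules are reduced to the one discussed here (no transition out of PENDING_SHUTDOWN except to DEAD). As in the diff, `setState` is assumed to return the previous state on success and `null` when the transition is rejected.

```java
public class RevocationSketch {
    // Reduced set of states, enough to model the condition under review.
    enum State { RUNNING, PARTITIONS_REVOKED, PENDING_SHUTDOWN, DEAD }

    static final class ThreadModel {
        private State state;

        ThreadModel(State initial) {
            this.state = initial;
        }

        // Returns the old state if the transition is allowed, null otherwise.
        synchronized State setState(State newState) {
            if (state == State.DEAD) {
                return null;
            }
            if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                return null; // never leave PENDING_SHUTDOWN except to DEAD
            }
            State old = state;
            state = newState;
            return old;
        }

        synchronized State state() {
            return state;
        }
    }

    // Mirrors the shape of the new condition: try the transition first, and
    // only if it was rejected check whether that was due to PENDING_SHUTDOWN.
    static boolean shouldHandleRevocation(ThreadModel thread, boolean partitionsEmpty) {
        return (thread.setState(State.PARTITIONS_REVOKED) != null
                || thread.state() == State.PENDING_SHUTDOWN)
                && !partitionsEmpty;
    }
}
```

       Because `||` short-circuits, a thread that is not in PENDING_SHUTDOWN succeeds on the first operand and the second is never evaluated, so a shutdown request arriving just after the transition does not change the outcome. A thread already in PENDING_SHUTDOWN fails the first operand, keeps its state, and still proceeds through the second.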







[GitHub] [kafka] wcarlson5 commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#issuecomment-1055820659


   Can we cherry-pick this back to 2.6 as well?





[GitHub] [kafka] ableegoldman commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#issuecomment-797862288


   One unrelated test failure: `kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()`
   
   Going to merge





[GitHub] [kafka] wcarlson5 commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593517811



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -87,7 +87,9 @@ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                   taskManager.activeTaskIds(),
                   taskManager.standbyTaskIds());
 
-        if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+        // We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever
+        // transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD)
+        if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {

Review comment:
       Do we need to be concerned about the order in which these execute?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -714,6 +714,13 @@ void runOnce() {
 
         final long pollLatency = pollPhase();
 
+        // Optimization to skip the rest of the processing loop in case the thread was requested to shut down during
+        // the poll phase

Review comment:
       Good idea but I think we do this a few lines down







[GitHub] [kafka] ableegoldman commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#issuecomment-797820964


   @vvcephei @wcarlson5 @lct45 

