You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/13 00:05:32 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

wcarlson5 commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593517811



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -87,7 +87,9 @@ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                   taskManager.activeTaskIds(),
                   taskManager.standbyTaskIds());
 
-        if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+        // We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever
+        // transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD)
+        if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {

Review comment:
       do we need to be concerned about the oder these execute?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -714,6 +714,13 @@ void runOnce() {
 
         final long pollLatency = pollPhase();
 
+        // Optimization to skip the rest of the processing loop in case the thread was requested to shut down during
+        // the poll phase

Review comment:
       Good idea but I think we do this a few lines down




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org