You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/03/13 04:12:06 UTC

[kafka] branch 2.7 updated: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new aef800a  KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)
aef800a is described below

commit aef800af84ead23eda631d7f4ad49bf269a1f830
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Mar 12 20:06:54 2021 -0800

    KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)
    
    Always invoke TaskManager#handleRevocation when the thread is in PENDING_SHUTDOWN
    
    Reviewers: Walker Carlson <wc...@confluent.io>
---
 .../org/apache/kafka/streams/processor/internals/StreamThread.java    | 2 +-
 .../kafka/streams/processor/internals/StreamsRebalanceListener.java   | 4 +++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 12f1e26..1b668f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -629,7 +629,7 @@ public class StreamThread extends Thread {
         // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
         // could affect the task manager state beyond this point within #runOnce().
         if (!isRunning()) {
-            log.debug("Thread state is already {}, skipping the run once call after poll request", state);
+            log.info("Thread state is already {}, skipping the run once call after poll request", state);
             return;
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index c23eab6..b314d8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -80,7 +80,9 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
                   taskManager.activeTaskIds(),
                   taskManager.standbyTaskIds());
 
-        if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+        // We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever
+        // transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD)
+        if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {
             final long start = time.milliseconds();
             try {
                 taskManager.handleRevocation(partitions);