You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/03/01 21:10:13 UTC

[kafka] branch 2.6 updated: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 3371d7f  KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)
3371d7f is described below

commit 3371d7f1d08493df6e11680ff357cbe9db645428
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Mar 12 20:06:54 2021 -0800

    KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)
    
    Always invoke TaskManager#handleRevocation when the thread is in PENDING_SHUTDOWN
    
    Reviewers: Walker Carlson <wc...@confluent.io>
---
 .../org/apache/kafka/streams/processor/internals/StreamThread.java    | 2 +-
 .../kafka/streams/processor/internals/StreamsRebalanceListener.java   | 4 +++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d707bb5..8e04a51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -645,7 +645,7 @@ public class StreamThread extends Thread {
         // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
         // could affect the task manager state beyond this point within #runOnce().
         if (!isRunning()) {
-            log.debug("State already transits to {}, skipping the run once call after poll request", state);
+            log.debug("Thread state is already {}, skipping the run once call after poll request", state);
             return;
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index b594aa6..484bea1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -70,7 +70,9 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
                   taskManager.activeTaskIds(),
                   taskManager.standbyTaskIds());
 
-        if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+        // We still need to invoke handleRevocation if the thread has been told to shut down, but once PENDING_SHUTDOWN
+        // has been initiated we should never transition away from it (to anything other than DEAD)
+        if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {
             final long start = time.milliseconds();
             try {
                 taskManager.handleRevocation(partitions);