You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/03/01 21:10:13 UTC
[kafka] branch 2.6 updated: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 3371d7f KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)
3371d7f is described below
commit 3371d7f1d08493df6e11680ff357cbe9db645428
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Mar 12 20:06:54 2021 -0800
KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)
Always invoke TaskManager#handleRevocation for revoked partitions, even when the thread is already in PENDING_SHUTDOWN and therefore cannot transition to PARTITIONS_REVOKED.
Reviewers: Walker Carlson <wc...@confluent.io>
---
.../org/apache/kafka/streams/processor/internals/StreamThread.java | 2 +-
.../kafka/streams/processor/internals/StreamsRebalanceListener.java | 4 +++-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d707bb5..8e04a51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -645,7 +645,7 @@ public class StreamThread extends Thread {
// Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
// could affect the task manager state beyond this point within #runOnce().
if (!isRunning()) {
- log.debug("State already transits to {}, skipping the run once call after poll request", state);
+ log.debug("Thread state is already {}, skipping the run once call after poll request", state);
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index b594aa6..484bea1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -70,7 +70,9 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
taskManager.activeTaskIds(),
taskManager.standbyTaskIds());
- if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+ // We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever
+ // transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD)
+ if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {
final long start = time.milliseconds();
try {
taskManager.handleRevocation(partitions);