You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/03/13 04:12:06 UTC
[kafka] branch 2.7 updated: KAFKA-12462: proceed with task
revocation in case of thread in PENDING_SHUTDOWN (#10311)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new aef800a KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)
aef800a is described below
commit aef800af84ead23eda631d7f4ad49bf269a1f830
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Mar 12 20:06:54 2021 -0800
KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311)
Always invoke TaskManager#handleRevocation when the thread is in PENDING_SHUTDOWN
Reviewers: Walker Carlson <wc...@confluent.io>
---
.../org/apache/kafka/streams/processor/internals/StreamThread.java | 2 +-
.../kafka/streams/processor/internals/StreamsRebalanceListener.java | 4 +++-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 12f1e26..1b668f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -629,7 +629,7 @@ public class StreamThread extends Thread {
// Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
// could affect the task manager state beyond this point within #runOnce().
if (!isRunning()) {
- log.debug("Thread state is already {}, skipping the run once call after poll request", state);
+ log.info("Thread state is already {}, skipping the run once call after poll request", state);
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index c23eab6..b314d8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -80,7 +80,9 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
taskManager.activeTaskIds(),
taskManager.standbyTaskIds());
- if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+ // We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever
+ // transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD)
+ if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {
final long start = time.milliseconds();
try {
taskManager.handleRevocation(partitions);