You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/02/26 20:22:24 UTC
[kafka] branch 2.8 updated: KAFKA-12375: fix concurrency issue in
application shutdown (#10213)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 2300942 KAFKA-12375: fix concurrency issue in application shutdown (#10213)
2300942 is described below
commit 23009423f92224bff0cc37d5787ef75b925380b9
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Fri Feb 26 12:17:28 2021 -0800
KAFKA-12375: fix concurrency issue in application shutdown (#10213)
Need to ensure that `enforceRebalance` is used in a thread safe way
Reviewers: Bruno Cadonna <ca...@confluent.io>, Anna Sophie Blee-Goldman <ab...@apache.org>
---
.../apache/kafka/streams/processor/internals/StreamThread.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b05ee4f..7943ea4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -571,6 +571,11 @@ public class StreamThread extends Thread {
// until the rebalance is completed before we close and commit the tasks
while (isRunning() || taskManager.isRebalanceInProgress()) {
try {
+ if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+ log.warn("Detected that shutdown was requested. " +
+ "All clients in this app will now begin to shutdown");
+ mainConsumer.enforceRebalance();
+ }
runOnce();
if (nextProbingRebalanceMs.get() < time.milliseconds()) {
log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
@@ -660,10 +665,7 @@ public class StreamThread extends Thread {
}
public void sendShutdownRequest(final AssignorError assignorError) {
- log.warn("Detected that shutdown was requested. " +
- "All clients in this app will now begin to shutdown");
assignmentErrorCode.set(assignorError.code());
- mainConsumer.enforceRebalance();
}
private void handleTaskMigrated(final TaskMigratedException e) {