You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/02/26 20:22:24 UTC

[kafka] branch 2.8 updated: KAFKA-12375: fix concurrency issue in application shutdown (#10213)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 2300942  KAFKA-12375: fix concurrency issue in application shutdown (#10213)
2300942 is described below

commit 23009423f92224bff0cc37d5787ef75b925380b9
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Fri Feb 26 12:17:28 2021 -0800

    KAFKA-12375: fix concurrency issue in application shutdown (#10213)
    
    Need to ensure that `enforceRebalance` is used in a thread safe way
    
    Reviewers: Bruno Cadonna <ca...@confluent.io>, Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../apache/kafka/streams/processor/internals/StreamThread.java    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b05ee4f..7943ea4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -571,6 +571,11 @@ public class StreamThread extends Thread {
         // until the rebalance is completed before we close and commit the tasks
         while (isRunning() || taskManager.isRebalanceInProgress()) {
             try {
+                if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+                    log.warn("Detected that shutdown was requested. " +
+                            "All clients in this app will now begin to shutdown");
+                    mainConsumer.enforceRebalance();
+                }
                 runOnce();
                 if (nextProbingRebalanceMs.get() < time.milliseconds()) {
                     log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
@@ -660,10 +665,7 @@ public class StreamThread extends Thread {
     }
 
     public void sendShutdownRequest(final AssignorError assignorError) {
-        log.warn("Detected that shutdown was requested. " +
-                "All clients in this app will now begin to shutdown");
         assignmentErrorCode.set(assignorError.code());
-        mainConsumer.enforceRebalance();
     }
 
     private void handleTaskMigrated(final TaskMigratedException e) {