You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/01/06 19:30:46 UTC

[kafka] branch trunk updated: MINOR: Add num threads logging upon shutdown (#11652)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9078451  MINOR: Add num threads logging upon shutdown (#11652)
9078451 is described below

commit 9078451e37a5e8bc8cd0c9865837fe5379e8f2d0
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jan 6 11:28:27 2022 -0800

    MINOR: Add num threads logging upon shutdown (#11652)
    
    1. Add num of threads logging upon shutdown.
    2. Prefix the shutdown thread with client id.
    
    Reviewers: John Roesler <vv...@apache.org>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 29 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index a8f58d8..6b7e214 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1294,7 +1294,9 @@ public class KafkaStreams implements AutoCloseable {
                 globalStreamThread.start();
             }
 
-            processStreamThread(StreamThread::start);
+            final int numThreads = processStreamThread(StreamThread::start);
+
+            log.info("Started {} stream threads", numThreads);
 
             final Long cleanupDelay = applicationConfigs.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
             stateDirCleaner.scheduleAtFixedRate(() -> {
@@ -1339,20 +1341,31 @@ public class KafkaStreams implements AutoCloseable {
         return new Thread(() -> {
             // notify all the threads to stop; avoid deadlocks by stopping any
             // further state reports from the thread since we're shutting down
-            processStreamThread(StreamThread::shutdown);
+            int numStreamThreads = processStreamThread(StreamThread::shutdown);
+
+            log.info("Shutting down {} stream threads", numStreamThreads);
+
             topologyMetadata.wakeupThreads();
 
-            processStreamThread(thread -> {
+            numStreamThreads = processStreamThread(thread -> {
                 try {
                     if (!thread.isRunning()) {
+                        log.debug("Shutdown {} complete", thread.getName());
+
                         thread.join();
                     }
                 } catch (final InterruptedException ex) {
+                    log.warn("Shutdown {} interrupted", thread.getName());
+
                     Thread.currentThread().interrupt();
                 }
             });
 
+            log.info("Shutdown {} stream threads complete", numStreamThreads);
+
             if (globalStreamThread != null) {
+                log.info("Shutting down the global stream threads");
+
                 globalStreamThread.shutdown();
             }
 
@@ -1360,9 +1373,13 @@ public class KafkaStreams implements AutoCloseable {
                 try {
                     globalStreamThread.join();
                 } catch (final InterruptedException e) {
+                    log.warn("Shutdown the global stream thread interrupted");
+
                     Thread.currentThread().interrupt();
                 }
                 globalStreamThread = null;
+
+                log.info("Shutdown global stream threads complete");
             }
 
             stateDirectory.close();
@@ -1375,7 +1392,7 @@ public class KafkaStreams implements AutoCloseable {
             } else {
                 setState(State.ERROR);
             }
-        }, "kafka-streams-close-thread");
+        }, clientId + "-CloseThread");
     }
 
     private boolean close(final long timeoutMs) {
@@ -1624,9 +1641,11 @@ public class KafkaStreams implements AutoCloseable {
      * threads lock when looping threads.
      * @param consumer handler
      */
-    protected void processStreamThread(final Consumer<StreamThread> consumer) {
+    protected int processStreamThread(final Consumer<StreamThread> consumer) {
         final List<StreamThread> copy = new ArrayList<>(threads);
         for (final StreamThread thread : copy) consumer.accept(thread);
+
+        return copy.size();
     }
 
     /**