You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/01/06 19:30:46 UTC
[kafka] branch trunk updated: MINOR: Add num threads logging upon shutdown (#11652)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9078451 MINOR: Add num threads logging upon shutdown (#11652)
9078451 is described below
commit 9078451e37a5e8bc8cd0c9865837fe5379e8f2d0
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jan 6 11:28:27 2022 -0800
MINOR: Add num threads logging upon shutdown (#11652)
1. Add num of threads logging upon shutdown.
2. Prefix the shutdown thread with client id.
Reviewers: John Roesler <vv...@apache.org>
---
.../org/apache/kafka/streams/KafkaStreams.java | 29 ++++++++++++++++++----
1 file changed, 24 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index a8f58d8..6b7e214 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1294,7 +1294,9 @@ public class KafkaStreams implements AutoCloseable {
globalStreamThread.start();
}
- processStreamThread(StreamThread::start);
+ final int numThreads = processStreamThread(StreamThread::start);
+
+ log.info("Started {} stream threads", numThreads);
final Long cleanupDelay = applicationConfigs.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
stateDirCleaner.scheduleAtFixedRate(() -> {
@@ -1339,20 +1341,31 @@ public class KafkaStreams implements AutoCloseable {
return new Thread(() -> {
// notify all the threads to stop; avoid deadlocks by stopping any
// further state reports from the thread since we're shutting down
- processStreamThread(StreamThread::shutdown);
+ int numStreamThreads = processStreamThread(StreamThread::shutdown);
+
+ log.info("Shutting down {} stream threads", numStreamThreads);
+
topologyMetadata.wakeupThreads();
- processStreamThread(thread -> {
+ numStreamThreads = processStreamThread(thread -> {
try {
if (!thread.isRunning()) {
+ log.debug("Shutdown {} complete", thread.getName());
+
thread.join();
}
} catch (final InterruptedException ex) {
+ log.warn("Shutdown {} interrupted", thread.getName());
+
Thread.currentThread().interrupt();
}
});
+ log.info("Shutdown {} stream threads complete", numStreamThreads);
+
if (globalStreamThread != null) {
+ log.info("Shutting down the global stream threads");
+
globalStreamThread.shutdown();
}
@@ -1360,9 +1373,13 @@ public class KafkaStreams implements AutoCloseable {
try {
globalStreamThread.join();
} catch (final InterruptedException e) {
+ log.warn("Shutdown the global stream thread interrupted");
+
Thread.currentThread().interrupt();
}
globalStreamThread = null;
+
+ log.info("Shutdown global stream threads complete");
}
stateDirectory.close();
@@ -1375,7 +1392,7 @@ public class KafkaStreams implements AutoCloseable {
} else {
setState(State.ERROR);
}
- }, "kafka-streams-close-thread");
+ }, clientId + "-CloseThread");
}
private boolean close(final long timeoutMs) {
@@ -1624,9 +1641,11 @@ public class KafkaStreams implements AutoCloseable {
* threads lock when looping threads.
* @param consumer handler
*/
- protected void processStreamThread(final Consumer<StreamThread> consumer) {
+ protected int processStreamThread(final Consumer<StreamThread> consumer) {
final List<StreamThread> copy = new ArrayList<>(threads);
for (final StreamThread thread : copy) consumer.accept(thread);
+
+ return copy.size();
}
/**