Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/18 20:11:34 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #10355: KAFKA-12500: fix memory leak in thread cache

wcarlson5 commented on a change in pull request #10355:
URL: https://github.com/apache/kafka/pull/10355#discussion_r597208785



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -971,6 +972,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
             synchronized (changeThreadCount) {
                 threadIdx = getNextThreadIndex();
                 cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads() + 1);
+                log.info("Adding a new StreamThread with thread id {}; new cache size per thread is {}", threadIdx, cacheSizePerThread);

Review comment:
       Maybe log number of threads here too?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1097,13 +1102,17 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
         return Optional.empty();
     }
 
-    // Returns the number of threads that are not in the DEAD state -- use this over threads.size()
+    // Returns the number of threads that are not in the DEAD or PENDING_SHUTDOWN state -- use this over threads.size()
     private int getNumLiveStreamThreads() {
         final AtomicInteger numLiveThreads = new AtomicInteger(0);
         synchronized (threads) {
             processStreamThread(thread -> {
                 if (thread.state() == StreamThread.State.DEAD) {
+                    log.debug("Trimming thread {} from the threads list since it's state is {}", thread.getName(), StreamThread.State.DEAD);
                     threads.remove(thread);
+                } else if (thread.state() == StreamThread.State.PENDING_SHUTDOWN) {

Review comment:
       Are we risking a memory overflow with this?
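       To make the concern concrete, here is a minimal sketch. It rests on two assumptions that the diff does not confirm: that the total cache is split evenly among the threads that get counted, and that a thread in PENDING_SHUTDOWN keeps its previous allocation until it finishes closing. The class `CacheBudgetSketch` and all of its methods are hypothetical and are not part of KafkaStreams.

```java
import java.util.List;

public class CacheBudgetSketch {
    // Simplified stand-in for StreamThread.State (assumption: only three states matter here).
    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    // Hypothetical helper: split the total cache evenly across the counted threads.
    static long cacheSizePerThread(long totalCacheBytes, int countedThreads) {
        return countedThreads == 0 ? totalCacheBytes : totalCacheBytes / countedThreads;
    }

    // Count threads that are neither DEAD nor PENDING_SHUTDOWN, mirroring the new comment in the diff.
    static int countLive(List<State> states) {
        int live = 0;
        for (State s : states) {
            if (s != State.DEAD && s != State.PENDING_SHUTDOWN) {
                live++;
            }
        }
        return live;
    }

    // Worst case: every thread that is not yet DEAD still holds a cache of the given size.
    static long worstCaseHeld(List<State> states, long perThread) {
        long holders = states.stream().filter(s -> s != State.DEAD).count();
        return holders * perThread;
    }

    public static void main(String[] args) {
        List<State> states = List.of(State.RUNNING, State.RUNNING, State.PENDING_SHUTDOWN);
        long total = 90;
        long perThread = cacheSizePerThread(total, countLive(states));
        System.out.println(perThread + " per thread, worst case held "
                + worstCaseHeld(states, perThread) + " of " + total);
    }
}
```

       Under those assumptions, two running threads each get half of the budget while the thread that is still shutting down may hold its share as well, so the combined footprint can briefly exceed the configured total. Whether that happens in practice depends on when the shutting-down thread actually releases its cache.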



