Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/26 17:10:03 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

wcarlson5 commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r583787814



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                             if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
                                 log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
                                 timeout = true;
+                                // Don't remove from threads until shutdown is complete. We will trim it from the
+                                // list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
+                                // shutdown then we should just consider this thread.id to be burned
+                            } else {
+                                threads.remove(streamThread);

Review comment:
       If we purge the dead threads before we add new ones, and if we drop the assumption that there are no dead threads in the thread list, then we can simply not remove threads in `removeStreamThread()` at all. That way there should be no concern about the cache size changing while a thread is removing itself, and the risk we took regarding memory overflows becomes unnecessary.
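A minimal sketch of the remove path this comment proposes, assuming a hypothetical `SketchThread` that models only an id and a lifecycle state (it is not Kafka's `StreamThread` API). The remover requests shutdown and waits with a timeout, but never edits the thread list. A thread that hangs therefore stays in the list and its id stays "burned" until a later purge finds it DEAD.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamThread: an id plus a lifecycle state.
class SketchThread {
    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    final int id;
    private State state = State.RUNNING;

    SketchThread(final int id) {
        this.id = id;
    }

    synchronized State state() {
        return state;
    }

    synchronized void setState(final State newState) {
        state = newState;
        notifyAll(); // wake any caller blocked in waitOnState()
    }

    // Moves RUNNING -> PENDING_SHUTDOWN; a thread that already died is left alone.
    synchronized void requestShutdown() {
        if (state == State.RUNNING) {
            setState(State.PENDING_SHUTDOWN);
        }
    }

    // Waits until `target` is reached or `timeoutMs` elapses; returns true if reached.
    synchronized boolean waitOnState(final State target, final long timeoutMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (state != target) {
            final long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}

class RemovePathSketch {
    // Hypothetical thread list; the remove path below never mutates it.
    final List<SketchThread> threads = new ArrayList<>();

    // Requests shutdown and waits up to timeoutMs. The thread stays in `threads`
    // either way, so its id cannot be handed out until it is purged as DEAD.
    boolean removeThread(final SketchThread thread, final long timeoutMs) {
        thread.requestShutdown();
        return thread.waitOnState(SketchThread.State.DEAD, timeoutMs);
    }
}
```

Because the list only shrinks on the add path, `threads.size()` no longer changes underneath a thread that is removing itself.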

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                             if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
                                 log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
                                 timeout = true;
+                                // Don't remove from threads until shutdown is complete. We will trim it from the
+                                // list once it reaches DEAD, and if for some reason it's hanging indefinitely in the

Review comment:
       Where do we trim this list? I don't think we do. Can we purge the dead threads at the beginning of `addStreamThread()`? That is the only place it should matter.
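A minimal sketch of purging at the start of the add path, assuming a hypothetical `IdThread` that only records an id and whether it has reached DEAD, and assuming ids start at 1. It is not the `KafkaStreams#addStreamThread()` implementation; it only shows the ordering the comment asks for: purge DEAD threads first, then pick the lowest id not held by any remaining thread (running or still shutting down).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical minimal thread model: an id and whether the thread has reached DEAD.
class IdThread {
    final int id;
    volatile boolean dead;

    IdThread(final int id) {
        this.id = id;
    }
}

class AddPathSketch {
    // Hypothetical list that may contain DEAD threads between purges.
    final List<IdThread> threads = new ArrayList<>();

    synchronized IdThread addThread() {
        // 1. Trim threads that have fully shut down; their ids become free.
        threads.removeIf(t -> t.dead);
        // 2. Every thread still listed (even one hanging in shutdown) keeps its id.
        final Set<Integer> taken = new HashSet<>();
        for (final IdThread t : threads) {
            taken.add(t.id);
        }
        // 3. Reuse the lowest free id.
        int id = 1;
        while (taken.contains(id)) {
            id++;
        }
        final IdThread added = new IdThread(id);
        threads.add(added);
        return added;
    }
}
```

A thread that is still shutting down keeps its id, so a new thread gets the next free one; once the old thread is DEAD, the next add reclaims the lower id.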

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -463,9 +464,8 @@ private void replaceStreamThread(final Throwable throwable) {
             closeToError();
         }
         final StreamThread deadThread = (StreamThread) Thread.currentThread();
-        threads.remove(deadThread);

Review comment:
       I remember that we had the replace path reuse the same ID for a reason (maybe it had to do with rebalancing?). I don't think there should be a problem with trying to get the same ID by waiting a bit in the replace thread.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                             if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
                                 log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
                                 timeout = true;
+                                // Don't remove from threads until shutdown is complete. We will trim it from the
+                                // list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
+                                // shutdown then we should just consider this thread.id to be burned
+                            } else {
+                                threads.remove(streamThread);
                             }
                         }
-                        threads.remove(streamThread);
+                        // Don't remove from threads until shutdown is complete since this will let another thread
+                        // reuse its thread.id. We will trim any DEAD threads from the list later
                         final long cacheSizePerThread = getCacheSizePerThread(threads.size());

Review comment:
       Previously, we relied on the fact that there were no dead threads in the list, so `threads.size()` counted only live threads when computing the cache size per thread.
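A minimal sketch of sizing the cache from live threads instead of `threads.size()`, once DEAD threads may linger in the list. The `CacheThread` type and `cacheSizePerThread` helper are hypothetical, and the even split over stream threads is an assumption of this sketch; the real `KafkaStreams` sizing may also account for other threads.

```java
import java.util.List;

// Hypothetical model of a thread entry: only whether it has reached DEAD matters here.
class CacheThread {
    final boolean dead;

    CacheThread(final boolean dead) {
        this.dead = dead;
    }
}

class CacheSizingSketch {
    // Splits the total cache evenly over threads that are not DEAD.
    static long cacheSizePerThread(final long totalCacheBytes, final List<CacheThread> threads) {
        final long live = threads.stream().filter(t -> !t.dead).count();
        // Avoid dividing by zero when every listed thread is DEAD.
        return live == 0 ? totalCacheBytes : totalCacheBytes / live;
    }
}
```

With two live threads and one DEAD thread still listed, a 300-byte total gives 150 bytes each; dividing by the raw list size would give each live thread only 100.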




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org