Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/26 08:39:44 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

cadonna commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r583459127



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1094,16 +1100,32 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
     }
 
     private int getNextThreadIndex() {
-        final HashSet<String> names = new HashSet<>();
-        processStreamThread(thread -> names.add(thread.getName()));
-        final String baseName = clientId + "-StreamThread-";
-        for (int i = 1; i <= threads.size(); i++) {
-            final String name = baseName + i;
-            if (!names.contains(name)) {
-                return i;
+        final HashSet<String> allLiveThreadNames = new HashSet<>();
+        final AtomicInteger maxThreadId = new AtomicInteger(1);
+        synchronized (threads) {
+            processStreamThread(thread -> {
+                // trim any DEAD threads from the list so we can reuse the thread.id
+                // this is only safe to do once the thread has fully completed shutdown
+                if (thread.state() == StreamThread.State.DEAD) {
+                    threads.remove(thread);
+                } else {
+                    allLiveThreadNames.add(thread.getName());
+                    final int threadId = thread.getName().charAt(thread.getName().length() - 1);

Review comment:
       This reads only the last character of the thread name, so it no longer works once a Streams client has stream threads with multi-digit indices (i.e. 10+).
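
       A minimal sketch of one way to read the whole numeric suffix, assuming thread names keep the `clientId + "-StreamThread-" + i` shape shown in the removed lines. `ThreadIndexParser` and `parseThreadIndex` are hypothetical names, not part of `KafkaStreams`. Note also that assigning a `char` to an `int` in Java yields the character code (`'1'` becomes 49), not the digit's value, so parsing the suffix as a string avoids both problems.

```java
public final class ThreadIndexParser {

    // Hypothetical helper: parses everything after the last '-' as the thread index.
    // Assumes the name ends in "-<number>"; throws NumberFormatException otherwise.
    public static int parseThreadIndex(final String threadName) {
        final int lastDash = threadName.lastIndexOf('-');
        return Integer.parseInt(threadName.substring(lastDash + 1));
    }

    public static void main(final String[] args) {
        // A client id containing dashes still works because only the last dash is used.
        System.out.println(parseThreadIndex("my-client-StreamThread-12"));
    }
}
```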

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1094,16 +1100,32 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
     }
 
     private int getNextThreadIndex() {
-        final HashSet<String> names = new HashSet<>();
-        processStreamThread(thread -> names.add(thread.getName()));
-        final String baseName = clientId + "-StreamThread-";
-        for (int i = 1; i <= threads.size(); i++) {
-            final String name = baseName + i;
-            if (!names.contains(name)) {
-                return i;
+        final HashSet<String> allLiveThreadNames = new HashSet<>();
+        final AtomicInteger maxThreadId = new AtomicInteger(1);

Review comment:
       Do we really need an atomic integer here? `maxThreadId` is only used in the synchronized block.
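
       A rough sketch of the plain-`int` alternative, under the assumption that the iteration is written as an ordinary loop. A lambda can only capture effectively final locals, which is likely why the `AtomicInteger` appeared, so a plain `int` needs a regular `for` loop rather than a callback. `MaxThreadIdSketch` and its list-of-names input are hypothetical stand-ins for the real thread list.

```java
import java.util.List;

public final class MaxThreadIdSketch {

    // Hypothetical helper: returns the largest numeric suffix among the given
    // thread names, or 0 if the list is empty.
    public static int maxThreadId(final List<String> threadNames) {
        int maxThreadId = 0; // plain local: only read and written inside the synchronized block
        synchronized (threadNames) {
            for (final String name : threadNames) {
                final int id = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1));
                maxThreadId = Math.max(maxThreadId, id);
            }
        }
        return maxThreadId;
    }
}
```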

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                             if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
                                 log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
                                 timeout = true;
+                                // Don't remove from threads until shutdown is complete. We will trim it from the
+                                // list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
+                                // shutdown then we should just consider this thread.id to be burned
+                            } else {
+                                threads.remove(streamThread);
                             }
                         }
-                        threads.remove(streamThread);
+                        // Don't remove from threads until shutdown is complete since this will let another thread
+                        // reuse its thread.id. We will trim any DEAD threads from the list later
                         final long cacheSizePerThread = getCacheSizePerThread(threads.size());

Review comment:
       We need to adapt the resizing of the cache per thread to use only the number of non-DEAD stream threads instead of all stream threads in the list. There are two other locations where we use the size of the thread list to resize the cache per thread.
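
       A minimal sketch of the idea, assuming the cache is split evenly across live threads. The `State` enum, `countLiveThreads`, and `cacheSizePerThread` below are simplified, hypothetical stand-ins and do not reproduce the real `StreamThread.State` or `getCacheSizePerThread`.

```java
import java.util.List;

public final class LiveThreadCacheSizing {

    // Simplified stand-in for the thread state; only DEAD matters for this sketch.
    public enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    // Counts the threads that have not yet reached DEAD.
    public static int countLiveThreads(final List<State> threadStates) {
        int live = 0;
        for (final State state : threadStates) {
            if (state != State.DEAD) {
                live++;
            }
        }
        return live;
    }

    // Hypothetical even split of the total cache across live threads.
    // Returns the full cache size when there are no live threads, to avoid dividing by zero.
    public static long cacheSizePerThread(final long totalCacheBytes, final int numLiveThreads) {
        return numLiveThreads == 0 ? totalCacheBytes : totalCacheBytes / numLiveThreads;
    }
}
```

       With this shape, a DEAD thread that is still in the list no longer shrinks the share given to each remaining thread.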



